Skip to main content

Stream

A Stream<A, E, R> describes a program that can emit many A values, fail with E, and require R. Streams are pull-based with backpressure and emit chunks to amortize effect evaluation. They support monadic composition and error handling similar to Effect, adapted for multiple values.

Type Signature

interface Stream<out A, out E = never, out R = never>
A
type parameter
The type of values emitted by the stream
E
type parameter
default:"never"
The error type the stream may fail with
R
type parameter
default:"never"
The context/services required to run the stream

Key Features

  • Pull-based: Consumers control the rate of emission (backpressure)
  • Chunked: Values are emitted in chunks for efficiency
  • Composable: Rich set of operators for transformation
  • Resource-safe: Automatic cleanup with scoped resources
  • Concurrent: Built-in support for parallel processing
  • Interruptible: Streams can be safely interrupted

Creating Streams

make

Creates a stream from a sequence of values.
const stream = Stream.make(1, 2, 3)
// Stream<number>
Signature:
function make<const As extends ReadonlyArray<any>>(
  ...values: As
): Stream<As[number]>
values
...As
required
The values to emit from the stream
returns
Stream<As[number]>
A stream that emits all provided values

succeed

Creates a single-valued pure stream.
const stream = Stream.succeed(42)
// Stream<number> that emits 42
Signature:
function succeed<A>(value: A): Stream<A>
value
A
required
The single value to emit
returns
Stream<A>
A stream that emits the single value

empty

Creates an empty stream that emits no values.
const stream = Stream.empty
// Stream<never>
Signature:
const empty: Stream<never>

sync

Creates a stream that synchronously evaluates a function and emits the result.
const stream = Stream.sync(() => Date.now())
// Stream<number> - emits current timestamp
Signature:
function sync<A>(evaluate: () => A): Stream<A>
evaluate
() => A
required
Function evaluated each time the stream runs
returns
Stream<A>
A stream that emits the function result

suspend

Creates a lazily constructed stream.
const stream = Stream.suspend(() => Stream.make(1, 2, 3))
// Stream is reconstructed on each run
Signature:
function suspend<A, E, R>(
  stream: () => Stream<A, E, R>
): Stream<A, E, R>
stream
() => Stream<A, E, R>
required
Factory function that creates the stream
returns
Stream<A, E, R>
A lazily evaluated stream

callback

Creates a stream from a callback that can emit values into a queue.
import { Effect, Queue, Stream } from "effect"

const stream = Stream.callback<number>((queue) =>
  Effect.sync(() => {
    Queue.offerUnsafe(queue, 1)
    Queue.offerUnsafe(queue, 2)
    Queue.offerUnsafe(queue, 3)
    Queue.endUnsafe(queue)
  })
)
Signature:
function callback<A, E = never, R = never>(
  f: (queue: Queue.Queue<A, E | Cause.Done>) => Effect.Effect<unknown, E, R | Scope>,
  options?: {
    bufferSize?: number
    strategy?: "sliding" | "dropping" | "suspend"
  }
): Stream<A, E, Exclude<R, Scope>>
f
callback function
required
Function that emits values to a queue
options.bufferSize
number
Size of the buffer (default: unbounded)
options.strategy
string
Backpressure strategy: "sliding", "dropping", or "suspend"
returns
Stream<A, E, R>
A stream that emits the queued values

Transforming Streams

map

Transforms each emitted value.
const doubled = Stream.make(1, 2, 3).pipe(
  Stream.map((n) => n * 2)
)
// Stream<number> emitting 2, 4, 6
Signature:
function map<A, B>(
  f: (a: A) => B
): <E, R>(self: Stream<A, E, R>) => Stream<B, E, R>

filter

Filters stream values based on a predicate.
const evens = Stream.make(1, 2, 3, 4).pipe(
  Stream.filter((n) => n % 2 === 0)
)
// Stream<number> emitting 2, 4
Signature:
function filter<A, B extends A>(
  predicate: Refinement<A, B>
): <E, R>(self: Stream<A, E, R>) => Stream<B, E, R>

function filter<A>(
  predicate: Predicate<A>
): <E, R>(self: Stream<A, E, R>) => Stream<A, E, R>

flatMap

Transforms each value into a stream and flattens the result.
const stream = Stream.make(1, 2, 3).pipe(
  Stream.flatMap((n) => Stream.make(n, n * 10))
)
// Stream<number> emitting 1, 10, 2, 20, 3, 30
Signature:
function flatMap<A, B, E2, R2>(
  f: (a: A) => Stream<B, E2, R2>,
  options?: { concurrency?: Concurrency }
): <E, R>(self: Stream<A, E, R>) => Stream<B, E | E2, R | R2>
f
(a: A) => Stream<B, E2, R2>
required
Function that maps each value to a stream
options.concurrency
Concurrency
Controls concurrency: "sequential", "unbounded", or a number

take

Takes the first n elements from the stream.
const first3 = Stream.make(1, 2, 3, 4, 5).pipe(
  Stream.take(3)
)
// Stream<number> emitting 1, 2, 3
Signature:
function take(
  n: number
): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E, R>

drop

Drops the first n elements from the stream.
const afterFirst2 = Stream.make(1, 2, 3, 4, 5).pipe(
  Stream.drop(2)
)
// Stream<number> emitting 3, 4, 5
Signature:
function drop(
  n: number
): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E, R>

Running Streams

runCollect

Collects all values from a stream into an array.
const program = Effect.gen(function*() {
  const values = yield* Stream.make(1, 2, 3).pipe(
    Stream.runCollect
  )
  console.log(values) // [1, 2, 3]
})
Signature:
function runCollect<A, E, R>(
  self: Stream<A, E, R>
): Effect.Effect<Array<A>, E, R>
returns
Effect<Array<A>, E, R>
An effect that collects all stream values into an array

runForEach

Runs an effect for each value in the stream.
const program = Stream.make(1, 2, 3).pipe(
  Stream.runForEach((n) => Console.log(`Value: ${n}`))
)
// Logs: Value: 1, Value: 2, Value: 3
Signature:
function runForEach<A, E2, R2>(
  f: (a: A) => Effect.Effect<any, E2, R2>
): <E, R>(self: Stream<A, E, R>) => Effect.Effect<void, E | E2, R | R2>

runFold

Folds over the stream to produce a single accumulated value.
const sum = Stream.make(1, 2, 3, 4).pipe(
  Stream.runFold(0, (acc, n) => acc + n)
)
// Effect<number> with result 10
Signature:
function runFold<A, S>(
  initial: S,
  f: (s: S, a: A) => S
): <E, R>(self: Stream<A, E, R>) => Effect.Effect<S, E, R>
initial
S
required
Initial accumulator value
f
(s: S, a: A) => S
required
Folding function that combines accumulator with each value

Combining Streams

concat

Concatenates two streams.
const combined = Stream.make(1, 2).pipe(
  Stream.concat(Stream.make(3, 4))
)
// Stream<number> emitting 1, 2, 3, 4
Signature:
function concat<A2, E2, R2>(
  that: Stream<A2, E2, R2>
): <A, E, R>(self: Stream<A, E, R>) => Stream<A | A2, E | E2, R | R2>

merge

Merges two streams, emitting values from both concurrently.
const merged = Stream.make(1, 2).pipe(
  Stream.merge(Stream.make(3, 4))
)
// Stream<number> emitting values from both streams concurrently
Signature:
function merge<A2, E2, R2>(
  that: Stream<A2, E2, R2>,
  options?: { haltStrategy?: HaltStrategy }
): <A, E, R>(self: Stream<A, E, R>) => Stream<A | A2, E | E2, R | R2>

zip

Zips two streams together element-wise.
const zipped = Stream.make(1, 2, 3).pipe(
  Stream.zip(Stream.make("a", "b", "c"))
)
// Stream<[number, string]> emitting [1, "a"], [2, "b"], [3, "c"]
Signature:
function zip<A2, E2, R2>(
  that: Stream<A2, E2, R2>
): <A, E, R>(self: Stream<A, E, R>) => Stream<[A, A2], E | E2, R | R2>

Error Handling

catchAll

Handles all errors in a stream by providing a recovery stream.
const recovered = Stream.fail("error").pipe(
  Stream.catchAll((error) => Stream.succeed(`Recovered: ${error}`))
)
Signature:
function catchAll<E, A2, E2, R2>(
  f: (e: E) => Stream<A2, E2, R2>
): <A, R>(self: Stream<A, E, R>) => Stream<A | A2, E2, R | R2>

orElse

Tries a stream, and if it fails, tries another stream.
const fallback = Stream.fail("error").pipe(
  Stream.orElse(() => Stream.succeed("fallback"))
)
Signature:
function orElse<A2, E2, R2>(
  that: () => Stream<A2, E2, R2>
): <A, E, R>(self: Stream<A, E, R>) => Stream<A | A2, E2, R | R2>

Constants

DefaultChunkSize

The default chunk size used by Stream constructors and combinators.
console.log(Stream.DefaultChunkSize) // 4096

Type Utilities

Success

Extracts the value type from a Stream.
type MyStream = Stream<number, string, never>
type ValueType = Stream.Success<MyStream> // number

Error

Extracts the error type from a Stream.
type MyStream = Stream<number, string, never>
type ErrorType = Stream.Error<MyStream> // string

Services

Extracts the services type from a Stream.
type MyStream = Stream<number, string, Database>
type RequiredServices = Stream.Services<MyStream> // Database

Complete Example

import { Console, Effect, Stream } from "effect"

// Create a stream from multiple sources
const numbers = Stream.make(1, 2, 3, 4, 5)

// Transform and process the stream
const program = Effect.gen(function*() {
  // Transform values
  const transformed = numbers.pipe(
    Stream.filter((n) => n % 2 === 0),
    Stream.map((n) => n * 10),
    Stream.take(2)
  )
  
  // Collect results
  const results = yield* Stream.runCollect(transformed)
  yield* Console.log(`Results: ${results}`)
  // Output: Results: [20, 40]
  
  // Process with effects
  yield* numbers.pipe(
    Stream.runForEach((n) => Console.log(`Processing: ${n}`))
  )
  
  // Fold to aggregate
  const sum = yield* numbers.pipe(
    Stream.runFold(0, (acc, n) => acc + n)
  )
  yield* Console.log(`Sum: ${sum}`)
  // Output: Sum: 15
})

Effect.runPromise(program)

Build docs developers (and LLMs) love