Stream
A Stream<A, E, R> describes a program that can emit many A values, fail with E, and require R.
Streams are pull-based with backpressure and emit chunks to amortize effect evaluation. They support monadic composition and error handling similar to Effect, adapted for multiple values.
Type Signature
interface Stream<out A, out E = never, out R = never>
The type of values emitted by the stream
E
type parameter
default:"never"
The error type the stream may fail with
R
type parameter
default:"never"
The context/services required to run the stream
Key Features
- Pull-based: Consumers control the rate of emission (backpressure)
- Chunked: Values are emitted in chunks for efficiency
- Composable: Rich set of operators for transformation
- Resource-safe: Automatic cleanup with scoped resources
- Concurrent: Built-in support for parallel processing
- Interruptible: Streams can be safely interrupted
Creating Streams
make
Creates a stream from a sequence of values.
const stream = Stream.make(1, 2, 3)
// Stream<number>
Signature:
function make<const As extends ReadonlyArray<any>>(
...values: As
): Stream<As[number]>
The values to emit from the stream
A stream that emits all provided values
succeed
Creates a single-valued pure stream.
const stream = Stream.succeed(42)
// Stream<number> that emits 42
Signature:
function succeed<A>(value: A): Stream<A>
A stream that emits the single value
empty
Creates an empty stream that emits no values.
const stream = Stream.empty
// Stream<never>
Signature:
const empty: Stream<never>
sync
Creates a stream that synchronously evaluates a function and emits the result.
const stream = Stream.sync(() => Date.now())
// Stream<number> - emits current timestamp
Signature:
function sync<A>(evaluate: () => A): Stream<A>
Function evaluated each time the stream runs
A stream that emits the function result
suspend
Creates a lazily constructed stream.
const stream = Stream.suspend(() => Stream.make(1, 2, 3))
// Stream is reconstructed on each run
Signature:
function suspend<A, E, R>(
stream: () => Stream<A, E, R>
): Stream<A, E, R>
stream
() => Stream<A, E, R>
required
Factory function that creates the stream
A lazily evaluated stream
callback
Creates a stream from a callback that can emit values into a queue.
import { Effect, Queue, Stream } from "effect"
const stream = Stream.callback<number>((queue) =>
Effect.sync(() => {
Queue.offerUnsafe(queue, 1)
Queue.offerUnsafe(queue, 2)
Queue.offerUnsafe(queue, 3)
Queue.endUnsafe(queue)
})
)
Signature:
function callback<A, E = never, R = never>(
f: (queue: Queue.Queue<A, E | Cause.Done>) => Effect.Effect<unknown, E, R | Scope>,
options?: {
bufferSize?: number
strategy?: "sliding" | "dropping" | "suspend"
}
): Stream<A, E, Exclude<R, Scope>>
f
callback function
required
Function that emits values to a queue
Size of the buffer (default: unbounded)
Backpressure strategy: "sliding", "dropping", or "suspend"
A stream that emits the queued values
map
Transforms each emitted value.
const doubled = Stream.make(1, 2, 3).pipe(
Stream.map((n) => n * 2)
)
// Stream<number> emitting 2, 4, 6
Signature:
function map<A, B>(
f: (a: A) => B
): <E, R>(self: Stream<A, E, R>) => Stream<B, E, R>
filter
Filters stream values based on a predicate.
const evens = Stream.make(1, 2, 3, 4).pipe(
Stream.filter((n) => n % 2 === 0)
)
// Stream<number> emitting 2, 4
Signature:
function filter<A, B extends A>(
predicate: Refinement<A, B>
): <E, R>(self: Stream<A, E, R>) => Stream<B, E, R>
function filter<A>(
predicate: Predicate<A>
): <E, R>(self: Stream<A, E, R>) => Stream<A, E, R>
flatMap
Transforms each value into a stream and flattens the result.
const stream = Stream.make(1, 2, 3).pipe(
Stream.flatMap((n) => Stream.make(n, n * 10))
)
// Stream<number> emitting 1, 10, 2, 20, 3, 30
Signature:
function flatMap<A, B, E2, R2>(
f: (a: A) => Stream<B, E2, R2>,
options?: { concurrency?: Concurrency }
): <E, R>(self: Stream<A, E, R>) => Stream<B, E | E2, R | R2>
f
(a: A) => Stream<B, E2, R2>
required
Function that maps each value to a stream
Controls concurrency: "sequential", "unbounded", or a number
take
Takes the first n elements from the stream.
const first3 = Stream.make(1, 2, 3, 4, 5).pipe(
Stream.take(3)
)
// Stream<number> emitting 1, 2, 3
Signature:
function take(
n: number
): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E, R>
drop
Drops the first n elements from the stream.
const afterFirst2 = Stream.make(1, 2, 3, 4, 5).pipe(
Stream.drop(2)
)
// Stream<number> emitting 3, 4, 5
Signature:
function drop(
n: number
): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E, R>
Running Streams
runCollect
Collects all values from a stream into an array.
const program = Effect.gen(function*() {
const values = yield* Stream.make(1, 2, 3).pipe(
Stream.runCollect
)
console.log(values) // [1, 2, 3]
})
Signature:
function runCollect<A, E, R>(
self: Stream<A, E, R>
): Effect.Effect<Array<A>, E, R>
An effect that collects all stream values into an array
runForEach
Runs an effect for each value in the stream.
const program = Stream.make(1, 2, 3).pipe(
Stream.runForEach((n) => Console.log(`Value: ${n}`))
)
// Logs: Value: 1, Value: 2, Value: 3
Signature:
function runForEach<A, E2, R2>(
f: (a: A) => Effect.Effect<any, E2, R2>
): <E, R>(self: Stream<A, E, R>) => Effect.Effect<void, E | E2, R | R2>
runFold
Folds over the stream to produce a single accumulated value.
const sum = Stream.make(1, 2, 3, 4).pipe(
Stream.runFold(0, (acc, n) => acc + n)
)
// Effect<number> with result 10
Signature:
function runFold<A, S>(
initial: S,
f: (s: S, a: A) => S
): <E, R>(self: Stream<A, E, R>) => Effect.Effect<S, E, R>
Initial accumulator value
f
(s: S, a: A) => S
required
Folding function that combines accumulator with each value
Combining Streams
concat
Concatenates two streams.
const combined = Stream.make(1, 2).pipe(
Stream.concat(Stream.make(3, 4))
)
// Stream<number> emitting 1, 2, 3, 4
Signature:
function concat<A2, E2, R2>(
that: Stream<A2, E2, R2>
): <A, E, R>(self: Stream<A, E, R>) => Stream<A | A2, E | E2, R | R2>
merge
Merges two streams, emitting values from both concurrently.
const merged = Stream.make(1, 2).pipe(
Stream.merge(Stream.make(3, 4))
)
// Stream<number> emitting values from both streams concurrently
Signature:
function merge<A2, E2, R2>(
that: Stream<A2, E2, R2>,
options?: { haltStrategy?: HaltStrategy }
): <A, E, R>(self: Stream<A, E, R>) => Stream<A | A2, E | E2, R | R2>
zip
Zips two streams together element-wise.
const zipped = Stream.make(1, 2, 3).pipe(
Stream.zip(Stream.make("a", "b", "c"))
)
// Stream<[number, string]> emitting [1, "a"], [2, "b"], [3, "c"]
Signature:
function zip<A2, E2, R2>(
that: Stream<A2, E2, R2>
): <A, E, R>(self: Stream<A, E, R>) => Stream<[A, A2], E | E2, R | R2>
Error Handling
catchAll
Handles all errors in a stream by providing a recovery stream.
const recovered = Stream.fail("error").pipe(
Stream.catchAll((error) => Stream.succeed(`Recovered: ${error}`))
)
Signature:
function catchAll<E, A2, E2, R2>(
f: (e: E) => Stream<A2, E2, R2>
): <A, R>(self: Stream<A, E, R>) => Stream<A | A2, E2, R | R2>
orElse
Tries a stream, and if it fails, tries another stream.
const fallback = Stream.fail("error").pipe(
Stream.orElse(() => Stream.succeed("fallback"))
)
Signature:
function orElse<A2, E2, R2>(
that: () => Stream<A2, E2, R2>
): <A, E, R>(self: Stream<A, E, R>) => Stream<A | A2, E2, R | R2>
Constants
DefaultChunkSize
The default chunk size used by Stream constructors and combinators.
console.log(Stream.DefaultChunkSize) // 4096
Type Utilities
Success
Extracts the value type from a Stream.
type MyStream = Stream<number, string, never>
type ValueType = Stream.Success<MyStream> // number
Error
Extracts the error type from a Stream.
type MyStream = Stream<number, string, never>
type ErrorType = Stream.Error<MyStream> // string
Services
Extracts the services type from a Stream.
type MyStream = Stream<number, string, Database>
type RequiredServices = Stream.Services<MyStream> // Database
Complete Example
import { Console, Effect, Stream } from "effect"
// Create a stream from multiple sources
const numbers = Stream.make(1, 2, 3, 4, 5)
// Transform and process the stream
const program = Effect.gen(function*() {
// Transform values
const transformed = numbers.pipe(
Stream.filter((n) => n % 2 === 0),
Stream.map((n) => n * 10),
Stream.take(2)
)
// Collect results
const results = yield* Stream.runCollect(transformed)
yield* Console.log(`Results: ${results}`)
// Output: Results: [20, 40]
// Process with effects
yield* numbers.pipe(
Stream.runForEach((n) => Console.log(`Processing: ${n}`))
)
// Fold to aggregate
const sum = yield* numbers.pipe(
Stream.runFold(0, (acc, n) => acc + n)
)
yield* Console.log(`Sum: ${sum}`)
// Output: Sum: 15
})
Effect.runPromise(program)