A Stream is a description of a program that emits zero or more values of type A, may fail with errors of type E, and uses a context of type R.
## Type Signature

```ts
interface Stream<out A, out E = never, out R = never>
```
Streams are pull-based, which gives them inherent laziness and backpressure, relieving you of manual buffer management.
## Creating Streams

### make

```ts
import { Stream } from "effect"

const stream = Stream.make(1, 2, 3, 4, 5)
// Stream<number, never, never>
```

### range

```ts
import { Stream } from "effect"

const stream = Stream.range(1, 5)
// Stream<number, never, never>
```

### fromIterable

```ts
import { Stream } from "effect"

const stream = Stream.fromIterable([1, 2, 3])
// Stream<number, never, never>
```

### async

```ts
import { Stream, Effect, Chunk, Option } from "effect"

const stream = Stream.async<number>((emit) => {
  setTimeout(() => emit(Effect.succeed(Chunk.of(1))), 100)
  setTimeout(() => emit(Effect.succeed(Chunk.of(2))), 200)
  // Failing with Option.none() signals the end of the stream
  setTimeout(() => emit(Effect.fail(Option.none())), 300)
})
```
### map

```ts
import { Stream, Effect } from "effect"

const stream = Stream.range(1, 5).pipe(
  Stream.map(n => n * 2)
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 2, 4, 6, 8, 10 ] }
```
### filter

```ts
import { Stream, Effect } from "effect"

const stream = Stream.range(1, 10).pipe(
  Stream.filter(n => n % 2 === 0)
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 2, 4, 6, 8, 10 ] }
```
### flatMap

```ts
import { Stream, Effect } from "effect"

const stream = Stream.make(1, 2, 3).pipe(
  Stream.flatMap(n => Stream.make(n, n * 2))
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 2, 4, 3, 6 ] }
```
## Consumption

### runCollect

```ts
import { Stream, Effect } from "effect"

const stream = Stream.make(1, 2, 3)

const result = Stream.runCollect(stream)

Effect.runPromise(result).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3 ] }
```

### runForEach

```ts
import { Stream, Effect, Console } from "effect"

const stream = Stream.make(1, 2, 3)

const program = Stream.runForEach(stream, n => Console.log(n))

Effect.runPromise(program)
// 1
// 2
// 3
```

### runFold

```ts
import { Stream, Effect } from "effect"

const stream = Stream.make(1, 2, 3, 4, 5)

const sum = Stream.runFold(stream, 0, (acc, n) => acc + n)

Effect.runPromise(sum).then(console.log)
// 15
```
## Resource Management

### acquireRelease

```ts
import { Stream, Effect, Console } from "effect"

// Simulating file operations
const open = (filename: string) =>
  Effect.gen(function* () {
    yield* Console.log(`Opening ${filename}`)
    return {
      getLines: Effect.succeed(["Line 1", "Line 2", "Line 3"]),
      close: Console.log(`Closing ${filename}`)
    }
  })

const stream = Stream.acquireRelease(
  open("file.txt"),
  (file) => file.close
).pipe(Stream.flatMap((file) => file.getLines))

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// Opening file.txt
// Closing file.txt
// { _id: 'Chunk', values: [ [ 'Line 1', 'Line 2', 'Line 3' ] ] }
```
## Combining Streams

### merge

```ts
import { Stream, Effect } from "effect"

const stream1 = Stream.make(1, 2, 3)
const stream2 = Stream.make(4, 5, 6)

const merged = Stream.merge(stream1, stream2)

Effect.runPromise(Stream.runCollect(merged)).then(console.log)
// Values interleaved from both streams
```
### concat

```ts
import { Stream, Effect } from "effect"

const stream1 = Stream.make(1, 2, 3)
const stream2 = Stream.make(4, 5, 6)

const concatenated = Stream.concat(stream1, stream2)

Effect.runPromise(Stream.runCollect(concatenated)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3, 4, 5, 6 ] }
```
### zip

```ts
import { Stream, Effect } from "effect"

const stream1 = Stream.make(1, 2, 3)
const stream2 = Stream.make("a", "b", "c")

const zipped = Stream.zip(stream1, stream2)

Effect.runPromise(Stream.runCollect(zipped)).then(console.log)
// { _id: 'Chunk', values: [ [1, 'a'], [2, 'b'], [3, 'c'] ] }
```
## Buffering & Windowing

### buffer

```ts
import { Stream } from "effect"

const stream = Stream.range(1, 100).pipe(
  Stream.buffer({ capacity: 10 })
)
```
### groupedWithin

```ts
import { Stream, Effect } from "effect"

const stream = Stream.range(1, 10).pipe(
  Stream.groupedWithin(3, "1 second")
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// Groups of up to 3 elements or 1 second intervals
```
## Error Handling

### catchAll

```ts
import { Stream } from "effect"

const stream = Stream.fail("error").pipe(
  Stream.catchAll(error => Stream.make(`Recovered: ${error}`))
)
```

### retry

```ts
import { Stream, Schedule } from "effect"

const stream = Stream.fail("error").pipe(
  Stream.retry(Schedule.recurs(3))
)
```

### orElse

```ts
import { Stream } from "effect"

const stream = Stream.fail("error").pipe(
  Stream.orElse(() => Stream.make(1, 2, 3))
)
```
Streams are lazy and won’t execute until consumed. Use `Stream.runCollect`, `Stream.runForEach`, or another `run*` function to execute the stream.
## Use Cases
- Processing large datasets that don’t fit in memory
- Real-time event processing
- File I/O with backpressure
- Network streaming
- ETL pipelines
## Key Operations

| Operation | Description |
|---|---|
| `make` | Creates a stream from values |
| `fromIterable` | Creates a stream from an iterable |
| `range` | Creates a stream of numbers |
| `async` | Creates a stream from callbacks |
| `map` | Transforms stream elements |
| `filter` | Filters stream elements |
| `flatMap` | Maps and flattens streams |
| `merge` | Combines streams concurrently |
| `concat` | Appends streams sequentially |
| `runCollect` | Collects all values into a Chunk |