A Stream is a description of a program that emits zero or more values of type A, may fail with errors of type E, and uses a context of type R.

Type Signature

interface Stream<out A, out E = never, out R = never>
Stream is pull-based, offering inherent laziness and backpressure and relieving you of manual buffer management.

Creating Streams

import { Stream } from "effect"

const stream = Stream.make(1, 2, 3, 4, 5)
// Stream<number, never, never>

Transformations

map

import { Stream, Effect } from "effect"

const stream = Stream.range(1, 5).pipe(
  Stream.map(n => n * 2)
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 2, 4, 6, 8, 10 ] }

filter

import { Stream, Effect } from "effect"

const stream = Stream.range(1, 10).pipe(
  Stream.filter(n => n % 2 === 0)
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 2, 4, 6, 8, 10 ] }

flatMap

import { Stream, Effect } from "effect"

const stream = Stream.make(1, 2, 3).pipe(
  Stream.flatMap(n => Stream.make(n, n * 2))
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 2, 4, 3, 6 ] }

Consumption

import { Stream, Effect } from "effect"

const stream = Stream.make(1, 2, 3)
const result = Stream.runCollect(stream)

Effect.runPromise(result).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3 ] }

Resource Management

acquireRelease

import { Stream, Effect, Console } from "effect"

// Simulating file operations
const open = (filename: string) =>
  Effect.gen(function*() {
    yield* Console.log(`Opening ${filename}`)
    return {
      getLines: Effect.succeed(["Line 1", "Line 2", "Line 3"]),
      close: Console.log(`Closing ${filename}`)
    }
  })

const stream = Stream.acquireRelease(
  open("file.txt"),
  (file) => file.close
).pipe(Stream.flatMap((file) => file.getLines))

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// Opening file.txt
// Closing file.txt
// { _id: 'Chunk', values: [ [ 'Line 1', 'Line 2', 'Line 3' ] ] }

Combining Streams

merge

import { Stream, Effect } from "effect"

const stream1 = Stream.make(1, 2, 3)
const stream2 = Stream.make(4, 5, 6)

const merged = Stream.merge(stream1, stream2)

Effect.runPromise(Stream.runCollect(merged)).then(console.log)
// Values interleaved from both streams

concat

import { Stream, Effect } from "effect"

const stream1 = Stream.make(1, 2, 3)
const stream2 = Stream.make(4, 5, 6)

const concatenated = Stream.concat(stream1, stream2)

Effect.runPromise(Stream.runCollect(concatenated)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3, 4, 5, 6 ] }

zip

import { Stream, Effect } from "effect"

const stream1 = Stream.make(1, 2, 3)
const stream2 = Stream.make("a", "b", "c")

const zipped = Stream.zip(stream1, stream2)

Effect.runPromise(Stream.runCollect(zipped)).then(console.log)
// { _id: 'Chunk', values: [ [1, 'a'], [2, 'b'], [3, 'c'] ] }

Buffering & Windowing

buffer

import { Stream } from "effect"

const stream = Stream.range(1, 100).pipe(
  Stream.buffer({ capacity: 10 })
)

groupedWithin

import { Stream, Effect } from "effect"

const stream = Stream.range(1, 10).pipe(
  Stream.groupedWithin(3, "1 second")
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// Groups of up to 3 elements or 1 second intervals

Error Handling

import { Stream, Effect } from "effect"

const stream = Stream.fail("error").pipe(
  Stream.catchAll((error) => Stream.make(`Recovered: ${error}`))
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 'Recovered: error' ] }

Streams are lazy and won’t execute until consumed. Use Stream.runCollect, Stream.runForEach, or other run functions to execute the stream.

Use Cases

  • Processing large datasets that don’t fit in memory
  • Real-time event processing
  • File I/O with backpressure
  • Network streaming
  • ETL pipelines

Key Operations

Operation      Description
make           Creates a stream from values
fromIterable   Creates a stream from an iterable
range          Creates a stream of numbers
async          Creates a stream from callbacks
map            Transforms stream elements
filter         Filters stream elements
flatMap        Maps and flattens streams
merge          Combines streams concurrently
concat         Appends streams sequentially
runCollect     Collects all values into a Chunk
