A Stream<A, E, R> represents a description of a program that may emit zero or more values of type A, may fail with errors of type E, and uses a context of type R. Streams are pull-based, offering inherent laziness and backpressure. They form a monad on the A type parameter and provide error management facilities similar to Effect.

Type Parameters

  • A: The type of values emitted by the stream
  • E: The type of errors the stream can fail with (defaults to never)
  • R: The context type required by the stream (defaults to never)

Creating Streams

From Values

import { Stream } from "effect"

// Create a stream from multiple values
const stream = Stream.make(1, 2, 3, 4, 5)

// Empty stream
const empty = Stream.empty

// Single value stream
const single = Stream.succeed(42)

// Stream from an array
const fromArray = Stream.fromIterable([1, 2, 3])

// Stream from a range
const range = Stream.range(1, 10) // 1 to 10 inclusive

From Effects

import { Effect, Stream } from "effect"

// Create stream from a single effect
const fromEffect = Stream.fromEffect(
  Effect.succeed("hello")
)

// Repeat an effect (use Effect.sync so the value is recomputed on each pull)
const repeated = Stream.repeatEffect(
  Effect.sync(() => Math.random())
)

Async Streams

import { Chunk, Effect, Stream } from "effect"
import type { StreamEmit } from "effect"

// Async stream with callback
const asyncStream = Stream.async(
  (emit: StreamEmit.Emit<never, never, number, void>) => {
    const interval = setInterval(() => {
      emit(Effect.succeed(Chunk.of(Date.now())))
    }, 1000)
    
    return Effect.sync(() => clearInterval(interval))
  }
)

// Async push-based stream
const pushStream = Stream.asyncPush<string>((emit) =>
  Effect.acquireRelease(
    Effect.gen(function*() {
      yield* Effect.log("subscribing")
      return setInterval(() => emit.single("tick"), 1000)
    }),
    (handle) => Effect.sync(() => clearInterval(handle))
  ),
  { bufferSize: 16, strategy: "dropping" }
)

Resource Management

import { Console, Effect, Stream } from "effect"

// Acquire-release pattern
const fileStream = Stream.acquireRelease(
  Effect.gen(function*() {
    yield* Console.log("Opening file")
    return { data: ["line1", "line2"], close: Console.log("Closing file") }
  }),
  (file) => file.close
).pipe(
  Stream.flatMap((file) => Stream.fromIterable(file.data))
)

Transforming Streams

Mapping

import { Effect, Stream } from "effect"

const numbers = Stream.range(1, 5)

// Map values
const doubled = numbers.pipe(
  Stream.map(n => n * 2)
)

// Map to constant
const nulls = numbers.pipe(
  Stream.as(null)
)

// Map with effects
const withEffects = numbers.pipe(
  Stream.mapEffect(n => 
    Effect.succeed(n * 2)
  )
)

Filtering

import { Effect, Stream } from "effect"

const numbers = Stream.range(1, 10)

// Filter values
const evens = numbers.pipe(
  Stream.filter(n => n % 2 === 0)
)

// Filter with effects
const filtered = numbers.pipe(
  Stream.filterEffect(n =>
    Effect.succeed(n > 5)
  )
)

// Take while predicate is true
const taken = numbers.pipe(
  Stream.takeWhile(n => n < 5)
)

// Drop while predicate is true
const dropped = numbers.pipe(
  Stream.dropWhile(n => n < 5)
)

Taking and Dropping

import { Stream } from "effect"

const numbers = Stream.range(1, 10)

// Take first n elements
const first3 = numbers.pipe(
  Stream.take(3)
)

// Drop first n elements
const skipFirst3 = numbers.pipe(
  Stream.drop(3)
)

// Take last n elements
const last3 = numbers.pipe(
  Stream.takeRight(3)
)

// Drop last n elements
const dropLast3 = numbers.pipe(
  Stream.dropRight(3)
)

FlatMapping

import { Stream } from "effect"

// Flatten nested streams
const nested = Stream.make(1, 2, 3).pipe(
  Stream.flatMap(n =>
    Stream.range(1, n)
  )
)

// Flatten with concurrency
const concurrent = Stream.make(1, 2, 3).pipe(
  Stream.flatMap(
    n => Stream.range(1, n),
    { concurrency: 2 }
  )
)

Combining Streams

Concatenation

import { Stream } from "effect"

const s1 = Stream.make(1, 2, 3)
const s2 = Stream.make(4, 5)

// Concatenate streams sequentially
const combined = Stream.concat(s1, s2)
// Output: 1, 2, 3, 4, 5

Merging

import { Stream } from "effect"

const s1 = Stream.make(1, 2, 3)
const s2 = Stream.make(4, 5, 6)

// Merge streams concurrently
const merged = Stream.merge(s1, s2)

// Merge with strategy
const mergedAll = Stream.mergeAll(
  [s1, s2],
  { concurrency: 2 }
)

Zipping

import { Stream } from "effect"

const s1 = Stream.make(1, 2, 3)
const s2 = Stream.make("a", "b", "c")

// Zip streams (point-wise)
const zipped = Stream.zip(s1, s2)
// Output: [1, "a"], [2, "b"], [3, "c"]

// Zip with custom function
const combined = Stream.zipWith(
  s1,
  s2,
  (n, s) => `${n}${s}`
)
// Output: "1a", "2b", "3c"

Cross Product

import { Stream } from "effect"

const s1 = Stream.make(1, 2)
const s2 = Stream.make("a", "b")

// Cartesian product
const product = Stream.cross(s1, s2)
// Output: [1, "a"], [1, "b"], [2, "a"], [2, "b"]

Stream Consumption

Running Streams

import { Console, Effect, Stream } from "effect"

const stream = Stream.range(1, 5)

// Collect all elements into a Chunk
const collected = Stream.runCollect(stream)

// Run with forEach
const processed = Stream.runForEach(
  stream,
  (n) => Console.log(`Value: ${n}`)
)

// Fold/reduce the stream
const sum = Stream.runFold(
  stream,
  0,
  (acc, n) => acc + n
)

// Drain (ignore all elements)
const drained = stream.pipe(
  Stream.runDrain
)

Using Sinks

import { Sink, Stream } from "effect"

const stream = Stream.range(1, 10)

// Run stream into a sink
const result = stream.pipe(
  Stream.run(Sink.sum)
)

// Collect first 5 elements
const first5 = stream.pipe(
  Stream.run(Sink.collectAllN(5))
)

Buffering and Timing

Buffering

import { Stream } from "effect"

const stream = Stream.range(1, 100)

// Buffer elements
const buffered = stream.pipe(
  Stream.buffer({ capacity: 10 })
)

// Buffer with strategy
const bufferedDropping = stream.pipe(
  Stream.buffer({
    capacity: 10,
    strategy: "dropping" // or "sliding" or "suspend"
  })
)

Debouncing and Throttling

import { Stream } from "effect"

const events = Stream.make(1, 2, 3, 4, 5)

// Debounce (emit after quiet period)
const debounced = events.pipe(
  Stream.debounce("100 millis")
)

// Throttle (limit emission rate)
const throttled = events.pipe(
  Stream.throttle({
    cost: 1,
    duration: "1 second",
    units: 10
  })
)

Scheduling

import { Schedule, Stream } from "effect"

const stream = Stream.range(1, 5)

// Add delays between elements
const scheduled = stream.pipe(
  Stream.schedule(Schedule.spaced("1 second"))
)

// Repeat stream on a schedule
const repeated = stream.pipe(
  Stream.repeat(Schedule.recurs(3))
)

Broadcasting

import { Console, Effect, Fiber, Stream } from "effect"

// Broadcast to multiple consumers
// Note: broadcast yields an Effect (in a Scope) producing the downstream streams
const program = Effect.scoped(
  Stream.range(1, 10).pipe(
    Stream.broadcast(2, 5), // 2 streams, max lag of 5
    Effect.flatMap(([stream1, stream2]) =>
      Effect.gen(function*() {
        const f1 = yield* Stream.runForEach(
          stream1,
          (n) => Console.log(`Consumer 1: ${n}`)
        ).pipe(Effect.fork)
        
        const f2 = yield* Stream.runForEach(
          stream2,
          (n) => Console.log(`Consumer 2: ${n}`)
        ).pipe(Effect.fork)
        
        yield* Fiber.join(f1)
        yield* Fiber.join(f2)
      })
    )
  )
)

Error Handling

import { Effect, Schedule, Stream } from "effect"

const stream = Stream.range(1, 10).pipe(
  Stream.mapEffect(n =>
    n === 5 ? Effect.fail("Error at 5") : Effect.succeed(n)
  )
)

// Catch and recover from errors
const recovered = stream.pipe(
  Stream.catchAll(error =>
    Stream.succeed(-1) // fallback stream
  )
)

// Catch specific errors (requires a tagged error type with a _tag field)
const specific = stream.pipe(
  Stream.catchTag("NetworkError", (error) =>
    Stream.succeed("default value")
  )
)

// Retry on failure
const retried = stream.pipe(
  Stream.retry(Schedule.recurs(3))
)

// OrElse - fallback to another stream
const withFallback = stream.pipe(
  Stream.orElse(() => Stream.succeed(0))
)

Grouping and Windowing

import { GroupBy, Stream } from "effect"

const stream = Stream.range(1, 10)

// Group into chunks
const chunked = stream.pipe(
  Stream.grouped(3) // chunks of 3
)

// Group within time window
const windowed = stream.pipe(
  Stream.groupedWithin(5, "1 second")
)

// Group by key (groupByKey returns a GroupBy; consume it with GroupBy.evaluate)
const grouped = Stream.make(
  { type: "A", value: 1 },
  { type: "B", value: 2 },
  { type: "A", value: 3 }
).pipe(
  Stream.groupByKey(item => item.type),
  GroupBy.evaluate((key, items) =>
    Stream.map(items, (item) => `${key}: ${item.value}`)
  )
)

Common Patterns

Streaming Pipeline

import { Effect, Schedule, Sink, Stream } from "effect"

const pipeline = Stream.range(1, 100).pipe(
  Stream.filter(n => n % 2 === 0),
  Stream.map(n => n * 2),
  Stream.mapEffect(n =>
    Effect.sleep("10 millis").pipe(
      Effect.as(n)
    )
  ),
  Stream.buffer({ capacity: 10 }),
  Stream.schedule(Schedule.spaced("100 millis")),
  Stream.take(10),
  Stream.run(Sink.collectAll())
)

Infinite Streams

import { Stream } from "effect"

// Infinite range
const infinite = Stream.iterate(0, n => n + 1)

// Infinite repeating value
const repeated = Stream.repeatValue(42)

// Ticks
const ticks = Stream.tick("1 second")

Pagination

import { Chunk, Effect, Option, Stream } from "effect"

interface Page {
  items: number[]
  nextCursor: string | null
}

// Assume fetchPage is your own API call returning a Page
declare const fetchPage: (cursor: string) => Effect.Effect<Page>

const paginated = Stream.paginateChunkEffect(
  "initial-cursor",
  (cursor) =>
    Effect.gen(function*() {
      const page = yield* fetchPage(cursor)
      return [
        Chunk.fromIterable(page.items),
        page.nextCursor === null
          ? Option.none()
          : Option.some(page.nextCursor)
      ] as const
    })
)

Interruption

import { Effect, Stream } from "effect"

const stream = Stream.range(1, 1000)

// Interrupt after duration
const timed = stream.pipe(
  Stream.interruptAfter("5 seconds")
)

// Interrupt when effect completes
const conditional = stream.pipe(
  Stream.interruptWhen(
    Effect.sleep("5 seconds")
  )
)

Performance Tips

  1. Chunking: Streams emit chunks of values for efficiency. Use rechunk to optimize chunk size.
  2. Buffering: Use buffer or bufferChunks to decouple fast producers from slow consumers.
  3. Concurrency: Use flatMap with concurrency option for parallel processing.
  4. Resource Management: Always use acquireRelease or ensuring for proper cleanup.
