The Stream module provides a powerful abstraction for working with sequences of values that are produced over time. Streams are pull-based with backpressure, making them ideal for processing large datasets, event streams, and continuous data sources.

Overview

A Stream<A, E, R> describes a program that can:
  • Emit many values of type A
  • Fail with an error of type E
  • Require services of type R
Streams are pull-based with backpressure, emit chunks to amortize effect evaluation, and support monadic composition similar to Effect, but adapted for multiple values.

Creating Streams

From Iterables

import { Stream } from "effect"

// From an array
const numbers = Stream.fromIterable([1, 2, 3, 4, 5])

// From a range
const range = Stream.range(1, 10)

// Create a stream with specific values
const stream = Stream.make(1, 2, 3)

From Effects

import { Effect, Schedule, Stream } from "effect"

// Create a stream from a single Effect
const fromEffect = Stream.fromEffect(
  Effect.succeed(42)
)

// Repeat an Effect on a schedule
const polled = Stream.repeatEffectWithSchedule(
  Effect.succeed(new Date()),
  Schedule.spaced("1 second")
)

From Async Sources

import { Effect, Stream } from "effect"

// From an async iterable (the second argument maps thrown values to the error type)
const fromAsyncIterable = Stream.fromAsyncIterable(
  (async function* () {
    yield 1
    yield 2
    yield 3
  })(),
  (error) => new Error(String(error))
)

// From a callback-based API
const fromCallback = Stream.async<string>((emit) => {
  const interval = setInterval(() => {
    emit.single("tick")
  }, 1000)

  // Return an Effect to run as a finalizer when the stream ends
  return Effect.sync(() => clearInterval(interval))
})

Paginated Data

import { Chunk, Effect, Option, Stream } from "effect"

// Create a stream from a paginated API (fetchPage is a placeholder)
const paginatedStream = Stream.paginateChunkEffect(1, (page) =>
  Effect.gen(function*() {
    const response = yield* fetchPage(page)
    const items = Chunk.fromIterable(response.items)

    return response.items.length === 0
      ? [items, Option.none()] // No more pages
      : [items, Option.some(page + 1)] // Next page
  })
)

Transforming Streams

Basic Transformations

import { Stream } from "effect"

const stream = Stream.range(1, 10).pipe(
  // Transform each element
  Stream.map((n) => n * 2),
  
  // Filter elements
  Stream.filter((n) => n > 5),
  
  // Take first N elements
  Stream.take(5),
  
  // Drop first N elements
  Stream.drop(2)
)

Effectful Transformations

import { Effect, Stream } from "effect"

const stream = Stream.fromIterable([1, 2, 3]).pipe(
  // Map with an Effect
  Stream.mapEffect((n) =>
    Effect.gen(function*() {
      yield* Effect.sleep("100 millis")
      return n * 2
    })
  ),
  
  // Process with bounded concurrency (fetchData is a placeholder)
  Stream.mapEffect((n) => fetchData(n), { concurrency: 5 })
)

FlatMap and Chaining

import { Stream } from "effect"

const stream = Stream.make(1, 2, 3).pipe(
  Stream.flatMap((n) =>
    Stream.range(1, n) // For each n, emit 1 through n
  )
)
// Emits: 1, 1, 2, 1, 2, 3

Consuming Streams

Running to Collection

import { Chunk, Effect, Stream } from "effect"

const program = Effect.gen(function*() {
  const stream = Stream.make(1, 2, 3, 4, 5)
  
  // Collect all elements into a Chunk
  const chunk = yield* Stream.runCollect(stream)
  
  console.log(Chunk.toReadonlyArray(chunk)) // [1, 2, 3, 4, 5]
})

Running with Effects

import { Console, Effect, Stream } from "effect"

const program = Stream.make(1, 2, 3).pipe(
  // Process each element with an Effect
  Stream.runForEach((n) => Console.log(`Processing: ${n}`))
)

Folding and Reducing

import { Effect, Stream } from "effect"

const sum = Stream.range(1, 10).pipe(
  Stream.runFold(0, (acc, n) => acc + n)
)
// Returns an Effect<number> that evaluates to 55 (the sum of 1..10)

const concatenated = Stream.make("a", "b", "c").pipe(
  Stream.runFoldEffect("", (acc, s) =>
    Effect.succeed(acc + s)
  )
)

Error Handling

import { Effect, Stream } from "effect"

const stream = Stream.make(1, 2, 3).pipe(
  Stream.mapEffect((n) =>
    n === 2
      ? Effect.fail(new Error("Failed at 2"))
      : Effect.succeed(n)
  ),
  
  // Catch and recover from errors
  Stream.catchAll((error) =>
    Stream.make(-1)
  )
)

Merging and Combining

Merge Streams

import { Stream } from "effect"

const stream1 = Stream.make(1, 2, 3)
const stream2 = Stream.make(4, 5, 6)

// Merge streams concurrently
const merged = Stream.merge(stream1, stream2)

// Merge, wrapping each element in an Either to track which stream it came from
const mergedEither = Stream.mergeEither(stream1, stream2)

Zip Streams

import { Stream } from "effect"

const left = Stream.make(1, 2, 3)
const right = Stream.make("a", "b", "c")

// Zip element-wise
const zipped = Stream.zip(left, right)
// Emits: [1, "a"], [2, "b"], [3, "c"]

Concat Streams

import { Stream } from "effect"

const first = Stream.make(1, 2, 3)
const second = Stream.make(4, 5, 6)

// Concatenate streams sequentially
const concatenated = Stream.concat(first, second)
// Emits: 1, 2, 3, 4, 5, 6

Grouping and Chunking

import { Stream } from "effect"

// Group into chunks of 10
const chunked = Stream.range(1, 100).pipe(
  Stream.grouped(10)
)

// Emit groups of up to 10 elements, or whatever arrives within 1 second
const windowed = Stream.range(1, 100).pipe(
  Stream.groupedWithin(10, "1 second")
)

Scheduling and Timing

import { Schedule, Stream } from "effect"

// Throttle stream emissions
const throttled = Stream.range(1, 100).pipe(
  Stream.schedule(Schedule.spaced("100 millis"))
)

// Debounce: only emit after 500 ms of silence
const debounced = Stream.make(1, 2, 3).pipe(
  Stream.debounce("500 millis")
)

// Add exponentially growing delays between elements
const delayed = Stream.make(1, 2, 3).pipe(
  Stream.schedule(Schedule.exponential("100 millis"))
)

Integration with PubSub

import { Console, Effect, PubSub, Stream } from "effect"

const program = Effect.scoped(
  Effect.gen(function*() {
    const pubsub = yield* PubSub.bounded<number>(10)
    
    // Subscribe eagerly so the values published below are not missed
    const stream = yield* Stream.fromPubSub(pubsub, { scoped: true })
    
    // Publish to PubSub
    yield* PubSub.publish(pubsub, 1)
    yield* PubSub.publish(pubsub, 2)
    
    // Consume stream
    yield* Stream.take(stream, 2).pipe(
      Stream.runForEach((n) => Console.log(n))
    )
  })
)

Best Practices

  1. Use Stream.runForEach for side effects: Process each element with effects
  2. Leverage chunking: Use grouped or groupedWithin for batch processing
  3. Control concurrency: Use mapEffect with the concurrency option and appropriate limits
  4. Handle backpressure: Streams naturally handle backpressure with pull-based semantics
  5. Compose streams: Build complex streams from simpler ones

Performance Tips

  • Use Stream.buffer to improve throughput
  • Process in chunks when possible to amortize effect costs
  • Use mapEffect with the concurrency option for concurrent processing of independent elements
  • Consider Stream.throttle to control emission rate

Next Steps

  • Learn about Effect for handling individual values
  • Explore PubSub for broadcasting messages
  • Understand Queue for concurrent processing
