Effect Streams represent effectful, pull-based sequences of values over time. They let you model finite or infinite data sources, transform them with composable operators, and consume them efficiently.

Creating Streams

Streams can be created from various data sources.

From Iterables

The simplest way to create a stream is from an array or any iterable:
import { Stream } from "effect"

// Stream.fromIterable turns any iterable into a stream
export const numbers = Stream.fromIterable<number>([1, 2, 3, 4, 5])

From Effects with Schedules

Create polling streams that emit values on a schedule:
import { Effect, Schedule, Stream } from "effect"

// Stream.fromEffectSchedule turns a single effect into a polling stream.
// This is useful for metrics, health checks, and cache refresh loops.
export const samples = Stream.fromEffectSchedule(
  Effect.succeed(3),
  Schedule.spaced("30 seconds")
).pipe(
  // Stream.take limits the number of elements emitted by the stream
  Stream.take(3)
)

From Paginated APIs

Use Stream.paginate for APIs that return data one page at a time:
import { Array, Effect, Stream } from "effect"
import * as Option from "effect/Option"

// Stream.paginate is perfect for APIs that return one page at a time
export const fetchJobsPage = Stream.paginate(
  0, // start with page 0 (the cursor)
  Effect.fn(function*(page) {
    // Simulate network latency
    yield* Effect.sleep("50 millis")
    
    // Array.range is inclusive of both endpoints, so 0..99 is 100 jobs
    const results = Array.range(0, 99).map((i) => 
      `Job ${i + 1 + page * 100}`
    )
    
    // Only return 10 pages of results (pages 0 through 9)
    const nextPage = page < 9
      ? Option.some(page + 1)
      : Option.none()
    
    return [results, nextPage] as const
  })
)

From Async Iterables

Convert async iterables into streams:
import { Schema, Stream } from "effect"

class LetterError extends Schema.TaggedErrorClass<LetterError>()("LetterError", {
  cause: Schema.Defect
}) {}

async function* asyncIterable() {
  yield "a"
  yield "b"
  yield "c"
}

// Create a stream from an async iterable
export const letters = Stream.fromAsyncIterable(
  asyncIterable(),
  (cause) => new LetterError({ cause })
)

From Event Listeners

Create streams from DOM events:
import { Stream } from "effect"

const button = document.getElementById("my-button")!

// Stream.fromEventListener creates a stream from an event listener
export const events = Stream.fromEventListener<PointerEvent>(button, "click")

From Callbacks

For any callback-based API, use Stream.callback:
import { Effect, Queue, Stream } from "effect"

const button = document.getElementById("my-button")!

export const callbackStream = Stream.callback<PointerEvent>(Effect.fn(function*(queue) {
  // Use the Queue APIs to emit values into the stream
  function onEvent(event: PointerEvent) {
    Queue.offerUnsafe(queue, event)
  }
  
  // Register the event listener and add a finalizer to unregister it
  yield* Effect.acquireRelease(
    Effect.sync(() => button.addEventListener("click", onEvent)),
    () => Effect.sync(() => button.removeEventListener("click", onEvent))
  )
}))

From Node.js Streams

Convert Node.js readable streams:
import { NodeStream } from "@effect/platform-node"
import { Schema } from "effect"
import { Readable } from "node:stream"

export class NodeStreamError extends Schema.TaggedErrorClass<NodeStreamError>()("NodeStreamError", {
  cause: Schema.Defect
}) {}

// Create a stream from a Node.js readable stream
export const nodeStream = NodeStream.fromReadable({
  evaluate: () => Readable.from(["Hello", " ", "world", "!"]),
  onError: (cause) => new NodeStreamError({ cause }),
  closeOnDone: true // true by default
})

Transforming Streams

Streams provide rich operators for transformation.

Pure Transformations

Use Stream.map for pure per-element transforms:
import { Stream } from "effect"

interface Order {
  readonly id: string
  readonly customerId: string
  readonly status: "paid" | "refunded"
  readonly subtotalCents: number
  readonly shippingCents: number
  readonly country: string
}

interface NormalizedOrder extends Order {
  readonly totalCents: number
}

const orderEvents = Stream.succeed<Order>({
  id: "ord_1001",
  customerId: "cus_1",
  status: "paid",
  subtotalCents: 4_500,
  shippingCents: 500,
  country: "US"
})

// Stream.map for pure per-element transforms
export const normalizedOrders = orderEvents.pipe(
  Stream.map((order): NormalizedOrder => ({
    ...order,
    totalCents: order.subtotalCents + order.shippingCents
  }))
)

Filtering

Exclude elements that don’t match a predicate:
// Stream.filter lets you exclude elements
export const paidOrders = normalizedOrders.pipe(
  Stream.filter((order) => order.status === "paid")
)

FlatMap

Transform each element into a stream and flatten the results:
import { Stream } from "effect"

// Stream.flatMap to transform each element into a stream
export const allOrders = Stream.make("US", "CA", "NZ").pipe(
  Stream.flatMap(
    (country) =>
      Stream.range(1, 50).pipe(
        Stream.map((i): Order => ({
          id: `ord_${country}_${i}`,
          customerId: `cus_${i}`,
          status: i % 10 === 0 ? "refunded" : "paid",
          subtotalCents: Math.round(Math.random() * 100_000),
          shippingCents: Math.round(Math.random() * 10_000),
          country
        }))
      ),
    // Control the concurrency of the flatMap
    { concurrency: 2 }
  )
)

Effectful Transformations

Use Stream.mapEffect for transformations that require effects:
import { Effect, Stream } from "effect"

interface EnrichedOrder extends NormalizedOrder {
  readonly taxCents: number
  readonly grandTotalCents: number
  readonly priority: "high" | "normal"
}

const enrichOrder = Effect.fn(function*(order: NormalizedOrder): Effect.fn.Return<EnrichedOrder> {
  // Simulate effectful enrichment (tax/risk lookup)
  yield* Effect.sleep("5 millis")
  
  const taxRate = order.country === "US" ? 0.08 : 0.13
  const taxCents = Math.round(order.totalCents * taxRate)
  
  return {
    ...order,
    taxCents,
    grandTotalCents: order.totalCents + taxCents,
    priority: order.totalCents >= 20_000 ? "high" : "normal"
  }
})

// Stream.mapEffect performs effectful transforms with concurrency control
export const enrichedPaidOrders = paidOrders.pipe(
  Stream.mapEffect(enrichOrder, { concurrency: 4 })
)

Consuming Streams

Streams are consumed using various run* methods.

Collecting All Elements

Gather all stream outputs into an array:
import { Stream } from "effect"

// runCollect gathers all stream outputs into an immutable array
export const collectedOrders = Stream.runCollect(enrichedPaidOrders)

Running for Effects

Run the stream for its effects, ignoring outputs:
// runDrain runs the stream for its effects, ignoring all outputs
export const drained = Stream.runDrain(enrichedPaidOrders)

Processing Each Element

Execute an effectful consumer for every element:
import { Effect, Stream } from "effect"

// runForEach executes an effectful consumer for every element
export const logOrders = enrichedPaidOrders.pipe(
  Stream.runForEach((order) => 
    Effect.logInfo(`Order ${order.id} total=$${(order.grandTotalCents / 100).toFixed(2)}`)
  )
)

Folding/Reducing

Reduce the stream to one accumulated value:
// runFold reduces the stream to one accumulated value
export const totalRevenueCents = enrichedPaidOrders.pipe(
  Stream.runFold(() => 0, (acc: number, order) => acc + order.grandTotalCents)
)

Using Sinks

Consume streams through sinks:
import { Sink, Stream } from "effect"

// run lets you consume a stream through any Sink
export const totalRevenueViaSink = enrichedPaidOrders.pipe(
  Stream.map((order) => order.grandTotalCents),
  Stream.run(Sink.sum)
)

Getting First/Last Elements

Capture edge elements as Option values:
// runHead and runLast capture edge elements as Option values
export const firstLargeOrder = enrichedPaidOrders.pipe(
  Stream.filter((order) => order.priority === "high"),
  Stream.runHead
)

export const lastLargeOrder = enrichedPaidOrders.pipe(
  Stream.filter((order) => order.priority === "high"),
  Stream.runLast
)

Windowing and Limiting

Control how much of a stream is processed.

Taking Elements

import { Stream } from "effect"

// Take only the first N elements
export const firstTwoOrders = enrichedPaidOrders.pipe(
  Stream.take(2),
  Stream.runCollect
)

Dropping Elements

// Skip the first N elements
export const afterWarmupOrder = enrichedPaidOrders.pipe(
  Stream.drop(1),
  Stream.runCollect
)

Conditional Taking

// Take elements while a condition is true
export const untilLargeOrder = enrichedPaidOrders.pipe(
  Stream.takeWhile((order) => order.priority === "normal"),
  Stream.runCollect
)

Encoding and Decoding Streams

Use channels to decode and encode streams of structured data.
import { Stream } from "effect"
import { Ndjson, Msgpack } from "effect/unstable"

// Assume we already have a stream of raw bytes and a stream of structured values
declare const someByteStream: Stream.Stream<Uint8Array>
declare const structuredDataStream: Stream.Stream<unknown>

// Decode NDJSON stream
const ndjsonStream = someByteStream.pipe(
  Stream.pipeThroughChannel(Ndjson.decode)
)

// Encode to MessagePack
const msgpackStream = structuredDataStream.pipe(
  Stream.pipeThroughChannel(Msgpack.encode)
)

Stream Patterns

Combine multiple streams with combinators such as Stream.merge:
import { Stream } from "effect"

const stream1 = Stream.range(1, 5)
const stream2 = Stream.range(10, 15)

// Merge two streams into one
const merged = Stream.merge(stream1, stream2)

Error Handling in Streams

Streams propagate errors through the error channel:
import { Effect, Stream } from "effect"

const streamWithErrors = Stream.make(1, 2, 3, 4, 5).pipe(
  Stream.mapEffect((n) => 
    n === 3 
      ? Effect.fail(new Error("Invalid value"))
      : Effect.succeed(n * 2)
  ),
  // Catch and recover from errors. The failure terminates the original
  // stream, so this emits 2 and 4, then the recovery value -1.
  Stream.catchAll(() => Stream.succeed(-1))
)

Best Practices

Use Streams for Sequences

Prefer streams over arrays when working with large datasets, infinite sequences, or data that arrives over time.

Control Concurrency

Always specify concurrency limits with mapEffect and flatMap to prevent resource exhaustion.

Compose Operators

Build complex stream pipelines by composing simple operators. Keep individual transformations focused.

Handle Errors

Use Stream.catchAll, Stream.orElse, or Stream.retry to handle errors gracefully in stream pipelines.

Next Steps

Concurrency

Learn about concurrent operations and fiber management

Error Handling

Handle errors in streams and effects
