Process data streams efficiently with Effect’s Stream module for real-time data, file processing, and data pipelines.

Prerequisites

npm install effect @effect/platform @effect/platform-node

Step 1: Creating Streams

Create streams from various sources:
create-streams.ts
import { Stream, Effect, Schedule } from "effect"

// From values
const streamFromValues = Stream.make(1, 2, 3, 4, 5)

// From an array
const streamFromArray = Stream.fromIterable([1, 2, 3, 4, 5])

// From a range
const streamRange = Stream.range(0, 10)

// From an Effect
const streamFromEffect = Stream.fromEffect(
  Effect.succeed("Hello")
)

// Infinite stream with schedule
const periodicStream = Stream.fromSchedule(
  Schedule.spaced("1 second")
)

// Repeat a value
const repeatedStream = Stream.repeatValue(42)

// Run streams
Stream.runCollect(streamFromValues).pipe(
  Effect.map((chunk) => console.log(chunk)),
  Effect.runPromise
)

Step 2: Transforming Streams

Transform stream data with map, filter, and flatMap:
transform-streams.ts
import { Stream, Effect, Chunk } from "effect"

const numbers = Stream.range(1, 10)

// Map values
const doubled = Stream.map(numbers, (n) => n * 2)

// Filter values
const evens = Stream.filter(numbers, (n) => n % 2 === 0)

// FlatMap for nested streams
const nested = Stream.flatMap(
  Stream.make(1, 2, 3),
  (n) => Stream.make(n, n * 10, n * 100)
)

// Map with effects
const withEffects = Stream.mapEffect(
  numbers,
  (n) => Effect.gen(function*() {
    yield* Effect.sleep("100 millis")
    return n * 2
  })
)

// Transform chunks
const chunked = Stream.mapChunks(
  numbers,
  (chunk) => Chunk.map(chunk, (n) => n.toString())
)

// Run and collect
Stream.runCollect(doubled).pipe(
  Effect.map((chunk) => console.log("Doubled:", chunk)),
  Effect.runPromise
)

Step 3: Stream Consumption

Consume streams with various sinks:
consume-streams.ts
import { Stream, Effect, Console } from "effect"

const numbers = Stream.range(1, 100)

// Collect all values
const collected = Stream.runCollect(numbers)

// Fold/reduce stream
const sum = Stream.runFold(
  numbers,
  0,
  (acc, n) => acc + n
)

// Process each element
const logged = Stream.runForEach(
  numbers,
  (n) => Effect.log(`Processing: ${n}`)
)

// Take first N elements
const first5 = Stream.take(numbers, 5).pipe(
  Stream.runCollect
)

// Take while condition is true
const lessThan50 = Stream.takeWhile(
  numbers,
  (n) => n < 50
).pipe(
  Stream.runCollect
)

// Drain stream (run for side effects)
const drained = Stream.runDrain(
  Stream.mapEffect(numbers, (n) => Console.log(n))
)

// Get last element
const last = Stream.runLast(numbers)

sum.pipe(
  Effect.map((total) => console.log(`Sum: ${total}`)),
  Effect.runPromise
)

Step 4: File Processing

Process files efficiently with streams:
file-streams.ts
import { FileSystem } from "@effect/platform"
import { NodeFileSystem, NodeRuntime } from "@effect/platform-node"
import { Effect, Stream } from "effect"

const processFile = Effect.gen(function*() {
  const fs = yield* FileSystem.FileSystem
  
  // Read file as stream
  const fileStream = fs.stream("large-file.txt")
  
  // Process line by line
  const lineCount = yield* fileStream.pipe(
    Stream.decodeText(),
    Stream.splitLines,
    Stream.runFold(0, (count, _line) => count + 1)
  )
  
  console.log(`Lines: ${lineCount}`)
  
  // Filter and transform
  yield* fs.stream("input.txt").pipe(
    Stream.decodeText(),
    Stream.splitLines,
    Stream.filter((line) => line.startsWith("ERROR")),
    Stream.map((line) => `[FILTERED] ${line}\n`),
    Stream.encodeText,
    Stream.run(fs.sink("errors.txt"))
  )
})

processFile.pipe(
  Effect.provide(NodeFileSystem.layer),
  NodeRuntime.runMain
)

Step 5: Watching File Changes

Stream file system changes in real-time:
watch-files.ts
import { FileSystem } from "@effect/platform"
import { NodeFileSystem, NodeRuntime } from "@effect/platform-node"
import * as ParcelWatcher from "@effect/platform-node/NodeFileSystem/ParcelWatcher"
import { Console, Effect, Layer, Stream } from "effect"

const EnvLive = NodeFileSystem.layer.pipe(
  Layer.provide(ParcelWatcher.layer)
)

Effect.gen(function*() {
  const fs = yield* FileSystem.FileSystem
  
  yield* fs.watch("src", { recursive: true }).pipe(
    Stream.map((event) => `${event.type}: ${event.path}`),
    Stream.runForEach(Console.log)
  )
}).pipe(
  Effect.provide(EnvLive),
  NodeRuntime.runMain
)

Step 6: Database Streaming

Stream large result sets from databases:
stream-database.ts
import { ClickhouseClient } from "@effect/sql-clickhouse"
import { Config, Effect, Stream } from "effect"

const program = Effect.gen(function*() {
  const sql = yield* ClickhouseClient.ClickhouseClient
  
  // Stream query results
  yield* sql`SELECT * FROM large_table ORDER BY id`.stream.pipe(
    Stream.map((row) => `Processing: ${row.id}`),
    Stream.runForEach(Effect.log),
    sql.withQueryId("stream-query")
  )
  
  // Process in chunks for batching
  yield* sql`SELECT * FROM users`.stream.pipe(
    Stream.grouped(100), // Process 100 at a time
    Stream.mapEffect((chunk) =>
      Effect.gen(function*() {
        yield* Effect.log(`Processing batch of ${chunk.length}`)
        // Batch processing logic here
      })
    ),
    Stream.runDrain
  )
})

const ClickhouseLive = ClickhouseClient.layerConfig({
  url: Config.succeed("https://localhost:8443"),
  username: Config.succeed("default"),
  password: Config.string("CLICKHOUSE_PASSWORD")
})

program.pipe(
  Effect.provide(ClickhouseLive),
  Effect.runPromise
)

Step 7: Combining Streams

Merge, zip, and combine multiple streams:
combine-streams.ts
import { Stream, Effect } from "effect"

const stream1 = Stream.make(1, 2, 3)
const stream2 = Stream.make("a", "b", "c")

// Concatenate streams
const concatenated = Stream.concat(stream1, stream1)

// Merge streams (interleave)
const merged = Stream.merge(stream1, stream1)

// Zip streams together
const zipped = Stream.zip(stream1, stream2)

// Zip with function
const combined = Stream.zipWith(
  stream1,
  stream2,
  (n, s) => `${n}:${s}`
)

// Interleave streams
const interleaved = Stream.interleave(stream1, stream1)

// Run in parallel
const parallel = Stream.mergeAll(
  [
    Stream.make(1, 2, 3),
    Stream.make(4, 5, 6),
    Stream.make(7, 8, 9)
  ],
  { concurrency: 3 }
)

Stream.runCollect(zipped).pipe(
  Effect.map((chunk) => console.log("Zipped:", chunk)),
  Effect.runPromise
)

Step 8: Buffering and Throttling

Control stream flow with buffers and throttles:
flow-control.ts
import { Stream, Effect, Schedule } from "effect"

const fastStream = Stream.range(1, 1000)

// Buffer elements
const buffered = Stream.buffer(fastStream, { capacity: 10 })

// Throttle stream (shape: delay elements to stay under the rate)
const throttled = Stream.throttle(fastStream, {
  cost: (chunk) => chunk.length,
  units: 10, // 10 elements
  duration: "1 second",
  strategy: "shape"
})

// Debounce stream
const debounced = Stream.debounce(
  fastStream,
  "100 millis"
)

// Group into chunks
const chunked = Stream.grouped(fastStream, 5)

// Group within time window
const windows = Stream.groupedWithin(
  fastStream,
  10, // Max size
  "1 second" // Max duration
)

// Rate limiting
const rateLimited = Stream.mapEffect(
  fastStream,
  (n) => Effect.sleep("100 millis").pipe(
    Effect.as(n)
  )
)

Step 9: Error Handling in Streams

Handle errors gracefully in streams:
stream-errors.ts
import { Stream, Effect } from "effect"

class ProcessingError {
  readonly _tag = "ProcessingError"
  constructor(readonly value: number) {}
}

const flakyStream = Stream.mapEffect(
  Stream.range(1, 10),
  (n) => {
    if (n === 5) {
      return Effect.fail(new ProcessingError(n))
    }
    return Effect.succeed(n)
  }
)

// Catch and continue
const withFallback = Stream.catchAll(
  flakyStream,
  (error) => Stream.make(-1) // Fallback stream
)

// Retry failed elements (this effect fails randomly, so retries matter)
const withRetry = Stream.mapEffect(
  Stream.range(1, 10),
  (n) => Effect.suspend(() =>
    Math.random() < 0.3
      ? Effect.fail(new ProcessingError(n))
      : Effect.succeed(n)
  ).pipe(
    Effect.retry({ times: 3 })
  )
)

// Swallow errors by ending the stream (elements after the failure are dropped too)
const filtered = Stream.mapEffect(
  Stream.range(1, 10),
  (n) => {
    if (n === 5) return Effect.fail(new ProcessingError(n))
    return Effect.succeed(n)
  }
).pipe(
  Stream.catchAll(() => Stream.empty)
)

Step 10: WebSocket Streams

Handle real-time WebSocket data:
websocket-stream.ts
import {
  HttpRouter,
  HttpServer,
  HttpServerRequest,
  HttpServerResponse
} from "@effect/platform"
import { NodeHttpServer, NodeRuntime } from "@effect/platform-node"
import { Effect, Layer, Schedule, Stream } from "effect"
import { createServer } from "node:http"

const ServerLive = NodeHttpServer.layer(() => createServer(), { port: 3000 })

const HttpLive = HttpRouter.empty.pipe(
  HttpRouter.get(
    "/ws",
    // Send periodic messages to client
    Stream.fromSchedule(Schedule.spaced("1 second")).pipe(
      Stream.map((count) => ({ type: "tick", count })),
      Stream.map(JSON.stringify),
      Stream.encodeText,
      // Upgrade to WebSocket
      Stream.pipeThroughChannel(HttpServerRequest.upgradeChannel()),
      // Receive messages from client
      Stream.decodeText(),
      Stream.runForEach((msg) => Effect.log(`Received: ${msg}`)),
      Effect.annotateLogs("ws", "server"),
      Effect.as(HttpServerResponse.empty())
    )
  ),
  HttpServer.serve(),
  Layer.provide(ServerLive)
)

NodeRuntime.runMain(Layer.launch(HttpLive))

Step 11: Stream Testing

Test stream behavior:
stream-testing.ts
import { Stream, Effect, Chunk } from "effect"
import { describe, it, assert } from "@effect/vitest"

describe("Stream processing", () => {
  it.effect("should transform stream correctly", () =>
    Effect.gen(function*() {
      const result = yield* Stream.make(1, 2, 3).pipe(
        Stream.map((n) => n * 2),
        Stream.runCollect
      )
      
      assert.deepStrictEqual(
        Chunk.toReadonlyArray(result),
        [2, 4, 6]
      )
    })
  )
  
  it.effect("should handle errors in streams", () =>
    Effect.gen(function*() {
      const result = yield* Stream.make(1, 2, 3).pipe(
        Stream.mapEffect((n) => 
          n === 2 ? Effect.fail("error") : Effect.succeed(n)
        ),
        Stream.catchAll(() => Stream.make(-1)),
        Stream.runCollect
      )
      
      assert.deepStrictEqual(
        Chunk.toReadonlyArray(result),
        [-1]
      )
    })
  )
})

Performance Tips

  1. Use chunks for batching: Process multiple elements at once
  2. Add buffering: Prevent backpressure in slow consumers
  3. Stream large files: Don’t load entire files into memory
  4. Use parallel processing: Process independent streams concurrently
  5. Apply throttling: Control resource usage

Next Steps

  - Building HTTP Server: Use streams in HTTP servers
  - Database Integration: Stream database query results
  - Error Handling Patterns: Handle errors in streams
  - Stream API Reference: Full Stream API documentation
