Skip to main content
A Sink<A, In, L, E, R> is used to consume elements produced by a Stream. Think of it as a function that consumes a variable number of In elements, might fail with an error of type E, and eventually yields a value of type A together with leftovers of type L.

Type Parameters

  • A: The result type produced by the sink
  • In: The type of elements consumed
  • L: The type of leftover elements (defaults to never)
  • E: The type of errors (defaults to never)
  • R: The context type required (defaults to never)

Basic Sinks

Collection Sinks

import { Sink, Stream } from "effect"

// Gather every emitted element into a single Chunk
const all = Stream.run(Stream.range(1, 5), Sink.collectAll())

// Gather only the first three elements
const first3 = Stream.run(Stream.range(1, 10), Sink.collectAllN(3))

// Gather elements into a set
const uniqueValues = Stream.run(
  Stream.make(1, 2, 2, 3, 3),
  Sink.collectAllToSet()
)

// Gather elements into a map keyed by `key`; entries that share a key are
// combined by the merge function (here: their `value`s are added together)
const grouped = Stream.run(
  Stream.make(
    { key: "a", value: 1 },
    { key: "a", value: 2 },
    { key: "b", value: 3 }
  ),
  Sink.collectAllToMap(
    (entry) => entry.key,
    (left, right) => ({ ...left, value: left.value + right.value })
  )
)

Conditional Collection

import { Effect, Sink, Stream } from "effect"

// Collect until predicate is satisfied
const untilNegative = Stream.make(1, 2, 3, -1, 4, 5).pipe(
  Stream.run(
    Sink.collectAllUntil(n => n < 0)
  )
)

// Collect while predicate holds
const positives = Stream.make(1, 2, 3, -1, 4, 5).pipe(
  Stream.run(
    Sink.collectAllWhile(n => n > 0)
  )
)

// With effects: the predicate returns an Effect producing a boolean,
// which is why `Effect` is imported alongside `Sink` and `Stream`
const conditional = Stream.range(1, 10).pipe(
  Stream.run(
    Sink.collectAllUntilEffect(n =>
      Effect.succeed(n > 5)
    )
  )
)

Folding Sinks

Basic Folding

import { Effect, Sink, Stream } from "effect"

// Fold with initial value and function
const sum = Stream.range(1, 5).pipe(
  Stream.run(
    Sink.foldLeft(0, (acc, n) => acc + n)
  )
)

// Fold with termination predicate
const foldUntil = Stream.range(1, 100).pipe(
  Stream.run(
    Sink.fold(
      0,
      (sum) => sum < 50, // continue while sum < 50
      (acc, n) => acc + n
    )
  )
)

// Fold with effects: the step function returns an Effect producing the next
// accumulator, which is why `Effect` is imported
const effectfulFold = Stream.range(1, 5).pipe(
  Stream.run(
    Sink.foldLeftEffect(
      0,
      (acc, n) => Effect.succeed(acc + n)
    )
  )
)

Weighted Folding

import { Chunk, Sink, Stream } from "effect"

// Fold until total cost exceeds max.
// `cost` receives (accumulator, element) and returns the weight of the
// incoming element only; the sink adds these weights up itself, so the
// accumulator's length must not be counted again here.
const weighted = Stream.make("a", "bb", "ccc").pipe(
  Stream.run(
    Sink.foldWeighted({
      initial: "",
      maxCost: 5,
      cost: (_, s) => s.length,
      body: (acc, s) => acc + s
    })
  )
)

// With decomposition for large elements.
// `cost` again receives (accumulator, element): the element is the second
// argument. `decompose` splits an element into smaller pieces.
const decomposing = Stream.make(1, 5, 1).pipe(
  Stream.run(
    Sink.foldWeightedDecompose({
      initial: Chunk.empty<number>(),
      maxCost: 4,
      cost: (_, n) => n,
      decompose: (n) => Chunk.make(n - 1, 1),
      body: (acc, el) => Chunk.append(acc, el)
    })
  )
)

Fold Until

import { Sink, Stream } from "effect"

// Fold elements into an array, stopping after at most 10 elements
const limitedFold = Stream.run(
  Stream.range(1, 100),
  Sink.foldUntil(
    [] as number[],
    10,
    (collected, value) => [...collected, value]
  )
)

Aggregation Sinks

Statistical Sinks

import { Sink, Stream } from "effect"

// How many elements were emitted
const count = Stream.run(Stream.range(1, 10), Sink.count)

// Total of the emitted numbers
const sum = Stream.run(Stream.make(1, 2, 3, 4, 5), Sink.sum)

// First and last element; both results are an Option<number>
const first = Stream.run(Stream.range(1, 10), Sink.head())

const last = Stream.run(Stream.range(1, 10), Sink.last())

Predicates

import { Sink, Stream } from "effect"

// True when every element passes the predicate
const allPositive = Stream.run(
  Stream.make(1, 2, 3),
  Sink.every((value) => value > 0)
)

// True when at least one element passes the predicate
const hasNegative = Stream.run(
  Stream.make(1, -2, 3),
  Sink.some((value) => value < 0)
)

Input Transformation

Mapping Inputs

import { Effect, Sink, Stream } from "effect"

// Transform input elements
const doubled = Sink.sum.pipe(
  Sink.mapInput((n: number) => n * 2)
)

Stream.make(1, 2, 3).pipe(
  Stream.run(doubled)
) // 12

// Transform with effects: the mapping function returns an Effect,
// which is why `Effect` is imported
const effectfulMap = Sink.collectAll<number>().pipe(
  Sink.mapInputEffect(n =>
    Effect.succeed(n * 2)
  )
)

Mapping Chunks

import { Chunk, Sink, Stream } from "effect"

// Rewrite each incoming chunk (here: double every number) before it reaches
// the collecting sink
const chunkMapped = Sink.mapInputChunks(
  Sink.collectAll<number>(),
  (incoming: Chunk.Chunk<number>) => Chunk.map(incoming, (value) => value * 2)
)

Filtering Inputs

import { Sink, Stream } from "effect"

// Only inputs that pass the predicate reach the collecting sink
const evensOnly = Sink.collectAll<number>().pipe(
  Sink.filterInput((value) => value % 2 === 0)
)

Stream.run(Stream.range(1, 10), evensOnly) // Chunk(2, 4, 6, 8, 10)

Result Transformation

Mapping Results

import { Console, Effect, Sink, Stream } from "effect"

// Transform sink result
const doubled = Sink.sum.pipe(
  Sink.map(total => total * 2)
)

// Transform with effects: log the total, then yield it unchanged.
// `Console` and `Effect` are used here, so both are imported.
const logged = Sink.sum.pipe(
  Sink.mapEffect(total =>
    Console.log(`Total: ${total}`).pipe(
      Effect.as(total)
    )
  )
)

Both Sides

import { Sink, Stream } from "effect"

// Adapt both ends of the sink: strings are turned into their lengths on the
// way in, and the numeric total is formatted as a string on the way out
const transformed = Sink.dimap(Sink.sum, {
  onInput: (text: string) => text.length,
  onDone: (total) => `Total length: ${total}`
})

Combining Sinks

Sequential Composition

import { Chunk, Sink, Stream } from "effect"

// Chain sinks sequentially: collect five elements, then continue with a sink
// built from that chunk (here one that immediately succeeds with its total).
// `Chunk.reduce` is used, so `Chunk` is imported.
const first5ThenSum = Sink.collectAllN<number>(5).pipe(
  Sink.flatMap(chunk =>
    Sink.succeed(Chunk.reduce(chunk, 0, (a, b) => a + b))
  )
)

Parallel Execution

import { Sink, Stream } from "effect"

// Run two sinks in parallel.
// Without options, `Sink.zip` runs the sinks one after the other (the second
// only starts once the first has finished). Passing `concurrent: true` feeds
// the same elements to both sinks.
const both = Sink.zip(
  Sink.sum,
  Sink.count,
  { concurrent: true }
)

Stream.range(1, 5).pipe(
  Stream.run(both)
) // [15, 5]

// Zip with custom function (also concurrent, so both sinks see every element)
const combined = Sink.zipWith(
  Sink.sum,
  Sink.count,
  (sum, count) => sum / count,
  { concurrent: true }
)

Racing

import { Sink, Stream } from "effect"

// Race two sinks; the result comes from whichever sink completes first
const raced = Sink.head().pipe(
  Sink.race(Sink.take(5))
)

Dropping and Taking

import { Sink, Stream } from "effect"

// Skip the first three elements
const skipFirst = Stream.run(Stream.range(1, 10), Sink.drop(3))

// Skip elements for as long as the predicate holds
const dropWhile = Stream.run(
  Stream.range(1, 10),
  Sink.dropWhile((value) => value < 5)
)

// Keep the first five elements
const takeFirst = Stream.run(Stream.range(1, 10), Sink.take(5))

Advanced Patterns

Leftovers

import { Sink, Stream } from "effect"

// Return the sink's result together with the elements it left unconsumed
const withLeftovers = Stream.run(
  Stream.range(1, 10),
  Sink.collectLeftover(Sink.collectAllN(5))
) // [[1,2,3,4,5], [6,7,8,9,10]]

// Discard whatever the sink leaves unconsumed
const noLeftovers = Sink.ignoreLeftover(Sink.take<number>(5))

Splitting

import { Sink, Stream } from "effect"

// Split the collecting sink where the predicate matches
const splitOn = Stream.run(
  Stream.range(1, 10),
  Sink.collectAll<number>().pipe(
    Sink.splitWhere((value) => value === 5)
  )
)

Loop and Repeat

import { Sink, Stream } from "effect"

// Repeatedly run sink
const repeated = Stream.range(1, 20).pipe(
  Stream.run(
    Sink.collectAllFrom(
      Sink.take<number>(5)
    )
  )
) // Chunk of chunks

// Collect while condition holds.
// `collectAllWhileWith` is a combinator applied to an existing sink: it runs
// that sink repeatedly for as long as its result satisfies `while`, folding
// each result into `initial` with `body`. Here the inner sink sums batches of
// five numbers, and batch sums are added up while each one stays below 50.
const conditional = Sink.foldUntil(0, 5, (acc: number, n: number) => acc + n).pipe(
  Sink.collectAllWhileWith({
    initial: 0,
    while: (batchSum: number) => batchSum < 50,
    body: (total: number, batchSum: number) => total + batchSum
  })
)

Integration with Streams

From Queues and PubSubs

import { Effect, PubSub, Queue, Sink } from "effect"

// Create an unbounded queue and wrap it in a sink
const queueSink = Effect.map(
  Queue.unbounded<number>(),
  (queue) => Sink.fromQueue(queue)
)

// Create an unbounded pubsub and wrap it in a sink
const pubsubSink = Effect.map(
  PubSub.unbounded<number>(),
  (pubsub) => Sink.fromPubSub(pubsub)
)

Custom Sinks

import { Chunk, Effect, Either, Option, Sink } from "effect"

// From a push function.
// Type arguments are <In, error, result, leftover, context>: this sink consumes
// numbers, cannot fail, and produces a string.
//
// The push function is called with `Some(chunk)` for each chunk of input and
// with `None` at the end of input. Returning a successful effect asks for more
// input; the sink finishes by *failing* with `[Either, leftovers]`, where
// `Either.right` carries the result and `Either.left` would carry an error.
const customSink = Sink.fromPush<number, never, string, number, never>(
  Effect.sync(() => {
    // Running total, private to each run of the sink
    let sum = 0

    return (chunk: Option.Option<Chunk.Chunk<number>>) => {
      if (Option.isNone(chunk)) {
        // End of input: complete with the total and no leftovers
        return Effect.fail(
          [Either.right(sum.toString()), Chunk.empty<number>()] as const
        )
      }

      const elements = chunk.value
      // More input: add it to the total and ask for the next chunk
      return Effect.sync(() => {
        Chunk.forEach(elements, n => {
          sum += n
        })
      })
    }
  })
)

Error Handling

import { Sink, Stream } from "effect"

// Handle sink failures: if the sink fails, continue with a sink that
// succeeds with 0
const recovered = Sink.sum.pipe(
  Sink.orElse(() => Sink.succeed(0))
)

// Map errors
// NOTE: `someSink` is not defined in this snippet; it stands in for a sink
// you already have.
const mappedError = someSink.pipe(
  Sink.mapError(error => `Failed: ${error}`)
)

// Fold over result: choose the sink to continue with depending on whether
// `someSink` failed or succeeded
const folded = someSink.pipe(
  Sink.foldSink({
    onFailure: (error) => Sink.succeed(-1),
    onSuccess: (value) => Sink.succeed(value)
  })
)

Resource Management

import { Console, Effect, Exit, Sink } from "effect"

// Sink with finalizer (`Console` is used here, so it is imported)
const withCleanup = Sink.collectAll<number>().pipe(
  Sink.ensuring(
    Console.log("Cleaning up")
  )
)

// Context-dependent finalizer: the finalizer receives the sink's Exit and
// logs a different message for success and failure (`Exit` is imported for
// the `Exit.isSuccess` check)
const withExitHandler = Sink.collectAll<number>().pipe(
  Sink.ensuringWith((exit) =>
    Exit.isSuccess(exit)
      ? Console.log("Success")
      : Console.log("Failure")
  )
)

Common Patterns

Batching

import { Sink, Stream } from "effect"

// Re-run the collecting sink over the stream so that each result becomes one
// element of the output stream
const batched = Stream.transduce(
  Stream.range(1, 100),
  Sink.collectAllN(10)
) // Stream of chunks of 10

Aggregation Window

import { Sink, Stream } from "effect"

// Aggregate the stream with a weighted fold: each number's cost is its own
// value, and numbers are summed under a maximum cost of 100
const windowed = Stream.aggregate(
  Stream.range(1, 100),
  Sink.foldWeighted({
    initial: 0,
    maxCost: 100,
    cost: (_, value) => value,
    body: (total, value) => total + value
  })
)

See Also

Build docs developers (and LLMs) love