Sink<A, In, L, E, R> is used to consume elements produced by a Stream. Think of it as a function that consumes a variable amount of In elements, might fail with an error of type E, and eventually yields a value of type A together with leftovers of type L.
Type Parameters
A: The result type produced by the sinkIn: The type of elements consumedL: The type of leftover elements (defaults tonever)E: The type of errors (defaults tonever)R: The context type required (defaults tonever)
Basic Sinks
Collection Sinks
import { Sink, Stream } from "effect"
// Collect all elements into a Chunk
const all = Stream.range(1, 5).pipe(
Stream.run(Sink.collectAll())
)
// Collect first n elements
const first3 = Stream.range(1, 10).pipe(
Stream.run(Sink.collectAllN(3))
)
// Collect into a set
const uniqueValues = Stream.make(1, 2, 2, 3, 3).pipe(
Stream.run(Sink.collectAllToSet())
)
// Collect into a map with merge function
const grouped = Stream.make(
{ key: "a", value: 1 },
{ key: "a", value: 2 },
{ key: "b", value: 3 }
).pipe(
Stream.run(
Sink.collectAllToMap(
item => item.key,
(a, b) => ({ ...a, value: a.value + b.value })
)
)
)
Conditional Collection
import { Sink, Stream } from "effect"
// Collect until predicate is satisfied
const untilNegative = Stream.make(1, 2, 3, -1, 4, 5).pipe(
Stream.run(
Sink.collectAllUntil(n => n < 0)
)
)
// Collect while predicate holds
const positives = Stream.make(1, 2, 3, -1, 4, 5).pipe(
Stream.run(
Sink.collectAllWhile(n => n > 0)
)
)
// With effects
const conditional = Stream.range(1, 10).pipe(
Stream.run(
Sink.collectAllUntilEffect(n =>
Effect.succeed(n > 5)
)
)
)
Folding Sinks
Basic Folding
import { Sink, Stream } from "effect"
// Fold with initial value and function
const sum = Stream.range(1, 5).pipe(
Stream.run(
Sink.foldLeft(0, (acc, n) => acc + n)
)
)
// Fold with termination predicate
const foldUntil = Stream.range(1, 100).pipe(
Stream.run(
Sink.fold(
0,
(sum) => sum < 50, // continue while sum < 50
(acc, n) => acc + n
)
)
)
// Fold with effects
const effectfulFold = Stream.range(1, 5).pipe(
Stream.run(
Sink.foldLeftEffect(
0,
(acc, n) => Effect.succeed(acc + n)
)
)
)
Weighted Folding
import { Sink, Stream } from "effect"
// Fold until total cost exceeds max
const weighted = Stream.make("a", "bb", "ccc").pipe(
Stream.run(
Sink.foldWeighted({
initial: "",
maxCost: 5,
cost: (acc, s) => acc.length + s.length,
body: (acc, s) => acc + s
})
)
)
// With decomposition for large elements
const decomposing = Stream.make(1, 5, 1).pipe(
Stream.run(
Sink.foldWeightedDecompose({
initial: Chunk.empty<number>(),
maxCost: 4,
cost: (n) => n,
decompose: (n) => Chunk.make(n - 1, 1),
body: (acc, el) => Chunk.append(acc, el)
})
)
)
Fold Until
import { Sink, Stream } from "effect"
// Fold until max elements
const limitedFold = Stream.range(1, 100).pipe(
Stream.run(
Sink.foldUntil(
[] as number[],
10, // max 10 elements
(acc, n) => [...acc, n]
)
)
)
Aggregation Sinks
Statistical Sinks
import { Sink, Stream } from "effect"
// Count elements
const count = Stream.range(1, 10).pipe(
Stream.run(Sink.count)
)
// Sum numbers
const sum = Stream.make(1, 2, 3, 4, 5).pipe(
Stream.run(Sink.sum)
)
// Find first/last element
const first = Stream.range(1, 10).pipe(
Stream.run(Sink.head())
) // Option<number>
const last = Stream.range(1, 10).pipe(
Stream.run(Sink.last())
) // Option<number>
Predicates
import { Sink, Stream } from "effect"
// Check if all elements satisfy predicate
const allPositive = Stream.make(1, 2, 3).pipe(
Stream.run(
Sink.every(n => n > 0)
)
)
// Check if any element satisfies predicate
const hasNegative = Stream.make(1, -2, 3).pipe(
Stream.run(
Sink.some(n => n < 0)
)
)
Input Transformation
Mapping Inputs
import { Sink, Stream } from "effect"
// Transform input elements
const doubled = Sink.sum.pipe(
Sink.mapInput((n: number) => n * 2)
)
Stream.make(1, 2, 3).pipe(
Stream.run(doubled)
) // 12
// Transform with effects
const effectfulMap = Sink.collectAll<number>().pipe(
Sink.mapInputEffect(n =>
Effect.succeed(n * 2)
)
)
Mapping Chunks
import { Chunk, Sink, Stream } from "effect"
// Transform input chunks
const chunkMapped = Sink.collectAll<number>().pipe(
Sink.mapInputChunks(
(chunk: Chunk.Chunk<number>) =>
Chunk.map(chunk, n => n * 2)
)
)
Filtering Inputs
import { Sink, Stream } from "effect"
// Filter input elements
const evensOnly = Sink.collectAll<number>().pipe(
Sink.filterInput(n => n % 2 === 0)
)
Stream.range(1, 10).pipe(
Stream.run(evensOnly)
) // Chunk(2, 4, 6, 8, 10)
Result Transformation
Mapping Results
import { Sink, Stream } from "effect"
// Transform sink result
const doubled = Sink.sum.pipe(
Sink.map(total => total * 2)
)
// Transform with effects
const logged = Sink.sum.pipe(
Sink.mapEffect(total =>
Console.log(`Total: ${total}`).pipe(
Effect.as(total)
)
)
)
Both Sides
import { Sink, Stream } from "effect"
// Transform both input and output
const transformed = Sink.sum.pipe(
Sink.dimap({
onInput: (s: string) => s.length,
onDone: (total) => `Total length: ${total}`
})
)
Combining Sinks
Sequential Composition
import { Sink, Stream } from "effect"
// Chain sinks sequentially
const first5ThenSum = Sink.collectAllN<number>(5).pipe(
Sink.flatMap(chunk =>
Sink.succeed(Chunk.reduce(chunk, 0, (a, b) => a + b))
)
)
Parallel Execution
import { Sink, Stream } from "effect"
// Run two sinks in parallel
const both = Sink.zip(
Sink.sum,
Sink.count
)
Stream.range(1, 5).pipe(
Stream.run(both)
) // [15, 5]
// Zip with custom function
const combined = Sink.zipWith(
Sink.sum,
Sink.count,
(sum, count) => sum / count
)
Racing
import { Sink, Stream } from "effect"
// Use result of first sink to complete
const raced = Sink.race(
Sink.head(),
Sink.take(5)
)
Dropping and Taking
import { Sink, Stream } from "effect"
// Drop first n elements
const skipFirst = Stream.range(1, 10).pipe(
Stream.run(Sink.drop(3))
)
// Drop while predicate holds
const dropWhile = Stream.range(1, 10).pipe(
Stream.run(
Sink.dropWhile(n => n < 5)
)
)
// Take first n elements
const takeFirst = Stream.range(1, 10).pipe(
Stream.run(
Sink.take(5)
)
)
Advanced Patterns
Leftovers
import { Sink, Stream } from "effect"
// Collect leftovers
const withLeftovers = Stream.range(1, 10).pipe(
Stream.run(
Sink.collectAllN(5).pipe(
Sink.collectLeftover
)
)
) // [[1,2,3,4,5], [6,7,8,9,10]]
// Ignore leftovers
const noLeftovers = Sink.take<number>(5).pipe(
Sink.ignoreLeftover
)
Splitting
import { Sink, Stream } from "effect"
// Split on predicate
const splitOn = Stream.range(1, 10).pipe(
Stream.run(
Sink.collectAll<number>().pipe(
Sink.splitWhere(n => n === 5)
)
)
)
Loop and Repeat
import { Sink, Stream } from "effect"
// Repeatedly run sink
const repeated = Stream.range(1, 20).pipe(
Stream.run(
Sink.collectAllFrom(
Sink.take<number>(5)
)
)
) // Chunk of chunks
// Collect while condition holds
const conditional = Sink.collectAllWhileWith({
initial: 0,
while: (sum) => sum < 50,
body: (sum, n) => sum + n
}).pipe(
Sink.flatMap(() => Sink.sum)
)
Integration with Streams
From Queues and PubSubs
import { Effect, PubSub, Queue, Sink } from "effect"
// Sink to queue
const queueSink = Effect.gen(function*() {
const queue = yield* Queue.unbounded<number>()
return Sink.fromQueue(queue)
})
// Sink to pubsub
const pubsubSink = Effect.gen(function*() {
const pubsub = yield* PubSub.unbounded<number>()
return Sink.fromPubSub(pubsub)
})
Custom Sinks
import { Chunk, Effect, Option, Sink } from "effect"
// From a push function
const customSink = Sink.fromPush<number, string, never, number, never>(
Effect.sync(() => {
let sum = 0
return (chunk: Option.Option<Chunk.Chunk<number>>) =>
Effect.sync(() => {
if (Option.isNone(chunk)) {
return Either.right([sum.toString(), Chunk.empty()])
}
Chunk.forEach(chunk.value, n => {
sum += n
})
})
})
)
Error Handling
import { Sink, Stream } from "effect"
// Handle sink failures
const recovered = Sink.sum.pipe(
Sink.orElse(() => Sink.succeed(0))
)
// Map errors
const mappedError = someSink.pipe(
Sink.mapError(error => `Failed: ${error}`)
)
// Fold over result
const folded = someSink.pipe(
Sink.foldSink({
onFailure: (error) => Sink.succeed(-1),
onSuccess: (value) => Sink.succeed(value)
})
)
Resource Management
import { Effect, Sink } from "effect"
// Sink with finalizer
const withCleanup = Sink.collectAll<number>().pipe(
Sink.ensuring(
Console.log("Cleaning up")
)
)
// Context-dependent finalizer
const withExitHandler = Sink.collectAll<number>().pipe(
Sink.ensuringWith((exit) =>
Exit.isSuccess(exit)
? Console.log("Success")
: Console.log("Failure")
)
)
Common Patterns
Batching
import { Sink, Stream } from "effect"
const batched = Stream.range(1, 100).pipe(
Stream.transduce(
Sink.collectAllN(10)
)
) // Stream of chunks of 10
Aggregation Window
import { Sink, Stream } from "effect"
const windowed = Stream.range(1, 100).pipe(
Stream.aggregate(
Sink.foldWeighted({
initial: 0,
maxCost: 100,
cost: (_, n) => n,
body: (acc, n) => acc + n
})
)
)