Overview

ZSink[R, E, In, L, Z] consumes elements from a stream. A sink takes elements of type In and produces a final result of type Z, or fails with an error of type E. Input that the sink received but did not consume is returned as leftovers of type L when the sink completes. Sinks are:
  • Compositional: Combine sinks to build complex consumption logic
  • Resource-safe: Resources acquired by a sink are released when it completes, fails, or is interrupted
  • Type-safe: Input, leftover, error, and result types are checked at compile time
  • Efficient: Input is delivered in chunks rather than one element at a time

Type Parameters

  • R: Environment required to run the sink
  • E: Error type the sink may fail with
  • In: Type of elements consumed by the sink
  • L: Type of leftover elements
  • Z: Type of the sink’s result value
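
To make the roles of Z and L concrete, here is a minimal sketch that runs a sink taking three elements and uses collectLeftover to surface what it handed back. The object name and sample stream are illustrative, and the leftovers shown assume the whole input arrives as a single chunk, which is the case for a small literal ZStream.

```scala
import zio._
import zio.stream._

object LeftoverDemo extends ZIOAppDefault {
  // In = Int, Z = Chunk[Int], L = Int: takes three elements and
  // returns the unconsumed remainder of the current chunk as leftovers
  val firstThree: ZSink[Any, Nothing, Int, Int, Chunk[Int]] =
    ZSink.collectAllN[Int](3)

  // collectLeftover pairs the sink's result with its leftovers
  val program: ZIO[Any, Nothing, (Chunk[Int], Chunk[Int])] =
    ZStream(1, 2, 3, 4, 5).run(firstThree.collectLeftover)

  def run =
    program.flatMap { case (result, leftover) =>
      // result = Chunk(1,2,3), leftover = Chunk(4,5)
      Console.printLine(s"result = $result, leftover = $leftover")
    }
}
```

Leftovers only cover the rest of the chunk the sink was processing when it finished; elements the stream has not emitted yet are not part of L.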

Creating Sinks

Collection Sinks

import zio._
import zio.stream._

// Collect all elements
val collectAll: ZSink[Any, Nothing, Int, Nothing, Chunk[Int]] = 
  ZSink.collectAll

// Collect first n elements
val collectN: ZSink[Any, Nothing, String, String, Chunk[String]] = 
  ZSink.collectAllN(10)

// Collect into a set
val toSet: ZSink[Any, Nothing, Int, Nothing, Set[Int]] = 
  ZSink.collectAllToSet

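// Collect into a map keyed by the first tuple component,
// merging elements that share a key
val toMap: ZSink[Any, Nothing, (String, Int), Nothing, Map[String, (String, Int)]] =
  ZSink.collectAllToMap[(String, Int), String](_._1)((a, b) => (a._1, a._2 + b._2))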

Folding Sinks

import java.io.IOException

// Fold the whole stream, starting from an initial state
val sum: ZSink[Any, Nothing, Int, Nothing, Int] =
  ZSink.foldLeft[Int, Int](0)(_ + _)

// Fold while the state satisfies the condition;
// stops as soon as the running sum reaches 100
val sumUntil: ZSink[Any, Nothing, Int, Int, Int] =
  ZSink.fold[Int, Int](0)(_ < 100)(_ + _)

// Effectful fold
val effectfulSum: ZSink[Any, IOException, Int, Int, Int] =
  ZSink.foldLeftZIO[Any, IOException, Int, Int](0) { (acc, n) =>
    Console.printLine(s"Current sum: ${acc + n}").as(acc + n)
  }
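
The early-terminating fold is easiest to follow with concrete numbers. The sketch below (object name and input values are illustrative) runs the same `_ < 100` condition over four elements and captures the leftovers, assuming the input arrives as one chunk.

```scala
import zio._
import zio.stream._

object FoldDemo extends ZIOAppDefault {
  // Keeps adding while the running sum is below 100
  val sumUntil: ZSink[Any, Nothing, Int, Int, Int] =
    ZSink.fold[Int, Int](0)(_ < 100)(_ + _)

  // 30 -> 70 -> 120: the fold stops after the third element,
  // so the fourth element is handed back as a leftover
  val program: ZIO[Any, Nothing, (Int, Chunk[Int])] =
    ZStream(30, 40, 50, 60).run(sumUntil.collectLeftover)

  def run =
    program.flatMap { case (total, leftover) =>
      // total = 120, leftover = Chunk(60)
      Console.printLine(s"total = $total, leftover = $leftover")
    }
}
```

Note that the element that pushes the state past the limit is still folded in; the condition is checked on the state, not on the incoming element.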

Aggregation Sinks

// Count elements
val count: ZSink[Any, Nothing, Any, Nothing, Long] = 
  ZSink.count

// Sum numeric elements
val sum: ZSink[Any, Nothing, Int, Nothing, Int] = 
  ZSink.sum

// Find first element matching predicate
val find: ZSink[Any, Nothing, Int, Int, Option[Int]] = 
  ZSink.fold[Int, Option[Int]](None)(_.isEmpty) {
    case (None, n) if n > 10 => Some(n)
    case (acc, _) => acc
  }

Using Sinks with Streams

Running Streams to Sinks

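val stream = ZStream(1, 2, 3, 4, 5)

// Using run method
val result1: ZIO[Any, Nothing, Chunk[Int]] =
  stream.run(ZSink.collectAll[Int])

// Using >>> operator
val result2: ZIO[Any, Nothing, Int] =
  stream >>> ZSink.sum[Int]

// Multiple sinks: <&> feeds the same elements to both sinks.
// The result is an effect, so destructure the tuple inside map or flatMap.
val result3: ZIO[Any, Nothing, (Int, Long)] =
  stream.run(ZSink.sum[Int] <&> ZSink.count)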

Transforming Sinks

Mapping Results

val collectSink = ZSink.collectAll[Int]

// Map the result
val countSink: ZSink[Any, Nothing, Int, Nothing, Int] = 
  collectSink.map(_.size)

// Map with effect
val logged: ZSink[Any, IOException, Int, Nothing, Chunk[Int]] = 
  collectSink.mapZIO { chunk =>
    Console.printLine(s"Collected ${chunk.size} elements").as(chunk)
  }

Contramap (Transform Input)

val intSum: ZSink[Any, Nothing, Int, Nothing, Int] = 
  ZSink.sum

// Transform input before summing
val stringLengthSum: ZSink[Any, Nothing, String, Nothing, Int] = 
  intSum.contramap[String](_.length)

// ZStream("hello", "world") >>> stringLengthSum yields 10

Combining Sinks

Sequential Combination

val sink1: ZSink[Any, Nothing, Int, Int, Int] =
  ZSink.collectAllN[Int](5).map(_.sum)

val sink2: ZSink[Any, Nothing, Int, Nothing, Int] =
  ZSink.sum

// Run sink1, then run sink2 on sink1's leftovers and the rest of the stream
val combined = sink1.flatMap { firstSum =>
  sink2.map { secondSum =>
    (firstSum, secondSum)
  }
}
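
A quick way to check how leftovers flow between sequentially combined sinks is to run the combination on a known range. This is a minimal sketch; the object and value names are illustrative.

```scala
import zio._
import zio.stream._

object SequentialDemo extends ZIOAppDefault {
  // Sums the first five elements and hands back the rest as leftovers
  val firstFive: ZSink[Any, Nothing, Int, Int, Int] =
    ZSink.collectAllN[Int](5).map(_.sum)

  // Sums everything it is given
  val rest: ZSink[Any, Nothing, Int, Nothing, Int] =
    ZSink.sum[Int]

  // flatMap starts `rest` on the leftovers of `firstFive`
  val combined = firstFive.flatMap(a => rest.map(b => (a, b)))

  // 1 + 2 + 3 + 4 + 5 = 15 and 6 + 7 + 8 + 9 + 10 = 40
  val program: ZIO[Any, Nothing, (Int, Int)] =
    ZStream.fromIterable(1 to 10).run(combined)

  def run = program.flatMap(r => Console.printLine(r.toString)) // (15,40)
}
```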

Parallel Combination

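// Run two sinks in parallel over the same input
val parallel = ZSink.collectAll[Int] <&> ZSink.count
// Produces (Chunk[Int], Long)

// Zip sinks sequentially: the second sink only sees input the first did not consume
val zipped = ZSink.sum[Int] <*> ZSink.count
// Produces (Int, Long); the count is 0 because sum consumes the whole stream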
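
The difference between the two operators shows up as soon as both are run on the same stream. The following sketch (names are illustrative) contrasts sequential zipping with parallel zipping.

```scala
import zio._
import zio.stream._

object ZipDemo extends ZIOAppDefault {
  val stream = ZStream(1, 2, 3)

  // <*> runs the sinks one after the other: sum consumes every element,
  // so count starts on an exhausted stream
  val sequential: ZIO[Any, Nothing, (Int, Long)] =
    stream.run(ZSink.sum[Int] <*> ZSink.count)

  // <&> feeds each element to both sinks
  val parallel: ZIO[Any, Nothing, (Int, Long)] =
    stream.run(ZSink.sum[Int] <&> ZSink.count)

  def run =
    for {
      s <- sequential // (6, 0)
      p <- parallel   // (6, 3)
      _ <- Console.printLine(s"sequential = $s, parallel = $p")
    } yield ()
}
```

Use `<&>` when both sinks need to observe every element, and `<*>` when the second sink should pick up where the first one stopped.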

Racing Sinks

// Race two sinks and take the result of whichever completes first
val raced = ZSink.head[Int] | ZSink.last[Int]

Weighted Folding

// Fold until total weight exceeds threshold
val weightedSink = ZSink.foldWeighted(Chunk.empty[String])(
  // Cost function: string length
  (acc, s: String) => s.length.toLong,
  // Max total weight: 1000 characters
  max = 1000
)(
  // Combining function
  (acc, s) => acc :+ s
)

// With decomposition for large elements
val decomposingSink = ZSink.foldWeightedDecompose[String, Chunk[String]](Chunk.empty)(
  (_, s) => s.length.toLong,
  max = 100,
  // Break strings longer than the limit into 50-character pieces
  // (grouped returns an Iterator, hence fromIterator)
  decompose = s =>
    if (s.length > 100)
      Chunk.fromIterator(s.grouped(50))
    else
      Chunk.single(s)
)((acc, s) => acc :+ s)
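
A weighted fold completes once its budget is used up, so it is typically applied repeatedly. The sketch below assumes ZStream#transduce for that purpose and uses an illustrative word list with a budget of 10 characters per batch; the object and value names are not part of the original examples.

```scala
import zio._
import zio.stream._

object WeightedBatchDemo extends ZIOAppDefault {
  val words: Chunk[String] =
    Chunk("alpha", "beta", "gamma", "delta", "epsilon")

  // Each batch may hold at most 10 characters in total
  val byLength: ZSink[Any, Nothing, String, String, Chunk[String]] =
    ZSink.foldWeighted[String, Chunk[String]](Chunk.empty)(
      (_, s) => s.length.toLong, // cost of one element
      10L                        // maximum total cost per batch
    )(_ :+ _)

  // transduce restarts the sink on its own leftovers until the stream ends
  val program: ZIO[Any, Nothing, Chunk[Chunk[String]]] =
    ZStream.fromChunk(words).transduce(byLength).runCollect

  def run =
    program.flatMap(batches => ZIO.foreachDiscard(batches)(b => Console.printLine(b.mkString(", "))))
}
```

Every word here is shorter than the budget, so no batch exceeds 10 characters; a single element heavier than the budget would still be emitted in a batch of its own.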

Conditional Sinks

// Collect until an element satisfies the predicate (that element is included)
val untilBlank: ZSink[Any, Nothing, String, String, Chunk[String]] =
  ZSink.collectAllUntil[String](_.isEmpty)

// Collect while elements satisfy the predicate
val whileNonEmpty: ZSink[Any, Nothing, String, String, Chunk[String]] =
  ZSink.collectAllWhile[String](_.nonEmpty)

// With effectful predicates
// (validate and ValidationResult are application-defined placeholders)
val untilValid: ZSink[Any, IOException, String, String, Chunk[String]] =
  ZSink.collectAllUntilZIO { s =>
    validate(s).map(_ == ValidationResult.Stop)
  }
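
The two pure variants differ in how they treat the element that ends the collection. A minimal sketch on an illustrative four-line stream:

```scala
import zio._
import zio.stream._

object ConditionalDemo extends ZIOAppDefault {
  val lines = ZStream("a", "b", "", "c")

  // Stops after the first blank line and keeps it in the result
  val untilBlank: ZIO[Any, Nothing, Chunk[String]] =
    lines.run(ZSink.collectAllUntil[String](_.isEmpty))

  // Stops at the first blank line and does not keep it
  val whileNonEmpty: ZIO[Any, Nothing, Chunk[String]] =
    lines.run(ZSink.collectAllWhile[String](_.nonEmpty))

  def run =
    for {
      u <- untilBlank    // Chunk(a, b, "")
      w <- whileNonEmpty // Chunk(a, b)
      _ <- Console.printLine(s"until: ${u.size} elements, while: ${w.size} elements")
    } yield ()
}
```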

Splitting Sinks

// Split on predicate
val splitSink: ZSink[Any, Nothing, Int, Int, Chunk[Int]] =
  ZSink.collectAll[Int].splitWhere[Int](_ == 0)

// Example: ZStream(1, 2, 0, 3, 4) >>> splitSink
// Result: Chunk(1, 2); leftovers: Chunk(0, 3, 4)

Common Patterns

Batching to Database

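// Inserts one batch of up to batchSize elements, then completes.
// `database` stands in for an application-defined client.
def batchInsert[A](batchSize: Int): ZSink[Any, Throwable, A, Nothing, Unit] =
  ZSink.collectAllN[A](batchSize).mapZIO { batch =>
    database.insertBatch(batch)
  }.ignoreLeftover

// Usage: inserts only the first 100 elements of the stream
stream.run(batchInsert(100))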
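
A sink built on collectAllN finishes after one batch. To batch an entire stream, one option is to apply the collecting sink repeatedly with ZStream#transduce and run the insert on each emitted batch. The sketch below assumes that approach; insertBatch is a hypothetical in-memory stand-in for a database call, not part of any library.

```scala
import zio._
import zio.stream._

object BatchAllDemo extends ZIOAppDefault {
  // Hypothetical stand-in for a database insert: records each batch in memory
  def insertBatch(store: Ref[Chunk[Chunk[Int]]])(batch: Chunk[Int]): UIO[Unit] =
    ZIO.when(batch.nonEmpty)(store.update(_ :+ batch)).unit

  val program: ZIO[Any, Nothing, Chunk[Chunk[Int]]] =
    for {
      store <- Ref.make(Chunk.empty[Chunk[Int]])
      // transduce reruns collectAllN on its leftovers, emitting one Chunk per batch
      _ <- ZStream
             .fromIterable(1 to 7)
             .transduce(ZSink.collectAllN[Int](3))
             .runForeach(insertBatch(store))
      batches <- store.get
    } yield batches

  def run = program.flatMap(bs => Console.printLine(bs.mkString(" | ")))
}
```

The leftovers are kept here on purpose: dropping them with ignoreLeftover before transducing would discard the part of each chunk that did not fit into the current batch.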

Accumulate and Process

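// Groups input into batches of batchSize, runs process on each full batch,
// and flushes the final partial batch when the stream ends
def processInBatches[A, B](
  batchSize: Int,
  process: Chunk[A] => ZIO[Any, Throwable, B]
): ZSink[Any, Throwable, A, Nothing, Chunk[B]] =
  ZSink
    .foldLeftChunksZIO[Any, Throwable, A, (Chunk[A], Chunk[B])]((Chunk.empty, Chunk.empty)) {
      case ((pending, results), chunk) =>
        // Split buffered input into full batches and a remainder
        val (full, partial) =
          (pending ++ chunk).grouped(batchSize).toList.partition(_.size == batchSize)
        ZIO.foreach(full)(process).map { bs =>
          (partial.headOption.getOrElse(Chunk.empty), results ++ Chunk.fromIterable(bs))
        }
    }
    .mapZIO { case (pending, results) =>
      if (pending.isEmpty) ZIO.succeed(results)
      else process(pending).map(results :+ _)
    }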

File Writing

import java.nio.file.{Files, Path, StandardOpenOption}

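// Appends each incoming chunk to the file (creating it if needed)
// and returns the total number of bytes written
def writeToFile(path: Path): ZSink[Any, Throwable, Byte, Nothing, Long] =
  ZSink.foldLeftChunksZIO[Any, Throwable, Byte, Long](0L) { (count, bytes) =>
    ZIO.attemptBlocking {
      Files.write(path, bytes.toArray, StandardOpenOption.CREATE, StandardOpenOption.APPEND)
      count + bytes.size
    }
  }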
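
On the JVM, ZIO also ships file sinks, so a hand-written fold is often unnecessary. The sketch below assumes the ZSink.fromPath constructor is available in your ZIO 2 version and that its default options create or truncate the target file; the temp-file name and payload are illustrative.

```scala
import zio._
import zio.stream._
import java.nio.charset.StandardCharsets
import java.nio.file.Files

object FileSinkDemo extends ZIOAppDefault {
  // Writes a short payload to a temp file, reads it back, then cleans up
  val program: ZIO[Any, Throwable, String] =
    for {
      path     <- ZIO.attempt(Files.createTempFile("zsink-demo", ".txt"))
      bytes     = Chunk.fromArray("hello sink".getBytes(StandardCharsets.UTF_8))
      _        <- ZStream.fromChunk(bytes).run(ZSink.fromPath(path))
      contents <- ZIO.attempt(new String(Files.readAllBytes(path), StandardCharsets.UTF_8))
      _        <- ZIO.attempt(Files.deleteIfExists(path))
    } yield contents

  def run = program.flatMap(c => Console.printLine(c))
}
```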

Statistics Collector

case class Stats(count: Long, sum: Double, min: Double, max: Double)

def statsCollector: ZSink[Any, Nothing, Double, Nothing, Stats] = 
  ZSink.foldLeft(Stats(0, 0.0, Double.MaxValue, Double.MinValue)) {
    (stats, value) =>
      Stats(
        count = stats.count + 1,
        sum = stats.sum + value,
        min = math.min(stats.min, value),
        max = math.max(stats.max, value)
      )
  }
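
As a usage sketch, the collector can be run on a small stream of readings. The definitions are repeated so the example is self-contained; the object name and input values are illustrative.

```scala
import zio._
import zio.stream._

object StatsDemo extends ZIOAppDefault {
  final case class Stats(count: Long, sum: Double, min: Double, max: Double)

  // Same fold as above: one pass, constant memory
  val statsCollector: ZSink[Any, Nothing, Double, Nothing, Stats] =
    ZSink.foldLeft[Double, Stats](Stats(0, 0.0, Double.MaxValue, Double.MinValue)) {
      (stats, value) =>
        Stats(
          count = stats.count + 1,
          sum = stats.sum + value,
          min = math.min(stats.min, value),
          max = math.max(stats.max, value)
        )
    }

  val program: ZIO[Any, Nothing, Stats] =
    ZStream(1.0, 2.0, 3.0).run(statsCollector)

  def run =
    program.flatMap { s =>
      // count = 3, sum = 6.0, min = 1.0, max = 3.0
      Console.printLine(s"count=${s.count} mean=${s.sum / s.count} min=${s.min} max=${s.max}")
    }
}
```

On an empty stream the result keeps the sentinel values for min and max, so callers should check count before reading them.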

Resource Management

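import java.io.Writer

// Sink with resource cleanup: the scope in which `open` acquires the
// writer is closed when the sink finishes
def writingToResource[A](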
  open: ZIO[Scope, Throwable, Writer],
  write: (Writer, A) => ZIO[Any, Throwable, Unit]
): ZSink[Any, Throwable, A, Nothing, Unit] = {
  ZSink.unwrapScoped {
    open.map { writer =>
      ZSink.foreach[Any, Throwable, A](a => write(writer, a))
    }
  }
}
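
To see the cleanup happen, the sink can be run against an in-memory writer whose release step flips a flag. This is a sketch under stated assumptions: the StringWriter, the `closed` flag, and the object name are illustrative, and the release action stands in for a real `close()`.

```scala
import zio._
import zio.stream._
import java.io.{StringWriter, Writer}

object ScopedSinkDemo extends ZIOAppDefault {
  def writingToResource[A](
    open: ZIO[Scope, Throwable, Writer],
    write: (Writer, A) => ZIO[Any, Throwable, Unit]
  ): ZSink[Any, Throwable, A, Nothing, Unit] =
    ZSink.unwrapScoped[Any, Throwable, A, Nothing, Unit] {
      open.map(writer => ZSink.foreach[Any, Throwable, A](a => write(writer, a)))
    }

  val program: ZIO[Any, Throwable, (String, Boolean)] =
    for {
      closed <- Ref.make(false)
      buffer  = new StringWriter
      // The release action runs when the sink's scope closes
      open    = ZIO.acquireRelease(ZIO.succeed(buffer: Writer))(_ => closed.set(true))
      _      <- ZStream("a", "b", "c").run(
                  writingToResource[String](open, (w, s) => ZIO.attempt(w.write(s)))
                )
      wasClosed <- closed.get
    } yield (buffer.toString, wasClosed)

  def run =
    program.flatMap { case (text, wasClosed) =>
      Console.printLine(s"wrote '$text', released = $wasClosed")
    }
}
```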

Performance Tips

Prefer foldLeftChunks over foldLeft when the fold can operate on a whole chunk at a time; the function is then called once per chunk instead of once per element:
// More efficient
ZSink.foldLeftChunks[Int, Int](0)((sum, chunk) => sum + chunk.sum)

// Less efficient
ZSink.foldLeft[Int, Int](0)(_ + _)

Common Sinks Reference

  • collectAll: Collect all elements into a Chunk
  • collectAllN: Collect first n elements
  • collectAllToSet: Collect into a Set
  • collectAllToMap: Collect into a Map
  • count: Count all elements
  • sum: Sum numeric elements
  • foldLeft: Fold elements left-to-right
  • foreach: Run effect for each element

See Also

  • ZStream: Produce stream elements
  • ZPipeline: Transform stream elements
  • ZChannel: Low-level streaming primitive
