Overview
ZSink[R, E, In, L, Z] is a consumer of elements from a stream. Sinks consume elements of type In and produce a final result of type Z, potentially failing with an error of type E. Leftover elements of type L are produced when the sink completes.
Sinks are:
- Compositional: combine sinks to build complex consumption logic
- Resource-safe: automatic cleanup of resources
- Type-safe: compile-time guarantees about input/output types
- Efficient: optimized for streaming performance
Type Parameters
- R: Environment required to run the sink
- E: Error type the sink may fail with
- In: Type of elements consumed by the sink
- L: Type of leftover elements
- Z: Type of the sink's result value
Creating Sinks
Collection Sinks
import zio._
import zio.stream._

import java.io.IOException
// Collect every element of the stream into a Chunk.
val collectAll: ZSink[Any, Nothing, Int, Nothing, Chunk[Int]] =
  ZSink.collectAll[Int]

// Collect at most the first 10 elements.
val collectN: ZSink[Any, Nothing, String, String, Chunk[String]] =
  ZSink.collectAllN[String](10)

// Collect the elements into a Set.
val toSet: ZSink[Any, Nothing, Int, Nothing, Set[Int]] =
  ZSink.collectAllToSet[Int]

// Collect into a Map keyed by the first tuple component, summing the Ints
// that share a key.
// collectAllToMap stores whole input elements as the map values and merges
// two elements with the same key through the supplied function, so the merge
// must return a (String, Int) pair. The final `map` drops the redundant key
// from each stored pair to produce the declared Map[String, Int].
val toMap: ZSink[Any, Nothing, (String, Int), Nothing, Map[String, Int]] =
  ZSink
    .collectAllToMap[(String, Int), String](_._1)((a, b) => (a._1, a._2 + b._2))
    .map(_.map { case (key, (_, total)) => key -> total })
Folding Sinks
// Fold the whole stream into a running total, starting from 0.
val sum: ZSink[Any, Nothing, Int, Nothing, Int] =
  ZSink.foldLeft[Int, Int](0)(_ + _)

// Keep folding only while the accumulated total is below 100.
val sumUntil: ZSink[Any, Nothing, Int, Int, Int] =
  ZSink.fold[Int, Int](0)(_ < 100)(_ + _)

// Effectful fold: print the running total after every element.
// foldLeftZIO reports its leftover type as the input type, so the leftovers
// are discarded explicitly to match the declared `Nothing`.
val effectfulSum: ZSink[Any, IOException, Int, Nothing, Int] =
  ZSink
    .foldLeftZIO[Any, IOException, Int, Int](0) { (acc, n) =>
      val total = acc + n
      Console.printLine(s"Current sum: $total").as(total)
    }
    .ignoreLeftover
Aggregation Sinks
// Count how many elements pass through, whatever their type.
val count: ZSink[Any, Nothing, Any, Nothing, Long] =
  ZSink.count

// Add up numeric elements.
val sum: ZSink[Any, Nothing, Int, Nothing, Int] =
  ZSink.sum[Int]

// Find the first element greater than 10. The fold keeps going only while
// the state is still empty, so it stops as soon as a match has been stored.
val find: ZSink[Any, Nothing, Int, Int, Option[Int]] =
  ZSink.fold[Int, Option[Int]](None)(_.isEmpty) { (found, n) =>
    if (found.isEmpty && n > 10) Some(n) else found
  }
Using Sinks with Streams
Running Streams to Sinks
val stream = ZStream(1, 2, 3, 4, 5)

// Using the run method
val result1: ZIO[Any, Nothing, Chunk[Int]] =
  stream.run(ZSink.collectAll[Int])

// Using the >>> operator
val result2: ZIO[Any, Nothing, Int] =
  stream >>> ZSink.sum[Int]

// Multiple sinks over the same elements.
// `run` returns an effect, so the pair cannot be destructured with
// `val (sum, count) = ...`; unpack it inside the effect instead, e.g.
// `sumAndCount.map { case (sum, count) => ... }`.
// `<&>` feeds every element to both sinks. `<*>` would run `count` only after
// `sum` had finished, by which point the stream is already consumed.
val sumAndCount: ZIO[Any, Nothing, (Int, Long)] =
  stream.run(ZSink.sum[Int] <&> ZSink.count)
Mapping Results
val collectSink = ZSink.collectAll[Int]

// Transform the final result: report how many elements were collected.
val countSink: ZSink[Any, Nothing, Int, Nothing, Int] =
  collectSink.map(collected => collected.size)

// Transform the result effectfully: print the size, keep the chunk.
val logged: ZSink[Any, IOException, Int, Nothing, Chunk[Int]] =
  collectSink.mapZIO { collected =>
    Console.printLine(s"Collected ${collected.size} elements").as(collected)
  }

val intSum: ZSink[Any, Nothing, Int, Nothing, Int] =
  ZSink.sum[Int]

// Adapt the input side instead: turn each String into its length before summing.
val stringLengthSum: ZSink[Any, Nothing, String, Nothing, Int] =
  intSum.contramap[String](str => str.length)
// ZStream("hello", "world") >>> stringLengthSum yields 10
Combining Sinks
Sequential Combination
// Sum of the first five elements. The element type is given explicitly:
// without it there is nothing for `collectAllN` to infer it from before
// `.map(_.sum)` is type-checked.
val sink1: ZSink[Any, Nothing, Int, Int, Int] =
  ZSink.collectAllN[Int](5).map(_.sum)

val sink2: ZSink[Any, Nothing, Int, Nothing, Int] =
  ZSink.sum[Int]

// Run sink1 first, then sink2 on sink1's leftovers and the remaining input.
val combined =
  for {
    firstSum  <- sink1
    secondSum <- sink2
  } yield (firstSum, secondSum)
Parallel Combination
// Run two sinks in parallel: both are fed the same elements.
// The element type is spelled out because nothing else here pins it down.
val parallel = ZSink.collectAll[Int] <&> ZSink.count
// Produces (Chunk[Int], Long)

// Zip sinks sequentially: `<*>` runs the second sink only after the first has
// finished, so `count` sees just what `sum` leaves behind.
val zipped = ZSink.sum[Int] <*> ZSink.count
// Produces (Int, Long)
Racing Sinks
// Race two sinks and take the result from whichever completes first.
// The element type is given explicitly; neither `head` nor `last` can infer
// it from this expression alone.
val raced = ZSink.head[Int] | ZSink.last[Int]
Weighted Folding
// Fold until the total weight exceeds the threshold.
val weightedSink =
  ZSink.foldWeighted[String, Chunk[String]](Chunk.empty)(
    // Cost of an element: its length in characters
    (_, str) => str.length.toLong,
    // Maximum total weight: 1000 characters
    max = 1000
  )(
    // Combining function: append the element to the accumulated chunk
    _ :+ _
  )

// Same fold, with decomposition for elements that are too large.
val decomposingSink =
  ZSink.foldWeightedDecompose[String, Chunk[String]](Chunk.empty)(
    (_, str) => str.length.toLong,
    max = 100,
    // Strings longer than 100 characters are cut into 50-character pieces.
    // `grouped` returns an Iterator, so it goes through Chunk.fromIterator
    // (Chunk.fromIterable expects an Iterable).
    decompose = str =>
      if (str.length > 100) Chunk.fromIterator(str.grouped(50))
      else Chunk.single(str)
  )(_ :+ _)
Conditional Sinks
// Keep collecting until an element satisfies the predicate (here: an empty string).
val untilBlank: ZSink[Any, Nothing, String, String, Chunk[String]] =
  ZSink.collectAllUntil[String](line => line.isEmpty)

// Keep collecting for as long as the predicate holds.
val whileNonEmpty: ZSink[Any, Nothing, String, String, Chunk[String]] =
  ZSink.collectAllWhile[String](line => line.nonEmpty)

// The stopping condition may itself be an effect.
// NOTE(review): `validate` and `ValidationResult` are not defined in this
// snippet; they stand in for application code.
val untilValid: ZSink[Any, IOException, String, String, Chunk[String]] =
  ZSink.collectAllUntilZIO { candidate =>
    validate(candidate).map(outcome => outcome == ValidationResult.Stop)
  }
Splitting Sinks
// Split the input at the first element that satisfies the predicate.
val splitSink: ZSink[Any, Nothing, Int, Int, Chunk[Int]] =
  ZSink.collectAll[Int].splitWhere((n: Int) => n == 0)
// Example: ZStream(1, 2, 0, 3, 4) >>> splitSink
// First run: Chunk(1, 2), leftover: Chunk(0, 3, 4)
Common Patterns
Batching to Database
/**
 * Sink that writes every incoming element to the database in batches of at
 * most `batchSize` elements.
 *
 * A `collectAllN(batchSize).mapZIO(...)` sink completes after its first
 * batch, so running a stream into it inserts only the first `batchSize`
 * elements. Re-chunking the input and inserting chunk by chunk covers the
 * whole stream, including a final, smaller batch.
 *
 * NOTE(review): `database.insertBatch` is assumed to accept a `Chunk[A]` and
 * fail with `Throwable` -- confirm against its definition.
 *
 * @param batchSize maximum number of elements per insert
 * @tparam A element type
 */
def batchInsert[A](batchSize: Int): ZSink[Any, Throwable, A, Nothing, Unit] =
  ZPipeline.rechunk[A](batchSize) >>>
    ZSink.foreachChunk[Any, Throwable, A](batch => database.insertBatch(batch))

// Usage
stream.run(batchInsert(100))
Accumulate and Process
/**
 * Sink that cuts the input into batches of at most `batchSize` elements, runs
 * `process` on each batch in arrival order, and collects one result per batch.
 *
 * Every element is processed: the input is re-chunked to `batchSize` first,
 * so no part of a chunk is discarded, the last (possibly smaller) batch is
 * included, and there is no fixed cap on the number of results.
 *
 * @param batchSize maximum number of elements handed to `process` at once
 * @param process   effect producing one result for a batch
 * @return the per-batch results, in order
 */
def processInBatches[A, B](
  batchSize: Int,
  process: Chunk[A] => ZIO[Any, Throwable, B]
): ZSink[Any, Throwable, A, Nothing, Chunk[B]] =
  ZPipeline.rechunk[A](batchSize) >>>
    ZSink.foldLeftChunksZIO[Any, Throwable, A, Chunk[B]](Chunk.empty[B]) { (results, batch) =>
      // An empty chunk carries no work; skip it rather than calling `process` on nothing.
      if (batch.isEmpty) ZIO.succeed(results)
      else process(batch).map(results :+ _)
    }
File Writing
import java . nio . file .{ Files , Path , StandardOpenOption }
/**
 * Sink that appends every incoming byte to the file at `path` and returns the
 * number of bytes written.
 *
 * Bytes are written one chunk at a time, so the file is opened once per chunk
 * rather than once per byte. The write runs through `ZIO.attemptBlocking`
 * because `Files.write` is blocking I/O.
 *
 * Only `APPEND` is passed, so the file must already exist; otherwise the sink
 * fails with the exception thrown by `Files.write`.
 *
 * @param path file to append to
 * @return total number of bytes written
 */
def writeToFile(path: Path): ZSink[Any, Throwable, Byte, Nothing, Long] =
  ZSink.foldLeftChunksZIO[Any, Throwable, Byte, Long](0L) { (written, bytes) =>
    ZIO.attemptBlocking {
      Files.write(path, bytes.toArray, StandardOpenOption.APPEND)
      written + bytes.size
    }
  }
Statistics Collector
/** Running summary of a stream of doubles: element count, total, smallest and largest value. */
case class Stats(count: Long, sum: Double, min: Double, max: Double)

/**
 * Sink that folds doubles into a [[Stats]] value.
 *
 * The fold starts from count 0, sum 0.0, min `Double.MaxValue` and max
 * `Double.MinValue`; that seed is also the result when no element arrives.
 */
def statsCollector: ZSink[Any, Nothing, Double, Nothing, Stats] = {
  val seed = Stats(count = 0, sum = 0.0, min = Double.MaxValue, max = Double.MinValue)
  ZSink.foldLeft[Double, Stats](seed) { (acc, x) =>
    acc.copy(
      count = acc.count + 1,
      sum = acc.sum + x,
      min = math.min(acc.min, x),
      max = math.max(acc.max, x)
    )
  }
}
Resource Management
/**
 * Sink that obtains a writer from `open` and hands every incoming element to
 * `write` together with that writer.
 *
 * `open` requires a `Scope`; the sink is built inside `ZSink.unwrapScoped`.
 * NOTE(review): the writer is presumably released when the sink finishes --
 * confirm against the `unwrapScoped` documentation.
 * NOTE(review): `Writer` is not defined in this snippet.
 *
 * @param open  scoped effect that acquires the writer
 * @param write effect that writes one element using the writer
 */
def writingToResource[A](
  open: ZIO[Scope, Throwable, Writer],
  write: (Writer, A) => ZIO[Any, Throwable, Unit]
): ZSink[Any, Throwable, A, Nothing, Unit] =
  ZSink.unwrapScoped(
    open.map(writer => ZSink.foreach[Any, Throwable, A](element => write(writer, element)))
  )
Use foldLeftChunks instead of foldLeft when you can process chunks efficiently:
// More efficient
ZSink.foldLeftChunks[Int, Int](0)((total, chunk) => total + chunk.sum)
// Less efficient
// (type arguments are explicit: as a bare expression, `_ + _` has no input type to infer)
ZSink.foldLeft[Int, Int](0)(_ + _)
Common Sinks Reference
- collectAll: Collect all elements into a Chunk
- collectAllN: Collect first n elements
- collectAllToSet: Collect into a Set
- collectAllToMap: Collect into a Map
- foldLeft: Fold elements left-to-right
- foreach: Run effect for each element
See Also
- ZStream: Produce stream elements
- ZPipeline: Transform stream elements
- ZChannel: Low-level streaming primitive