The Streams API enables efficient processing of data that arrives over time, without loading everything into memory at once. Workerd implements both standard WHATWG streams and optimized internal byte streams.
## Overview

Workerd provides two stream implementations:

- **Standard streams** - WHATWG-compliant; chunks may be any JavaScript value
- **Internal streams** - optimized byte-only streams backed by `kj::AsyncInputStream`

Implementation: `src/workerd/api/streams/`. See `src/workerd/api/streams/README.md` for a detailed implementation reference.
## Stream types

### ReadableStream

A readable stream represents a source of data:

```js
const stream = new ReadableStream({
  start(controller) {
    controller.enqueue('chunk 1');
    controller.enqueue('chunk 2');
    controller.close();
  }
});

const reader = stream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  console.log(value);
}
```
### WritableStream

A writable stream represents a destination for data:

```js
const stream = new WritableStream({
  write(chunk) {
    console.log('Writing:', chunk);
  },
  close() {
    console.log('Stream closed');
  }
});

const writer = stream.getWriter();
await writer.write('chunk 1');
await writer.write('chunk 2');
await writer.close();
```
### TransformStream

A transform stream modifies data as it flows through:

```js
const transform = new TransformStream({
  transform(chunk, controller) {
    // Convert to uppercase
    controller.enqueue(chunk.toString().toUpperCase());
  }
});

const reader = transform.readable.getReader();
const writer = transform.writable.getWriter();

await writer.write('hello');
const { value } = await reader.read();
console.log(value); // "HELLO"
```
## Reading streams

### Default reader

Read chunks sequentially:

```js
const response = await fetch('https://example.com/large-file');
const reader = response.body.getReader();

try {
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    // Process chunk (Uint8Array)
    console.log(`Received ${value.length} bytes`);
  }
} finally {
  reader.releaseLock();
}
```
### BYOB reader

A "bring your own buffer" reader reads directly into a caller-supplied buffer, avoiding intermediate allocations. BYOB readers are only available on byte streams (`type: 'bytes'`):

```js
const reader = stream.getReader({ mode: 'byob' });
const buffer = new Uint8Array(1024);

const { done, value } = await reader.read(buffer);
if (!done) {
  console.log(`Read ${value.byteLength} bytes into buffer`);
}
```
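A complete BYOB read loop recycles a single buffer across reads. Note that `read(view)` detaches the buffer you pass in, so subsequent reads must use the buffer of the returned `value`. A minimal sketch, assuming a simple in-memory byte source (real code would typically read from a body or socket):

```js
async function readAllByob() {
  // Illustrative byte source; any byte stream (type: 'bytes') works here.
  const payload = new TextEncoder().encode('hello byob world');
  const stream = new ReadableStream({
    type: 'bytes',
    start(controller) {
      controller.enqueue(payload.slice(0, 6));
      controller.enqueue(payload.slice(6));
      controller.close();
    }
  });

  const reader = stream.getReader({ mode: 'byob' });
  const received = [];
  let buffer = new Uint8Array(8);
  while (true) {
    // read(view) detaches `buffer`; the returned `value` is a fresh
    // view over the same memory, so we recycle it for the next read.
    const { done, value } = await reader.read(buffer);
    if (done) break;
    received.push(...value);
    buffer = new Uint8Array(value.buffer);
  }
  return new TextDecoder().decode(Uint8Array.from(received));
}

readAllByob().then(text => console.log(text));
```

Because the buffer is reused, the loop performs a bounded number of allocations no matter how much data flows through.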
## Writing streams

### Basic writing

Write data to a stream:

```js
const stream = new WritableStream({
  async write(chunk) {
    // Process the chunk
    await processChunk(chunk);
  }
});

const writer = stream.getWriter();
try {
  await writer.write('chunk 1');
  await writer.write('chunk 2');
  await writer.close();
} catch (err) {
  // A failed write leaves the stream errored; close() would also
  // reject, so abort the writer instead.
  await writer.abort(err);
}
```
### Backpressure

Awaiting each `write()` applies backpressure automatically: the promise resolves only once the sink has accepted the chunk.

```js
const writer = stream.getWriter();

for (const chunk of largeDataset) {
  // This will wait if the stream is full
  await writer.write(chunk);
}

await writer.close();
```
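For finer control, `writer.ready` resolves whenever the stream's internal queue has room, and `writer.desiredSize` reports the remaining capacity. A sketch using a small queue (the 1 ms delay simulates a slow sink):

```js
async function produce() {
  const written = [];
  const stream = new WritableStream({
    async write(chunk) {
      // Simulate a slow sink.
      await new Promise(resolve => setTimeout(resolve, 1));
      written.push(chunk);
    }
  }, new CountQueuingStrategy({ highWaterMark: 2 }));

  const writer = stream.getWriter();
  for (let i = 0; i < 5; i++) {
    // Wait until the queue has room instead of buffering unboundedly.
    await writer.ready;
    writer.write(i); // not awaited; `ready` provides the backpressure
  }
  await writer.close(); // waits for all queued writes to finish
  return written;
}

produce().then(chunks => console.log(chunks));
```

Awaiting `ready` rather than each `write()` lets the producer stay at most one queue ahead of the sink instead of fully serializing with it.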
## Piping streams

Connect streams together efficiently:

```js
// Pipe readable to writable
await readableStream.pipeTo(writableStream);

// Pipe through a transform
const transformed = readableStream.pipeThrough(transformStream);

// Chain multiple transforms
const result = readableStream
  .pipeThrough(compressionStream)
  .pipeThrough(encryptionStream);
```

### Piping with error handling

```js
try {
  await sourceStream.pipeTo(destStream);
  console.log('Pipe completed successfully');
} catch (error) {
  console.error('Pipe failed:', error);
}
```
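`pipeTo()` also accepts options: an `AbortSignal` to cancel the pipe, and `preventClose` / `preventAbort` / `preventCancel` to control error propagation between the two streams. A sketch of a timeout wrapper (the helper name is illustrative):

```js
async function pipeWithTimeout(source, destination, ms) {
  const controller = new AbortController();
  const timer = setTimeout(
    () => controller.abort(new Error('pipe timed out')),
    ms
  );
  try {
    // preventClose keeps `destination` open for further writes
    // after the pipe completes.
    await source.pipeTo(destination, {
      signal: controller.signal,
      preventClose: true
    });
  } finally {
    clearTimeout(timer);
  }
}
```

If the signal fires, `pipeTo()` rejects with the abort reason and, absent the `prevent*` options, cancels the source and aborts the destination.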
## CompressionStream

Compress data with one of the supported algorithms:

```js
const compressed = readableStream.pipeThrough(
  new CompressionStream('gzip')
);

// Supported formats: 'gzip', 'deflate', 'deflate-raw'
```

## DecompressionStream

Decompress data:

```js
const decompressed = compressedStream.pipeThrough(
  new DecompressionStream('gzip')
);
```
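The two transforms invert each other, which makes a round trip a convenient correctness check. A sketch using `Blob.stream()` as the byte source and `Response` to collect the result:

```js
async function roundTrip(text) {
  // Text -> bytes -> gzip -> gunzip -> text
  const input = new Blob([text]).stream();
  const compressed = input.pipeThrough(new CompressionStream('gzip'));
  const decompressed = compressed.pipeThrough(new DecompressionStream('gzip'));
  return await new Response(decompressed).text();
}

roundTrip('hello streams').then(text => console.log(text));
```

Both sides must use the same format string; decompressing `'gzip'` data with `'deflate'` rejects the stream.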
## TextEncoderStream

Encode text to UTF-8 bytes:

```js
const textStream = new ReadableStream({
  start(controller) {
    controller.enqueue('Hello ');
    controller.enqueue('World');
    controller.close();
  }
});

const byteStream = textStream.pipeThrough(
  new TextEncoderStream()
);
```

## TextDecoderStream

Decode bytes to text:

```js
const textStream = byteStream.pipeThrough(
  new TextDecoderStream('utf-8')
);
```
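Unlike calling `TextDecoder.decode()` on each chunk, `TextDecoderStream` buffers incomplete multi-byte sequences across chunk boundaries. A sketch that deliberately splits a two-byte character across chunks:

```js
async function decodeSplitUtf8() {
  const bytes = new TextEncoder().encode('héllo'); // 'é' is two UTF-8 bytes
  const byteStream = new ReadableStream({
    start(controller) {
      // Split mid-'é' to show the decoder buffers the partial sequence.
      controller.enqueue(bytes.slice(0, 2));
      controller.enqueue(bytes.slice(2));
      controller.close();
    }
  });

  const reader = byteStream
    .pipeThrough(new TextDecoderStream('utf-8'))
    .getReader();
  let text = '';
  while (true) {
    const { done, value } = await reader.read();
    if (done) return text;
    text += value;
  }
}

decodeSplitUtf8().then(text => console.log(text));
```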
## Teeing streams

Split a stream into two independent streams:

```js
const [stream1, stream2] = originalStream.tee();

// Both streams receive the same data
await Promise.all([
  stream1.pipeTo(destination1),
  stream2.pipeTo(destination2)
]);
```

Note that chunks are buffered internally for the slower branch, so memory use can grow if one branch reads much faster than the other.

## IdentityTransformStream

Workerd provides an optimized identity transform for byte streams:

```js
const { readable, writable } = new IdentityTransformStream();

// Efficiently passes bytes through without modification
const response = new Response(readable);
```

Source: `src/workerd/api/streams/identity-transform-stream.h`
## Advanced patterns

### Custom readable source

Create a stream from a data source:

```js
const stream = new ReadableStream({
  async start(controller) {
    // Initialize source
  },
  async pull(controller) {
    // Called when backpressure releases
    const data = await fetchNextChunk();
    if (data) {
      controller.enqueue(data);
    } else {
      controller.close();
    }
  },
  cancel(reason) {
    // Clean up resources
  }
});
```
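As a concrete sketch, the pull-based pattern can wrap any asynchronous cursor. Here a hypothetical in-memory page list stands in for `fetchNextChunk()`; because `pull()` runs only when the consumer drains the queue, pages are produced no faster than they are read:

```js
function streamFromPages(pages) {
  let index = 0;
  return new ReadableStream({
    async pull(controller) {
      // Called again only after the consumer drains the queue.
      if (index < pages.length) {
        controller.enqueue(pages[index++]);
      } else {
        controller.close();
      }
    }
  });
}

// Drain a stream into an array (for demonstration).
async function collect(stream) {
  const reader = stream.getReader();
  const out = [];
  while (true) {
    const { done, value } = await reader.read();
    if (done) return out;
    out.push(value);
  }
}

collect(streamFromPages(['page 1', 'page 2', 'page 3']))
  .then(pages => console.log(pages));
```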
### Custom writable sink

Create a stream that processes data:

```js
const stream = new WritableStream({
  async write(chunk, controller) {
    // Process chunk
    await processData(chunk);
  },
  async close() {
    // Finalize
  },
  async abort(reason) {
    // Handle cancellation
  }
});
```
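A common concrete sink accumulates chunks and exposes the finalized result as a promise that settles from `close()` or `abort()`. The factory name below is illustrative:

```js
// A sink that collects chunks and reports the full list on close.
function makeCollectorSink() {
  const chunks = [];
  let resolveDone, rejectDone;
  const done = new Promise((resolve, reject) => {
    resolveDone = resolve;
    rejectDone = reject;
  });
  const stream = new WritableStream({
    write(chunk) { chunks.push(chunk); },
    close() { resolveDone(chunks); },   // finalize on normal close
    abort(reason) { rejectDone(reason); } // surface cancellation
  });
  return { stream, done };
}
```

A producer writes into `stream` as usual; a separate consumer awaits `done` to get the collected chunks or the abort reason.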
### Custom transform

Create a transform stream:

```js
const transform = new TransformStream({
  async transform(chunk, controller) {
    // Transform the chunk
    const transformed = await processChunk(chunk);
    controller.enqueue(transformed);
  },
  async flush(controller) {
    // Emit any remaining data
    const final = await finalizeTransform();
    if (final) {
      controller.enqueue(final);
    }
  }
});
```
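The `flush()` hook is what makes stateful transforms correct: any data buffered across chunks must be emitted there. A concrete sketch is a line splitter, which holds a trailing partial line until the next chunk (or end of stream) completes it:

```js
// Splits incoming text chunks into complete lines, buffering a
// trailing partial line until the next chunk or flush completes it.
function lineSplitter() {
  let buffered = '';
  return new TransformStream({
    transform(chunk, controller) {
      buffered += chunk;
      const lines = buffered.split('\n');
      buffered = lines.pop(); // keep the trailing partial line
      for (const line of lines) controller.enqueue(line);
    },
    flush(controller) {
      if (buffered) controller.enqueue(buffered); // emit the remainder
    }
  });
}
```

Without `flush()`, a stream ending in an unterminated line would silently drop its last line.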
## Byte streams

Create byte-oriented readable streams:

```js
const stream = new ReadableStream({
  type: 'bytes',
  async pull(controller) {
    const buffer = new Uint8Array(1024);
    const bytesRead = await readInto(buffer);
    if (bytesRead === 0) {
      controller.close();
    } else {
      controller.enqueue(buffer.subarray(0, bytesRead));
    }
  }
});
```
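The example above allocates a fresh buffer per pull and enqueues it. A byte source can instead write directly into the consumer's buffer via `controller.byobRequest`; setting `autoAllocateChunkSize` makes the controller allocate a buffer on behalf of default (non-BYOB) readers, so the same path serves both reader kinds. A sketch, with a fixed in-memory payload standing in for a real source:

```js
function byteStreamFrom(bytes, chunkSize = 4) {
  let offset = 0;
  return new ReadableStream({
    type: 'bytes',
    // Lets default readers consume the stream too: the controller
    // auto-allocates buffers of this size for them.
    autoAllocateChunkSize: chunkSize,
    pull(controller) {
      // With a pending read, byobRequest holds the buffer to fill.
      const request = controller.byobRequest;
      if (offset >= bytes.length) {
        controller.close();
        request?.respond(0); // settle the outstanding request
        return;
      }
      const view = request.view;
      const n = Math.min(view.byteLength, bytes.length - offset);
      new Uint8Array(view.buffer, view.byteOffset, n)
        .set(bytes.subarray(offset, offset + n));
      offset += n;
      request.respond(n); // deliver n bytes without an extra copy
    }
  });
}
```

The `respond(0)` after `close()` settles any read that was pending when the source ran dry.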
## Stream classification

Workerd's streams implementation uses two distinct architectures:

| Feature | Internal | Standard |
| --- | --- | --- |
| Spec compliance | Non-standard (kj-backed) | WHATWG Streams |
| Data types | Byte-only | Byte or value |
| Queue model | Single pending read | Dual queues |
| Async model | `kj::Promise` | JS Promise |
| Isolate lock | Data flows outside lock | Data flows inside lock |
| Backpressure | Implicit (kj flow control) | Explicit (high water mark) |

Source: `src/workerd/api/streams/README.md:8`
## Best practices

### Release stream locks

Always release stream locks when done:

```js
const reader = stream.getReader();
try {
  // Read data
} finally {
  reader.releaseLock();
}
```

### Handle stream errors

Streams can error at any time:

```js
try {
  await stream.pipeTo(destination);
} catch (error) {
  console.error('Stream error:', error);
  // Clean up resources
}
```
### Use piping for efficiency

Prefer `pipeTo()` and `pipeThrough()` over manual reading and writing:

```js
// Good: efficient piping
await source.pipeTo(destination);

// Avoid: manual copying
const reader = source.getReader();
const writer = destination.getWriter();
while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  await writer.write(value);
}
```
## Implementation details

The Streams API is implemented across multiple files:

- `src/workerd/api/streams/readable.h` / `.c++` - ReadableStream (1000+ lines)
- `src/workerd/api/streams/writable.h` / `.c++` - WritableStream (800+ lines)
- `src/workerd/api/streams/transform.h` / `.c++` - TransformStream
- `src/workerd/api/streams/standard.h` / `.c++` - Standard (WHATWG) implementation (6000+ lines)
- `src/workerd/api/streams/internal.h` / `.c++` - Internal (kj-backed) implementation (3000+ lines)
- `src/workerd/api/streams/queue.h` / `.c++` - Queue mechanics for backpressure
- `src/workerd/api/streams/compression.h` / `.c++` - Compression transforms