The Streams API enables efficient processing of data that arrives over time, without loading everything into memory at once. Workerd implements both standard WHATWG streams and optimized internal byte streams.

Overview

Workerd provides two stream implementations:
  • Standard streams - WHATWG-compliant, support any JavaScript value
  • Internal streams - Optimized byte-only streams backed by kj::AsyncInputStream
Implementation: src/workerd/api/streams/
See src/workerd/api/streams/README.md for a detailed implementation reference.

Stream types

ReadableStream

A readable stream represents a source of data:
const stream = new ReadableStream({
  start(controller) {
    controller.enqueue('chunk 1');
    controller.enqueue('chunk 2');
    controller.close();
  }
});

const reader = stream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  console.log(value);
}
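
The read loop above can be wrapped in an async generator, so callers can use `for await...of` and the lock is still released if they stop early. This is a minimal sketch: the helper names `chunksOf` and `demoChunks` are illustrative and not part of workerd.

```javascript
// Hypothetical helper: expose a ReadableStream's chunks as an async generator.
async function* chunksOf(stream) {
  const reader = stream.getReader();
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) return;
      yield value;
    }
  } finally {
    // Runs on normal completion and when the consumer breaks out early.
    reader.releaseLock();
  }
}

// Illustrative usage: collect every chunk of a small in-memory stream.
async function demoChunks() {
  const stream = new ReadableStream({
    start(controller) {
      controller.enqueue('chunk 1');
      controller.enqueue('chunk 2');
      controller.close();
    },
  });
  const seen = [];
  for await (const chunk of chunksOf(stream)) {
    seen.push(chunk);
  }
  return seen;
}
```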

WritableStream

A writable stream represents a destination for data:
const stream = new WritableStream({
  write(chunk) {
    console.log('Writing:', chunk);
  },
  close() {
    console.log('Stream closed');
  }
});

const writer = stream.getWriter();
await writer.write('chunk 1');
await writer.write('chunk 2');
await writer.close();

TransformStream

A transform stream modifies data as it flows through:
const transform = new TransformStream({
  transform(chunk, controller) {
    // Convert to uppercase
    controller.enqueue(chunk.toString().toUpperCase());
  }
});

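const reader = transform.readable.getReader();
const writer = transform.writable.getWriter();

// Do not await the write before reading: with the default queuing
// strategies, write() only settles once the readable side is read.
const written = writer.write('hello');
const { value } = await reader.read();
await written;
console.log(value); // "HELLO"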

Reading streams

Default reader

Read chunks sequentially:
const response = await fetch('https://example.com/large-file');
const reader = response.body.getReader();

try {
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    
    // Process chunk (Uint8Array)
    console.log(`Received ${value.length} bytes`);
  }
} finally {
  reader.releaseLock();
}
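
When the whole body is needed in memory after all, it can help to cap how much is buffered. The sketch below assumes a stream of `Uint8Array` chunks; `readAllBytes` and its `maxBytes` parameter are hypothetical names introduced for illustration.

```javascript
// Hypothetical helper: read a byte stream fully, refusing to buffer more than maxBytes.
async function readAllBytes(stream, maxBytes = 1024 * 1024) {
  const reader = stream.getReader();
  const chunks = [];
  let total = 0;
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      total += value.byteLength;
      if (total > maxBytes) {
        // Tell the source we are no longer interested before bailing out.
        await reader.cancel('size limit exceeded');
        throw new RangeError(`Stream exceeded ${maxBytes} bytes`);
      }
      chunks.push(value);
    }
  } finally {
    reader.releaseLock();
  }
  // Concatenate the collected chunks into a single Uint8Array.
  const result = new Uint8Array(total);
  let offset = 0;
  for (const chunk of chunks) {
    result.set(chunk, offset);
    offset += chunk.byteLength;
  }
  return result;
}
```

A call such as `await readAllBytes(response.body, 5 * 1024 * 1024)` would then reject with a `RangeError` instead of buffering an unexpectedly large body.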

BYOB reader

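Bring Your Own Buffer (BYOB) reads place data directly into a caller-supplied buffer, avoiding an extra copy. BYOB mode requires a byte stream:
const reader = stream.getReader({ mode: 'byob' });
const buffer = new Uint8Array(1024);

// read() transfers (detaches) `buffer`; use the returned `value` view instead
const { done, value } = await reader.read(buffer);
if (!done) {
  console.log(`Read ${value.byteLength} bytes into buffer`);
}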
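
Because each BYOB read transfers the supplied buffer, a read loop usually reclaims the memory from the returned view before reading again. The following is a sketch under that assumption; `makeByteStream` and `readAllByob` are hypothetical helpers, and the tiny buffer size only makes the reuse visible.

```javascript
// Hypothetical helper: a byte stream that delivers `bytes` and then closes.
function makeByteStream(bytes) {
  return new ReadableStream({
    type: 'bytes',
    start(controller) {
      controller.enqueue(bytes); // note: this transfers bytes.buffer
      controller.close();
    },
  });
}

// Hypothetical helper: drain a byte stream through one reusable buffer.
async function readAllByob(stream, bufferSize = 4) {
  const reader = stream.getReader({ mode: 'byob' });
  let buffer = new ArrayBuffer(bufferSize);
  const out = [];
  try {
    while (true) {
      const { done, value } = await reader.read(new Uint8Array(buffer));
      if (done) break;
      out.push(...value);
      // The original buffer is detached; take the memory back from the view.
      buffer = value.buffer;
    }
  } finally {
    reader.releaseLock();
  }
  return out;
}
```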

Writing streams

Basic writing

Write data to a stream:
const stream = new WritableStream({
  async write(chunk) {
    // Process the chunk
    await processChunk(chunk);
  }
});

const writer = stream.getWriter();

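try {
  await writer.write('chunk 1');
  await writer.write('chunk 2');
  await writer.close();
} catch (error) {
  // Closing an errored stream would reject again; abort instead
  await writer.abort(error);
  throw error;
}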

Backpressure

Awaiting each write() applies backpressure, because the promise settles only after the sink has accepted the chunk:
const writer = stream.getWriter();

for (const chunk of largeDataset) {
  // Waits until the sink has processed this chunk
  await writer.write(chunk);
}

await writer.close();
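
An alternative that keeps the sink's queue filled up to its high water mark is to wait on `writer.ready` instead of on each individual write. This is a sketch of that pattern from the WHATWG Streams model; `writeAllWithBackpressure` and `demoBackpressure` are hypothetical names, and the slow sink only simulates a busy destination.

```javascript
// Hypothetical helper: write every chunk, pausing while the queue is full.
async function writeAllWithBackpressure(stream, chunks) {
  const writer = stream.getWriter();
  for (const chunk of chunks) {
    // `ready` stays pending while the queue is at or above the high water mark.
    await writer.ready;
    // Not awaited: a failed write also rejects `ready` and `close()`.
    writer.write(chunk).catch(() => {});
  }
  // Resolves once all queued chunks have been written.
  await writer.close();
}

// Illustrative usage with a deliberately slow sink and a queue of two chunks.
async function demoBackpressure() {
  const received = [];
  const slowSink = new WritableStream(
    {
      async write(chunk) {
        await new Promise((resolve) => setTimeout(resolve, 1));
        received.push(chunk);
      },
    },
    new CountQueuingStrategy({ highWaterMark: 2 })
  );
  await writeAllWithBackpressure(slowSink, ['a', 'b', 'c', 'd']);
  return received;
}
```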

Piping streams

Connect streams together efficiently:
// Pipe readable to writable
await readableStream.pipeTo(writableStream);

// Pipe through a transform
const transformed = readableStream.pipeThrough(transformStream);

// Chain multiple transforms
const result = readableStream
  .pipeThrough(compressionStream)
  .pipeThrough(encryptionStream);
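
As a self-contained illustration of chaining, the sketch below pipes an in-memory source through two small transforms into a collecting sink. All names (`shout`, `upper`, `exclaim`) are made up for the example.

```javascript
// Illustrative pipeline: source -> uppercase -> append "!" -> collecting sink.
async function shout(words) {
  const source = new ReadableStream({
    start(controller) {
      for (const word of words) controller.enqueue(word);
      controller.close();
    },
  });
  const upper = new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
  const exclaim = new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(`${chunk}!`);
    },
  });
  const collected = [];
  const sink = new WritableStream({
    write(chunk) {
      collected.push(chunk);
    },
  });
  // pipeTo() resolves once the source is drained and the sink is closed.
  await source.pipeThrough(upper).pipeThrough(exclaim).pipeTo(sink);
  return collected;
}
```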

Piping with error handling

try {
  await sourceStream.pipeTo(destStream);
  console.log('Pipe completed successfully');
} catch (error) {
  console.error('Pipe failed:', error);
}
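
In the WHATWG model, an error in the source is also propagated forward: the destination is aborted with the same reason before `pipeTo()` rejects. The sketch below makes that order visible; the function name and log messages are illustrative only.

```javascript
// Illustrative: observe how a source error reaches the destination and the caller.
async function pipeWithFailingSource() {
  const log = [];
  const source = new ReadableStream({
    pull() {
      throw new Error('source failed');
    },
  });
  const dest = new WritableStream({
    write() {},
    abort(reason) {
      // Called because the source errored while piping.
      log.push(`dest aborted: ${reason.message}`);
    },
  });
  try {
    await source.pipeTo(dest);
    log.push('completed');
  } catch (error) {
    log.push(`pipe rejected: ${error.message}`);
  }
  return log;
}
```

Passing `{ preventAbort: true }` as the second argument to `pipeTo()` leaves the destination open after a source error, which is useful when another source will be piped into it afterwards.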

Built-in transforms

CompressionStream

Compress data with various algorithms:
const compressed = readableStream.pipeThrough(
  new CompressionStream('gzip')
);

// Supported formats: 'gzip', 'deflate', 'deflate-raw'

DecompressionStream

Decompress data:
const decompressed = compressedStream.pipeThrough(
  new DecompressionStream('gzip')
);

TextEncoderStream

Encode text to bytes:
const textStream = new ReadableStream({
  start(controller) {
    controller.enqueue('Hello ');
    controller.enqueue('World');
    controller.close();
  }
});

const byteStream = textStream.pipeThrough(
  new TextEncoderStream()
);

TextDecoderStream

Decode bytes to text:
const textStream = byteStream.pipeThrough(
  new TextDecoderStream('utf-8')
);
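
The four built-in transforms compose into a simple round trip: encode text, compress, decompress, and decode again. This is a sketch for illustration; `gzipRoundTrip` is a hypothetical helper name.

```javascript
// Illustrative round trip: text -> bytes -> gzip -> bytes -> text.
async function gzipRoundTrip(text) {
  const source = new ReadableStream({
    start(controller) {
      controller.enqueue(text);
      controller.close();
    },
  });
  const restored = source
    .pipeThrough(new TextEncoderStream())
    .pipeThrough(new CompressionStream('gzip'))
    .pipeThrough(new DecompressionStream('gzip'))
    .pipeThrough(new TextDecoderStream());

  // The decoded text may arrive in several chunks, so concatenate them.
  let result = '';
  const reader = restored.getReader();
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      result += value;
    }
  } finally {
    reader.releaseLock();
  }
  return result;
}
```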

Teeing streams

Split a stream into two independent streams:
const [stream1, stream2] = originalStream.tee();

// Both streams receive the same data
await Promise.all([
  stream1.pipeTo(destination1),
  stream2.pipeTo(destination2)
]);
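
A typical use of `tee()` is to consume the same data in two different ways at once. The sketch below joins the text on one branch and counts chunks on the other; `teeAndSummarize` is a hypothetical name. In the WHATWG model, chunks are buffered for the slower branch, so both branches should be consumed at a similar pace.

```javascript
// Illustrative: read both branches of a teed stream concurrently.
async function teeAndSummarize(chunks) {
  const original = new ReadableStream({
    start(controller) {
      for (const chunk of chunks) controller.enqueue(chunk);
      controller.close();
    },
  });
  const [forText, forCount] = original.tee();

  // Branch 1: concatenate all chunks.
  const joinText = async (stream) => {
    let text = '';
    const reader = stream.getReader();
    while (true) {
      const { done, value } = await reader.read();
      if (done) return text;
      text += value;
    }
  };

  // Branch 2: count the chunks.
  const countChunks = async (stream) => {
    let count = 0;
    const reader = stream.getReader();
    while (true) {
      const { done } = await reader.read();
      if (done) return count;
      count += 1;
    }
  };

  const [text, count] = await Promise.all([joinText(forText), countChunks(forCount)]);
  return { text, count };
}
```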

Identity transform

Workerd provides an optimized identity transform for byte streams:
const { readable, writable } = new IdentityTransformStream();

// Efficiently passes bytes through without modification
const response = new Response(readable);
Source: src/workerd/api/streams/identity-transform-stream.h
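
One way this might be used is to return a Response immediately while a background task keeps writing to the writable side. The sketch below is an assumption-laden illustration: `streamingResponse` is a hypothetical helper, and the fallback to a plain `TransformStream` exists only so the snippet also runs in runtimes that lack `IdentityTransformStream`. Writes may not settle until the readable side is consumed, so the producer is deliberately not awaited before returning.

```javascript
// Hypothetical helper: stream text parts to the client as they are produced.
function streamingResponse(parts) {
  // Assumption: fall back to a pass-through TransformStream outside workerd.
  const Identity = globalThis.IdentityTransformStream ?? TransformStream;
  const { readable, writable } = new Identity();
  const writer = writable.getWriter();
  const encoder = new TextEncoder();

  // Produce in the background; awaiting here would stall until the body is read.
  (async () => {
    try {
      for (const part of parts) {
        await writer.write(encoder.encode(part));
      }
      await writer.close();
    } catch (error) {
      await writer.abort(error).catch(() => {});
    }
  })();

  return new Response(readable);
}
```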

Advanced patterns

Custom readable source

Create a stream from a data source:
const stream = new ReadableStream({
  async start(controller) {
    // Initialize source
  },
  async pull(controller) {
    // Called whenever the stream's internal queue has room for more data
    const data = await fetchNextChunk();
    if (data) {
      controller.enqueue(data);
    } else {
      controller.close();
    }
  },
  cancel(reason) {
    // Clean up resources
  }
});
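
As a concrete instance of the pattern above, an (async) iterator can act as the data source: `pull()` asks it for the next value and `cancel()` lets it clean up. This is a sketch; `streamFromIterator` is a hypothetical helper that stands in for the `fetchNextChunk` placeholder.

```javascript
// Hypothetical helper: adapt a sync or async iterator into a ReadableStream.
function streamFromIterator(iterator) {
  return new ReadableStream({
    async pull(controller) {
      // Only runs when the consumer wants more data, so the iterator is never overrun.
      const { done, value } = await iterator.next();
      if (done) {
        controller.close();
      } else {
        controller.enqueue(value);
      }
    },
    async cancel(reason) {
      // Give generators a chance to run their `finally` blocks.
      await iterator.return?.(reason);
    },
  });
}
```

For example, `streamFromIterator(someAsyncGenerator())` yields one chunk per generated value and closes when the generator finishes.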

Custom writable sink

Create a stream that processes data:
const stream = new WritableStream({
  async write(chunk, controller) {
    // Process chunk
    await processData(chunk);
  },
  async close() {
    // Finalize
  },
  async abort(reason) {
    // Clean up after an abort; chunks still queued are discarded
  }
});
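
To show how the three sink callbacks cooperate, the sketch below groups incoming chunks into fixed-size batches: `write()` flushes full batches, `close()` flushes the remainder, and `abort()` drops it. `batchingSink` and the `onBatch` callback are hypothetical names used only for this example.

```javascript
// Hypothetical helper: a sink that hands chunks to `onBatch` in groups of `batchSize`.
function batchingSink(batchSize, onBatch) {
  let batch = [];
  return new WritableStream({
    async write(chunk) {
      batch.push(chunk);
      if (batch.length >= batchSize) {
        const full = batch;
        batch = [];
        // The next write() is not started until this promise settles.
        await onBatch(full);
      }
    },
    async close() {
      // Flush whatever is left when the writer closes the stream.
      if (batch.length > 0) {
        await onBatch(batch);
      }
      batch = [];
    },
    abort() {
      // On abort the partial batch is discarded rather than flushed.
      batch = [];
    },
  });
}
```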

Custom transform

Create a transform stream:
const transform = new TransformStream({
  async transform(chunk, controller) {
    // Transform the chunk
    const transformed = await processChunk(chunk);
    controller.enqueue(transformed);
  },
  async flush(controller) {
    // Emit any remaining data
    const final = await finalizeTransform();
    if (final) {
      controller.enqueue(final);
    }
  }
});
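
A line splitter is a common case where `flush()` matters, because the last line may not end with a newline. The sketch below assumes string chunks (for example, after a `TextDecoderStream`); `lineSplitter` is a hypothetical name.

```javascript
// Hypothetical helper: re-chunk arbitrary string chunks into complete lines.
function lineSplitter() {
  let pending = '';
  return new TransformStream({
    transform(chunk, controller) {
      const parts = (pending + chunk).split('\n');
      // The last element is an incomplete line (or '' if chunk ended with '\n').
      pending = parts.pop();
      for (const line of parts) {
        controller.enqueue(line);
      }
    },
    flush(controller) {
      // Emit the trailing line that had no terminating newline.
      if (pending !== '') {
        controller.enqueue(pending);
      }
    },
  });
}
```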

Byte streams

Create byte-oriented readable streams:
const stream = new ReadableStream({
  type: 'bytes',
  async pull(controller) {
    const buffer = new Uint8Array(1024);
    const bytesRead = await readInto(buffer);
    
    if (bytesRead === 0) {
      controller.close();
    } else {
      controller.enqueue(buffer.subarray(0, bytesRead));
    }
  }
});
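
When the consumer uses a BYOB reader, a byte source can write straight into the consumer's buffer through `controller.byobRequest` instead of allocating its own. The sketch below follows the WHATWG Streams model and falls back to `enqueue()` for default readers; `byteStreamFrom` and `chunkSize` are hypothetical names, and an in-memory `Uint8Array` stands in for a real data source.

```javascript
// Hypothetical helper: serve `data` as a byte stream, filling BYOB buffers directly.
function byteStreamFrom(data, chunkSize = 4) {
  let offset = 0;
  return new ReadableStream({
    type: 'bytes',
    pull(controller) {
      const request = controller.byobRequest; // null when a default reader is reading
      const remaining = data.byteLength - offset;
      if (remaining === 0) {
        controller.close();
        // A pending BYOB request must still be answered after closing.
        request?.respond(0);
        return;
      }
      if (request) {
        // Copy directly into the consumer-supplied buffer.
        const view = request.view;
        const n = Math.min(view.byteLength, remaining);
        new Uint8Array(view.buffer, view.byteOffset, n).set(data.subarray(offset, offset + n));
        offset += n;
        request.respond(n);
      } else {
        // Default reader: enqueue a copy, because enqueue() transfers the buffer.
        const n = Math.min(chunkSize, remaining);
        controller.enqueue(data.slice(offset, offset + n));
        offset += n;
      }
    },
  });
}
```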

Stream classification

Workerd’s streams implementation uses two distinct architectures:
Feature         | Internal                   | Standard
Spec compliance | Non-standard (kj-backed)   | WHATWG Streams
Data types      | Byte-only                  | Byte or value
Queue model     | Single pending read        | Dual queues
Async model     | kj::Promise                | JS Promise
Isolate lock    | Data flows outside lock    | Data flows inside lock
Backpressure    | Implicit (kj flow control) | Explicit (high water mark)
Source: src/workerd/api/streams/README.md:8

Best practices

Always release stream locks when done:
const reader = stream.getReader();
try {
  // Read data
} finally {
  reader.releaseLock();
}

Handle errors explicitly, because streams can error at any time:
try {
  await stream.pipeTo(destination);
} catch (error) {
  console.error('Stream error:', error);
  // Clean up resources
}

Prefer pipeTo() and pipeThrough() over manual reading and writing:
// Good: efficient piping
await source.pipeTo(destination);

// Avoid: manual copying
const reader = source.getReader();
const writer = destination.getWriter();
while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  await writer.write(value);
}

Implementation details

The Streams API is implemented across multiple files:
  • src/workerd/api/streams/readable.h / .c++ - ReadableStream (1000+ lines)
  • src/workerd/api/streams/writable.h / .c++ - WritableStream (800+ lines)
  • src/workerd/api/streams/transform.h / .c++ - TransformStream
  • src/workerd/api/streams/standard.h / .c++ - Standard (WHATWG) implementation (6000+ lines)
  • src/workerd/api/streams/internal.h / .c++ - Internal (kj-backed) implementation (3000+ lines)
  • src/workerd/api/streams/queue.h / .c++ - Queue mechanics for backpressure
  • src/workerd/api/streams/compression.h / .c++ - Compression transforms
