Stability: 2 - Stable (No longer experimental since v21.0.0)
An implementation of the WHATWG Streams Standard for handling streaming data across JavaScript environments.

Overview

The WHATWG Streams Standard (or “web streams”) defines an API for handling streaming data. It is similar to the Node.js Streams API but emerged later and has become the “standard” API for streaming data across many JavaScript environments. There are three primary types of objects:
  • ReadableStream - Represents a source of streaming data
  • WritableStream - Represents a destination for streaming data
  • TransformStream - Represents an algorithm for transforming streaming data

Example: ReadableStream

This example creates a simple ReadableStream that pushes the current performance.now() timestamp once every second:
import { ReadableStream } from 'node:stream/web';
import { setInterval as every } from 'node:timers/promises';
import { performance } from 'node:perf_hooks';

const SECOND = 1000;

const stream = new ReadableStream({
  async start(controller) {
    for await (const _ of every(SECOND))
      controller.enqueue(performance.now());
  },
});

for await (const value of stream)
  console.log(value);

Node.js Streams Interoperability

Node.js streams can be converted to web streams and vice versa via the toWeb and fromWeb methods:
  • stream.Readable.toWeb() - Convert Node.js Readable to ReadableStream
  • stream.Readable.fromWeb() - Convert ReadableStream to Node.js Readable
  • stream.Writable.toWeb() - Convert Node.js Writable to WritableStream
  • stream.Writable.fromWeb() - Convert WritableStream to Node.js Writable
  • stream.Duplex.toWeb() - Convert Node.js Duplex to web streams
  • stream.Duplex.fromWeb() - Convert web streams to Node.js Duplex
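
As a rough illustration of the conversions listed above, the sketch below round-trips data between the two stream families using the static Readable.toWeb() and Readable.fromWeb() methods. The variable names are illustrative, and the objectMode option is passed here only so that the string chunks are not converted to Buffers.

```javascript
import { Readable } from 'node:stream';
import { ReadableStream } from 'node:stream/web';

// Node.js Readable -> web ReadableStream
const nodeReadable = Readable.from(['a', 'b', 'c']);
const webStream = Readable.toWeb(nodeReadable);

const fromNode = [];
for await (const chunk of webStream)
  fromNode.push(chunk);

// Web ReadableStream -> Node.js Readable
const webSource = new ReadableStream({
  start(controller) {
    controller.enqueue('x');
    controller.enqueue('y');
    controller.close();
  },
});
// objectMode keeps the string chunks as strings rather than Buffers.
const nodeStream = Readable.fromWeb(webSource, { objectMode: true });

const fromWeb = [];
for await (const chunk of nodeStream)
  fromWeb.push(chunk);

console.log(fromNode, fromWeb);
```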

ReadableStream

new ReadableStream([underlyingSource [, strategy]])

Creates a new ReadableStream instance. Parameters:
  • underlyingSource (Object) - Optional configuration
    • start (Function) - Invoked immediately when the ReadableStream is created
    • pull (Function) - Called repeatedly when the internal queue is not full
    • cancel (Function) - Called when the ReadableStream is canceled
    • type (string) - Must be 'bytes' or undefined
    • autoAllocateChunkSize (number) - Used when type is 'bytes'
  • strategy (Object) - Optional queuing strategy
    • highWaterMark (number) - Maximum internal queue size before backpressure is applied
    • size (Function) - User-defined function that returns the size of each chunk
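
To make the pull and highWaterMark parameters more concrete, here is a minimal sketch of a pull-based source. The counter variable and the limit of three chunks are assumptions chosen for illustration; pull() is only invoked while the internal queue has room, so the source produces data on demand.

```javascript
import { ReadableStream } from 'node:stream/web';

let next = 1;

const counter = new ReadableStream({
  // Called whenever the internal queue is below the high water mark.
  pull(controller) {
    controller.enqueue(next++);
    // Close the stream after three chunks have been enqueued.
    if (next > 3) controller.close();
  },
}, { highWaterMark: 2 });

const received = [];
for await (const n of counter)
  received.push(n);

console.log(received); // Prints: [ 1, 2, 3 ]
```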

Properties

readableStream.locked

Set to true if there is an active reader for this ReadableStream. Type: boolean
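
A short sketch of how locked changes as a reader is acquired and released (the variable names are illustrative):

```javascript
import { ReadableStream } from 'node:stream/web';

const stream = new ReadableStream();

const before = stream.locked;   // No reader yet
const reader = stream.getReader();
const during = stream.locked;   // A reader holds the lock
reader.releaseLock();
const after = stream.locked;    // Lock released again

console.log(before, during, after); // Prints: false true false
```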

Methods

readableStream.cancel([reason])

Cancels the stream. Parameters:
  • reason (any) - Optional cancellation reason
Returns: Promise fulfilled with undefined
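
The following sketch suggests how a cancellation reason reaches the underlying source. The cancelReason variable is an assumption used only to observe the call.

```javascript
import { ReadableStream } from 'node:stream/web';

let cancelReason;

const stream = new ReadableStream({
  // Receives the reason passed to readableStream.cancel().
  cancel(reason) {
    cancelReason = reason;
  },
});

const result = await stream.cancel('no longer needed');

console.log(result);       // Prints: undefined
console.log(cancelReason); // Prints: no longer needed
```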

readableStream.getReader([options])

Creates and returns a reader for consuming the stream’s data. Parameters:
  • options (Object)
    • mode (string) - 'byob' or undefined
Returns: ReadableStreamDefaultReader or ReadableStreamBYOBReader
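import { ReadableStream } from 'node:stream/web';

// Enqueue one chunk and close the stream so that read() can settle.
const stream = new ReadableStream({
  start(controller) {
    controller.enqueue('a');
    controller.close();
  },
});
const reader = stream.getReader();

console.log(await reader.read()); // Prints: { value: 'a', done: false }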

readableStream.pipeThrough(transform[, options])

Connects this ReadableStream to a transform stream, returning the readable side. Parameters:
  • transform (Object)
    • readable (ReadableStream)
    • writable (WritableStream)
  • options (Object)
    • preventAbort (boolean)
    • preventCancel (boolean)
    • preventClose (boolean)
    • signal (AbortSignal)
Returns: ReadableStream from transform.readable
import { ReadableStream, TransformStream } from 'node:stream/web';

const stream = new ReadableStream({
  start(controller) {
    controller.enqueue('a');
    controller.close(); // End the stream so the loop below can finish
  },
});

const transform = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(chunk.toUpperCase());
  },
});

const transformedStream = stream.pipeThrough(transform);

for await (const chunk of transformedStream)
  console.log(chunk); // Prints: A

readableStream.pipeTo(destination[, options])

Pipes this ReadableStream to a WritableStream. Parameters:
  • destination (WritableStream)
  • options (Object)
    • preventAbort (boolean)
    • preventCancel (boolean)
    • preventClose (boolean)
    • signal (AbortSignal)
Returns: Promise fulfilled with undefined
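
A minimal sketch of pipeTo(), assuming a sink that simply collects chunks into an array (the collected array is illustrative):

```javascript
import { ReadableStream, WritableStream } from 'node:stream/web';

const source = new ReadableStream({
  start(controller) {
    controller.enqueue('a');
    controller.enqueue('b');
    controller.close();
  },
});

const collected = [];
const sink = new WritableStream({
  write(chunk) {
    collected.push(chunk);
  },
});

// Resolves once the source is fully consumed and the sink is closed.
await source.pipeTo(sink);

console.log(collected); // Prints: [ 'a', 'b' ]
```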

readableStream.tee()

Returns a pair of new ReadableStream instances to which this ReadableStream’s data will be forwarded. Returns: Array of two ReadableStream instances
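
As an illustration, the sketch below tees a stream and drains both branches. The collect() helper is hypothetical and not part of the API.

```javascript
import { ReadableStream } from 'node:stream/web';

const source = new ReadableStream({
  start(controller) {
    controller.enqueue('a');
    controller.enqueue('b');
    controller.close();
  },
});

// Hypothetical helper: drain a stream into an array.
async function collect(stream) {
  const chunks = [];
  for await (const chunk of stream)
    chunks.push(chunk);
  return chunks;
}

const [left, right] = source.tee();
const [leftChunks, rightChunks] =
  await Promise.all([collect(left), collect(right)]);

console.log(leftChunks, rightChunks); // Prints: [ 'a', 'b' ] [ 'a', 'b' ]
```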

readableStream.values([options])

Creates and returns an async iterator for consuming the stream’s data. Parameters:
  • options (Object)
    • preventCancel (boolean) - When true, prevents the ReadableStream from being canceled when the async iterator terminates abruptly. Default: false
Returns: AsyncIterator
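
One way to see the effect of preventCancel is to leave a loop early and then resume iterating the same stream. This is a sketch under the assumption that the remaining chunks are still wanted after the first loop exits.

```javascript
import { ReadableStream } from 'node:stream/web';

const stream = new ReadableStream({
  start(controller) {
    controller.enqueue(1);
    controller.enqueue(2);
    controller.enqueue(3);
    controller.close();
  },
});

let first;
// Breaking out releases the lock but, with preventCancel, does not
// cancel the stream.
for await (const value of stream.values({ preventCancel: true })) {
  first = value;
  break;
}

const rest = [];
for await (const value of stream)
  rest.push(value);

console.log(first, rest); // Prints: 1 [ 2, 3 ]
```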

Static Methods

ReadableStream.from(iterable)

Creates a new ReadableStream from an iterable. Parameters:
  • iterable (Iterable) - Object implementing Symbol.asyncIterator or Symbol.iterator
Returns: ReadableStream
import { ReadableStream } from 'node:stream/web';

async function* asyncIterableGenerator() {
  yield 'a';
  yield 'b';
  yield 'c';
}

const stream = ReadableStream.from(asyncIterableGenerator());

for await (const chunk of stream)
  console.log(chunk); // Prints: 'a', 'b', 'c'

WritableStream

The WritableStream is a destination to which stream data is sent.

new WritableStream([underlyingSink[, strategy]])

Creates a new WritableStream instance. Parameters:
  • underlyingSink (Object)
    • start (Function) - Invoked when WritableStream is created
    • write (Function) - Invoked when a chunk is written
    • close (Function) - Called when the stream is closed
    • abort (Function) - Called to abruptly close the stream
  • strategy (Object)
    • highWaterMark (number) - Maximum internal queue size
    • size (Function) - User-defined function that returns the size of each chunk
import { WritableStream } from 'node:stream/web';

const stream = new WritableStream({
  write(chunk) {
    console.log(chunk);
  },
});

await stream.getWriter().write('Hello World');

Properties

writableStream.locked

Set to true while there is an active writer attached to this WritableStream. Type: boolean

Methods

writableStream.abort([reason])

Abruptly terminates the WritableStream. All queued writes are canceled and their associated promises rejected. Parameters:
  • reason (any)
Returns: Promise fulfilled with undefined
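
A small sketch of abort(), assuming an underlying sink that records the reason it receives (abortReason is illustrative):

```javascript
import { WritableStream } from 'node:stream/web';

let abortReason;

const stream = new WritableStream({
  // Receives the reason passed to writableStream.abort().
  abort(reason) {
    abortReason = reason;
  },
});

const result = await stream.abort('shutting down');

console.log(result);      // Prints: undefined
console.log(abortReason); // Prints: shutting down
```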

writableStream.close()

Closes the WritableStream when no additional writes are expected. Returns: Promise fulfilled with undefined

writableStream.getWriter()

Creates and returns a new writer instance. Returns: WritableStreamDefaultWriter
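
The sketch below shows a typical writer lifecycle: acquire the writer, write a few chunks, then close. The written array and closed flag are assumptions used only to observe what the sink receives.

```javascript
import { WritableStream } from 'node:stream/web';

const written = [];
let closed = false;

const stream = new WritableStream({
  write(chunk) {
    written.push(chunk);
  },
  close() {
    closed = true;
  },
});

const writer = stream.getWriter();
const lockedWhileWriting = stream.locked; // true while the writer is active

await writer.write('a');
await writer.write('b');
await writer.close();

console.log(written, closed); // Prints: [ 'a', 'b' ] true
```

Closing through the writer is used here because the stream is locked while a writer is attached.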

TransformStream

A TransformStream consists of a ReadableStream and a WritableStream that are connected such that data written to the WritableStream is received, and potentially transformed, before being pushed to the ReadableStream.

new TransformStream([transformer[, writableStrategy[, readableStrategy]]])

Creates a new TransformStream instance. Parameters:
  • transformer (Object)
    • start (Function) - Invoked when TransformStream is created
    • transform (Function) - Receives and modifies chunks
    • flush (Function) - Called before writable side closes
  • writableStrategy (Object) - Queuing strategy for writable side
  • readableStrategy (Object) - Queuing strategy for readable side
import { TransformStream } from 'node:stream/web';

const transform = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(chunk.toUpperCase());
  },
});

await Promise.all([
  transform.writable.getWriter().write('A'),
  transform.readable.getReader().read(),
]);
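
To illustrate the flush callback, here is a sketch of a transformer that uppercases chunks and emits a trailing summary when the writable side closes. The count variable and the summary format are assumptions for illustration.

```javascript
import { ReadableStream, TransformStream } from 'node:stream/web';

const source = new ReadableStream({
  start(controller) {
    controller.enqueue('a');
    controller.enqueue('b');
    controller.close();
  },
});

let count = 0;
const counting = new TransformStream({
  transform(chunk, controller) {
    count++;
    controller.enqueue(chunk.toUpperCase());
  },
  // Runs once, after the last chunk and before the writable side closes.
  flush(controller) {
    controller.enqueue(`total: ${count}`);
  },
});

const output = [];
for await (const chunk of source.pipeThrough(counting))
  output.push(chunk);

console.log(output); // Prints: [ 'A', 'B', 'total: 2' ]
```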

Properties

transformStream.readable

The readable side of the transform stream. Type: ReadableStream

transformStream.writable

The writable side of the transform stream. Type: WritableStream

Readers

ReadableStreamDefaultReader

The default reader treats chunks of data as opaque values.

new ReadableStreamDefaultReader(stream)

Creates a new reader locked to the given ReadableStream. Parameters:
  • stream (ReadableStream)

Methods

readableStreamDefaultReader.read()
Requests the next chunk of data. Returns: Promise fulfilled with {value, done}
readableStreamDefaultReader.cancel([reason])
Cancels the ReadableStream. Returns: Promise fulfilled with undefined
readableStreamDefaultReader.releaseLock()
Releases the reader’s lock on the ReadableStream.
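
A common pattern with the default reader is a loop that calls read() until done is true. The following is a minimal sketch of that pattern; the chunks array is illustrative.

```javascript
import { ReadableStream } from 'node:stream/web';

const stream = new ReadableStream({
  start(controller) {
    controller.enqueue('a');
    controller.enqueue('b');
    controller.close();
  },
});

const reader = stream.getReader();
const chunks = [];

while (true) {
  const { value, done } = await reader.read();
  if (done) break; // The stream is closed and the queue is drained
  chunks.push(value);
}

// Allow another reader to be acquired later.
reader.releaseLock();

console.log(chunks); // Prints: [ 'a', 'b' ]
```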

ReadableStreamBYOBReader

An alternative consumer for byte-oriented ReadableStreams using the “bring your own buffer” pattern for more efficient reading.

new ReadableStreamBYOBReader(stream)

Creates a new BYOB reader locked to the given ReadableStream. Parameters:
  • stream (ReadableStream) - Must be created with type: 'bytes'

readableStreamBYOBReader.read(view[, options])

Reads data into the provided buffer. Parameters:
  • view (Buffer|TypedArray|DataView) - The buffer to read into
  • options (Object)
    • min (number) - Minimum number of elements required
Returns: Promise fulfilled with {value, done}
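Do not pass a pooled Buffer instance to this method. Pooled Buffer objects (created with Buffer.allocUnsafe() or Buffer.from(), or often returned by node:fs callbacks) share a single underlying ArrayBuffer. Because read() detaches the underlying ArrayBuffer of the view it is given, passing a pooled Buffer invalidates every other view on that shared ArrayBuffer.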
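
As a sketch of a BYOB read, the example below creates a byte stream and reads into a view backed by a freshly allocated ArrayBuffer, which avoids the pooled-Buffer problem described above. The 8-byte buffer size is an arbitrary choice.

```javascript
import { ReadableStream } from 'node:stream/web';

const byteStream = new ReadableStream({
  type: 'bytes',
  start(controller) {
    controller.enqueue(new Uint8Array([1, 2, 3]));
    controller.close();
  },
});

const reader = byteStream.getReader({ mode: 'byob' });

// A dedicated ArrayBuffer: no other view shares it, so detaching is safe.
const view = new Uint8Array(new ArrayBuffer(8));
const { value, done } = await reader.read(view);

// `value` is a new view over the transferred buffer holding the bytes read.
console.log(Array.from(value), done); // Prints: [ 1, 2, 3 ] false
```

After the call, the returned value should be used instead of the original view.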

Controllers

ReadableStreamDefaultController

Controls a non-byte-oriented ReadableStream.

Methods

readableStreamDefaultController.close()
Closes the ReadableStream.
readableStreamDefaultController.enqueue([chunk])
Appends a new chunk of data to the queue. Parameters:
  • chunk (any)
readableStreamDefaultController.error([error])
Signals an error that causes the stream to error and close. Parameters:
  • error (any)

Properties

readableStreamDefaultController.desiredSize
Returns the amount of data remaining to fill the queue. Type: number
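
The relationship between highWaterMark and desiredSize can be sketched as follows. With the default size function each chunk counts as 1, so every enqueue() reduces desiredSize by one. The sizes array is illustrative.

```javascript
import { ReadableStream } from 'node:stream/web';

const sizes = [];

new ReadableStream({
  start(controller) {
    sizes.push(controller.desiredSize); // Empty queue: equals highWaterMark
    controller.enqueue('a');
    sizes.push(controller.desiredSize);
    controller.enqueue('b');
    sizes.push(controller.desiredSize);
  },
}, { highWaterMark: 3 });

console.log(sizes); // Prints: [ 3, 2, 1 ]
```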

ReadableByteStreamController

Controls a byte-oriented ReadableStream.

Properties

readableByteStreamController.byobRequest
Provides access to the current BYOB read request. Type: ReadableStreamBYOBRequest
readableByteStreamController.desiredSize
Amount of data remaining to fill the queue. Type: number

Methods

readableByteStreamController.close()
Closes the ReadableStream.
readableByteStreamController.enqueue(chunk)
Appends a new chunk to the queue. Parameters:
  • chunk (Buffer|TypedArray|DataView)
readableByteStreamController.error([error])
Signals an error. Parameters:
  • error (any)
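
The sketch below suggests how a byte source might fill a consumer-supplied buffer through byobRequest. It relies on the view property and respond() method of ReadableStreamBYOBRequest from the Streams Standard, which are not described in this section; the value 42 and the 4-byte buffer are arbitrary.

```javascript
import { ReadableStream } from 'node:stream/web';

const byteStream = new ReadableStream({
  type: 'bytes',
  pull(controller) {
    const request = controller.byobRequest;
    // No BYOB read is pending, so there is nothing to fill.
    if (request === null) return;
    // Write directly into the consumer's buffer...
    request.view[0] = 42;
    // ...and report how many bytes were written.
    request.respond(1);
  },
});

const reader = byteStream.getReader({ mode: 'byob' });
const { value } = await reader.read(new Uint8Array(new ArrayBuffer(4)));

console.log(value.byteLength, value[0]); // Prints: 1 42
```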

WritableStreamDefaultController

Manages the WritableStream’s internal state.

Methods

writableStreamDefaultController.error([error])
Signals an error to abort the stream. Parameters:
  • error (any)

Properties

writableStreamDefaultController.signal
An AbortSignal that can be used to cancel pending operations. Type: AbortSignal
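
A minimal sketch of observing the controller's signal, assuming the sink only needs to know that an abort was requested (the aborted flag is illustrative):

```javascript
import { WritableStream } from 'node:stream/web';

let aborted = false;

const stream = new WritableStream({
  start(controller) {
    // Fires when writableStream.abort() is called.
    controller.signal.addEventListener('abort', () => {
      aborted = true;
    });
  },
});

await stream.abort('stop');

console.log(aborted); // Prints: true
```

In a real sink, the same signal could be handed to a pending operation that accepts an AbortSignal so that it stops promptly.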

TransformStreamDefaultController

Manages the internal state of the TransformStream.

Properties

transformStreamDefaultController.desiredSize
Amount of data required to fill the readable side’s queue. Type: number

Methods

transformStreamDefaultController.enqueue([chunk])
Appends a chunk to the readable side’s queue. Parameters:
  • chunk (any)
transformStreamDefaultController.error([reason])
Signals an error to both sides. Parameters:
  • reason (any)
transformStreamDefaultController.terminate()
Closes the readable side and abruptly closes the writable side.
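
As an illustration of terminate(), the sketch below ends the output as soon as a sentinel chunk is seen. The 'STOP' sentinel is an assumption made for this example.

```javascript
import { ReadableStream, TransformStream } from 'node:stream/web';

const source = new ReadableStream({
  start(controller) {
    controller.enqueue('a');
    controller.enqueue('STOP');
    controller.enqueue('b');
    controller.close();
  },
});

const untilStop = new TransformStream({
  transform(chunk, controller) {
    if (chunk === 'STOP') {
      // Close the readable side; later chunks are never transformed.
      controller.terminate();
      return;
    }
    controller.enqueue(chunk);
  },
});

const output = [];
for await (const chunk of source.pipeThrough(untilStop))
  output.push(chunk);

console.log(output); // Prints: [ 'a' ]
```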

Queuing Strategies

ByteLengthQueuingStrategy

Provides a built-in byte length queuing strategy.

new ByteLengthQueuingStrategy(init)

Parameters:
  • init (Object)
    • highWaterMark (number)

CountQueuingStrategy

Provides a built-in chunk counting queuing strategy.

new CountQueuingStrategy(init)

Parameters:
  • init (Object)
    • highWaterMark (number)
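
The difference between the two built-in strategies can be sketched by enqueuing the same 10-byte chunk under each and inspecting desiredSize. The high water mark of 16 is an arbitrary value chosen for illustration.

```javascript
import {
  ReadableStream,
  ByteLengthQueuingStrategy,
  CountQueuingStrategy,
} from 'node:stream/web';

let remainingBytes;
new ReadableStream({
  start(controller) {
    controller.enqueue(new Uint8Array(10));
    // Chunk size is its byteLength: 16 - 10
    remainingBytes = controller.desiredSize;
  },
}, new ByteLengthQueuingStrategy({ highWaterMark: 16 }));

let remainingChunks;
new ReadableStream({
  start(controller) {
    controller.enqueue(new Uint8Array(10));
    // Every chunk counts as 1: 16 - 1
    remainingChunks = controller.desiredSize;
  },
}, new CountQueuingStrategy({ highWaterMark: 16 }));

console.log(remainingBytes, remainingChunks); // Prints: 6 15
```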

Encoding Streams

TextEncoderStream

A TransformStream that encodes strings into UTF-8 bytes.

new TextEncoderStream()

Creates a new TextEncoderStream instance.

Properties

textEncoderStream.encoding
The encoding supported (always 'utf-8'). Type: string
textEncoderStream.readable
Type: ReadableStream
textEncoderStream.writable
Type: WritableStream

TextDecoderStream

A TransformStream that decodes bytes into strings.

new TextDecoderStream([encoding[, options]])

Parameters:
  • encoding (string) - Default: 'utf-8'
  • options (Object)
    • fatal (boolean) - true if decoding failures are fatal
    • ignoreBOM (boolean) - When true, the byte order mark is included in the decoded result; when false, it is removed from the output

Properties

textDecoderStream.encoding
The encoding being used. Type: string
textDecoderStream.fatal
Type: boolean
textDecoderStream.ignoreBOM
Type: boolean
textDecoderStream.readable
Type: ReadableStream
textDecoderStream.writable
Type: WritableStream
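
A short sketch that chains the two encoding streams, encoding a string to UTF-8 bytes and decoding it back. The sample text is arbitrary.

```javascript
import {
  ReadableStream,
  TextEncoderStream,
  TextDecoderStream,
} from 'node:stream/web';

const source = new ReadableStream({
  start(controller) {
    controller.enqueue('héllo');
    controller.close();
  },
});

const decoded = source
  .pipeThrough(new TextEncoderStream())   // string -> UTF-8 bytes
  .pipeThrough(new TextDecoderStream());  // UTF-8 bytes -> string

let result = '';
for await (const chunk of decoded)
  result += chunk;

console.log(result); // Prints: héllo
```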

Compression Streams

CompressionStream

A TransformStream that compresses data.

new CompressionStream(format)

Parameters:
  • format (string) - One of 'deflate', 'deflate-raw', 'gzip', or 'brotli'

Properties

compressionStream.readable
Type: ReadableStream
compressionStream.writable
Type: WritableStream

DecompressionStream

A TransformStream that decompresses data.

new DecompressionStream(format)

Parameters:
  • format (string) - One of 'deflate', 'deflate-raw', 'gzip', or 'brotli'

Properties

decompressionStream.readable
Type: ReadableStream
decompressionStream.writable
Type: WritableStream
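
The following sketch round-trips text through CompressionStream and DecompressionStream. It uses 'gzip' and adds text encoding and decoding stages so that the compression streams receive bytes; the sample input is arbitrary.

```javascript
import {
  ReadableStream,
  CompressionStream,
  DecompressionStream,
  TextEncoderStream,
  TextDecoderStream,
} from 'node:stream/web';

const input = 'hello hello hello';

const source = new ReadableStream({
  start(controller) {
    controller.enqueue(input);
    controller.close();
  },
});

const roundTrip = source
  .pipeThrough(new TextEncoderStream())
  .pipeThrough(new CompressionStream('gzip'))
  .pipeThrough(new DecompressionStream('gzip'))
  .pipeThrough(new TextDecoderStream());

let output = '';
for await (const chunk of roundTrip)
  output += chunk;

console.log(output === input); // Prints: true
```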

Utility Consumers

Utility functions for consuming streams, available from node:stream/consumers.
import { arrayBuffer, blob, buffer, json, text } from 'node:stream/consumers';

streamConsumers.arrayBuffer(stream)

Consumes the entire stream into an ArrayBuffer. Parameters:
  • stream (ReadableStream|stream.Readable|AsyncIterator)
Returns: Promise fulfilled with ArrayBuffer

streamConsumers.blob(stream)

Consumes the entire stream into a Blob. Parameters:
  • stream (ReadableStream|stream.Readable|AsyncIterator)
Returns: Promise fulfilled with Blob

streamConsumers.buffer(stream)

Consumes the entire stream into a Buffer. Parameters:
  • stream (ReadableStream|stream.Readable|AsyncIterator)
Returns: Promise fulfilled with Buffer
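
As a sketch, arrayBuffer() and buffer() can consume a web ReadableStream of bytes as shown below. The makeStream() helper is hypothetical and exists only because each stream can be consumed once.

```javascript
import { arrayBuffer, buffer } from 'node:stream/consumers';
import { ReadableStream } from 'node:stream/web';

// Hypothetical helper: a fresh stream containing the bytes for "hi".
function makeStream() {
  return new ReadableStream({
    start(controller) {
      controller.enqueue(new Uint8Array([104, 105]));
      controller.close();
    },
  });
}

const asArrayBuffer = await arrayBuffer(makeStream());
const asBuffer = await buffer(makeStream());

console.log(asArrayBuffer.byteLength); // Prints: 2
console.log(asBuffer.toString());      // Prints: hi
```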

streamConsumers.bytes(stream)

Consumes the entire stream into a Uint8Array. Parameters:
  • stream (ReadableStream|stream.Readable|AsyncIterator)
Returns: Promise fulfilled with Uint8Array

streamConsumers.json(stream)

Consumes the stream and parses it as JSON. Parameters:
  • stream (ReadableStream|stream.Readable|AsyncIterator)
Returns: Promise fulfilled with parsed JSON
import { json } from 'node:stream/consumers';
import { Readable } from 'node:stream';

const items = [{ message: 'hello' }];
const readable = Readable.from(JSON.stringify(items));
const data = await json(readable);
console.log(data.length); // 1

streamConsumers.text(stream)

Consumes the stream as UTF-8 encoded text. Parameters:
  • stream (ReadableStream|stream.Readable|AsyncIterator)
Returns: Promise fulfilled with string
import { text } from 'node:stream/consumers';
import { Readable } from 'node:stream';

const readable = Readable.from('Hello world!');
const data = await text(readable);
console.log(data); // 'Hello world!'