Skip to main content

Stream

Stability: 2 - Stable
A stream is an abstract interface for working with streaming data in Node.js. The node:stream module provides an API for implementing the stream interface. There are many stream objects provided by Node.js. For instance, a request to an HTTP server and process.stdout are both stream instances. Streams can be readable, writable, or both. All streams are instances of EventEmitter.
const stream = require('node:stream');

Types of Streams

There are four fundamental stream types within Node.js:
  • Writable: streams to which data can be written (for example, fs.createWriteStream())
  • Readable: streams from which data can be read (for example, fs.createReadStream())
  • Duplex: streams that are both Readable and Writable (for example, net.Socket)
  • Transform: Duplex streams that can modify or transform the data as it is written and read (for example, zlib.createDeflate())

Object Mode

All streams created by Node.js APIs operate exclusively on strings, Buffers, TypedArray and DataView objects. It is possible, however, for stream implementations to work with other types of JavaScript values (with the exception of null). Such streams are considered to operate in “object mode”. Stream instances are switched into object mode using the objectMode option when the stream is created.

Buffering

Both Writable and Readable streams will store data in an internal buffer. The amount of data potentially buffered depends on the highWaterMark option passed into the stream’s constructor. For normal streams, the highWaterMark option specifies a total number of bytes. For streams operating in object mode, the highWaterMark specifies a total number of objects.

API for Stream Consumers

Almost all Node.js applications, no matter how simple, use streams in some manner. The following is an example of using streams in a Node.js application that implements an HTTP server:
const http = require('node:http');

const server = http.createServer((req, res) => {
  // req is an http.IncomingMessage, which is a readable stream
  // res is an http.ServerResponse, which is a writable stream

  let body = '';
  // Get the data as utf8 strings
  req.setEncoding('utf8');

  // Readable streams emit 'data' events once a listener is added
  req.on('data', (chunk) => {
    body += chunk;
  });

  // The 'end' event indicates that the entire body has been received
  req.on('end', () => {
    try {
      const data = JSON.parse(body);
      // Write back something interesting to the user
      res.write(typeof data);
      res.end();
    } catch (er) {
      res.statusCode = 400;
      return res.end(`error: ${er.message}`);
    }
  });
});

server.listen(1337);

Writable Streams

Writable streams are an abstraction for a destination to which data is written. Examples of Writable streams include:
  • HTTP requests, on the client
  • HTTP responses, on the server
  • fs write streams
  • zlib streams
  • crypto streams
  • TCP sockets
  • child process stdin
  • process.stdout, process.stderr

writable.write(chunk[, encoding][, callback])

Added in: v0.9.4
chunk
string | Buffer | Uint8Array | any
required
The data to write
encoding
string
default:"'utf8'"
The encoding, if chunk is a string
callback
Function
Callback for when this chunk of data is flushed
return
boolean
Returns false if the stream wishes for the calling code to wait for the ‘drain’ event to be emitted before continuing to write additional data; otherwise true.
The writable.write() method writes some data to the stream, and calls the supplied callback once the data has been fully handled.
const fs = require('node:fs');
const file = fs.createWriteStream('example.txt');
file.write('hello, ');
file.write('world!');
file.end();

writable.end([chunk[, encoding]][, callback])

Added in: v0.9.4
chunk
string | Buffer | Uint8Array | any
Optional data to write
encoding
string
The encoding if chunk is a string
callback
Function
Callback for when the stream is finished
Calling the writable.end() method signals that no more data will be written to the Writable. The optional chunk and encoding arguments allow one final additional chunk of data to be written immediately before closing the stream.
const file = fs.createWriteStream('example.txt');
file.write('hello, ');
file.end('world!');

Event: ‘drain’

If a call to writable.write(chunk) returns false, the ‘drain’ event will be emitted when it is appropriate to resume writing data to the stream.
function writeOneMillionTimes(writer, data, encoding, callback) {
  let i = 1000000;
  write();
  function write() {
    let ok = true;
    do {
      i--;
      if (i === 0) {
        writer.write(data, encoding, callback);
      } else {
        ok = writer.write(data, encoding);
      }
    } while (i > 0 && ok);
    if (i > 0) {
      writer.once('drain', write);
    }
  }
}

Event: ‘finish’

The ‘finish’ event is emitted after the writable.end() method has been called, and all data has been flushed to the underlying system.
const writer = getWritableStreamSomehow();
for (let i = 0; i < 100; i++) {
  writer.write(`hello, #${i}!\n`);
}
writer.on('finish', () => {
  console.log('All writes are now complete.');
});
writer.end('This is the end\n');

Event: ‘error’

error
Error
required
The error object
The ‘error’ event is emitted if an error occurred while writing or piping data.

Readable Streams

Readable streams are an abstraction for a source from which data is consumed. Examples of Readable streams include:
  • HTTP responses, on the client
  • HTTP requests, on the server
  • fs read streams
  • zlib streams
  • crypto streams
  • TCP sockets
  • child process stdout and stderr
  • process.stdin

readable.read([size])

Added in: v0.9.4
size
number
Optional argument to specify how much data to read
return
string | Buffer | null | any
Returns data from the internal buffer. If no data is available to be read, null is returned.
The readable.read() method reads data out of the internal buffer and returns it. If no data is available to be read, null is returned. By default, the data is returned as a Buffer object unless an encoding has been specified using the readable.setEncoding() method.
const readable = getReadableStreamSomehow();
readable.on('readable', function() {
  let chunk;
  while (null !== (chunk = this.read())) {
    console.log(`Received ${chunk.length} bytes of data.`);
  }
});

readable.pipe(destination[, options])

Added in: v0.9.4
destination
stream.Writable
required
The destination for writing data
options
Object
The readable.pipe() method attaches a Writable stream to the readable, causing it to switch automatically into flowing mode and push all of its data to the attached Writable. The flow of data will be automatically managed so that the destination Writable stream is not overwhelmed by a faster Readable stream.
const fs = require('node:fs');
const readable = fs.createReadStream('input.txt');
const writable = fs.createWriteStream('output.txt');

readable.pipe(writable);

readable.unpipe([destination])

Added in: v0.9.4
destination
stream.Writable
Optional specific stream to unpipe
The readable.unpipe() method detaches a Writable stream previously attached using the readable.pipe() method.

Event: ‘data’

chunk
Buffer | string | any
required
The chunk of data
The ‘data’ event is emitted whenever the stream is relinquishing ownership of a chunk of data to a consumer.
const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes of data.`);
});

Event: ‘end’

The ‘end’ event is emitted when there is no more data to be consumed from the stream.
const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes of data.`);
});
readable.on('end', () => {
  console.log('There will be no more data.');
});

Event: ‘error’

error
Error
required
The error object
The ‘error’ event may be emitted by a Readable implementation at any time.

readable.setEncoding(encoding)

Added in: v0.9.4
encoding
string
required
The encoding to use
The readable.setEncoding() method sets the character encoding for data read from the Readable stream.
const readable = getReadableStreamSomehow();
readable.setEncoding('utf8');
readable.on('data', (chunk) => {
  assert.equal(typeof chunk, 'string');
  console.log('Got %d characters of string data:', chunk.length);
});

Duplex and Transform Streams

Class: stream.Duplex

Duplex streams are streams that implement both the Readable and Writable interfaces. Examples of Duplex streams include:
  • TCP sockets
  • zlib streams
  • crypto streams

Class: stream.Transform

Transform streams are Duplex streams where the output is in some way related to the input. Like all Duplex streams, Transform streams implement both the Readable and Writable interfaces. Examples of Transform streams include:
  • zlib streams
  • crypto streams

stream.pipeline(…streams[, callback])

Added in: v10.0.0
...streams
Stream[] | Iterable[] | AsyncIterable[] | Function[]
required
Two or more streams to pipe between
callback
Function
Called when the pipeline is complete
A module method to pipe between streams and generators forwarding errors and properly cleaning up and provide a callback when the pipeline is complete.
const { pipeline } = require('node:stream');
const fs = require('node:fs');
const zlib = require('node:zlib');

pipeline(
  fs.createReadStream('archive.tar'),
  zlib.createGzip(),
  fs.createWriteStream('archive.tar.gz'),
  (err) => {
    if (err) {
      console.error('Pipeline failed:', err);
    } else {
      console.log('Pipeline succeeded.');
    }
  }
);

stream.finished(stream[, options], callback)

Added in: v10.0.0
stream
Stream
required
A readable and/or writable stream
options
Object
callback
Function
required
A callback function that takes an optional error argument
A function to get notified when a stream is no longer readable, writable or has experienced an error or a premature close event.
const { finished } = require('node:stream');
const fs = require('node:fs');

const rs = fs.createReadStream('archive.tar');

finished(rs, (err) => {
  if (err) {
    console.error('Stream failed:', err);
  } else {
    console.log('Stream is done reading.');
  }
});

rs.resume(); // Drain the stream

Stream Promises API

The stream/promises API provides an alternative set of asynchronous utility functions for streams that return Promise objects rather than using callbacks.
import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';

await pipeline(
  createReadStream('archive.tar'),
  createGzip(),
  createWriteStream('archive.tar.gz')
);
console.log('Pipeline succeeded.');