Skip to main content
Bun provides full support for the Web Streams API, including ReadableStream, WritableStream, and TransformStream.

ReadableStream

ReadableStreams represent a source of data that can be read chunk by chunk:

Creating Readable Streams

// A finite stream: both chunks are queued as soon as the stream is constructed,
// then the stream is closed so readers see end-of-stream after the last chunk.
const stream = new ReadableStream({
  start: (ctrl) => {
    for (const part of ["Hello ", "World"]) {
      ctrl.enqueue(part);
    }
    ctrl.close();
  },
});

With Pull

// Chunks are produced lazily: pull() runs whenever the consumer wants more data.
let i = 0;
const stream = new ReadableStream({
  pull(ctrl) {
    // Once ten chunks have been produced, signal end-of-stream instead.
    if (i >= 10) {
      ctrl.close();
      return;
    }
    ctrl.enqueue(`Chunk ${i++}`);
  },
});

With Cancel

// The timer handle lives in the enclosing scope so that start() and cancel()
// both see the same value. Storing it on `controller` and reading it back from
// `this` does not work: inside cancel(), `this` is the underlying-source object,
// not the controller, so the handle would be undefined and the timer would leak.
let interval: ReturnType<typeof setInterval> | undefined;

const stream = new ReadableStream({
  start(controller) {
    // Emit the current timestamp once per second until the consumer cancels.
    interval = setInterval(() => {
      controller.enqueue(Date.now());
    }, 1000);
  },
  cancel(reason) {
    // Stop producing; without this the interval keeps firing after cancellation.
    clearInterval(interval);
    console.log("Stream cancelled:", reason);
  },
});

Reading from Streams

Using getReader()

const stream = new ReadableStream({
  start: (ctrl) => {
    ctrl.enqueue("Hello");
    ctrl.enqueue("World");
    ctrl.close();
  },
});

// Acquire a reader, then log chunks until read() reports `done`.
const reader = stream.getReader();

for (
  let result = await reader.read();
  !result.done;
  result = await reader.read()
) {
  console.log(result.value);
}

// Give the lock back once reading is finished.
reader.releaseLock();

Using for-await-of

// The stream is consumed with for-await-of; every chunk is logged as it arrives.
for await (const piece of stream) console.log(piece);

Using pipeTo()

// Source: emits a single chunk and then finishes.
const readable = new ReadableStream({
  start: (ctrl) => {
    ctrl.enqueue("data");
    ctrl.close();
  },
});

// Sink: logs every chunk it is handed.
const writable = new WritableStream({
  write: (chunk) => {
    console.log(chunk);
  },
});

// Connect source to sink and wait for the pipe to finish.
await readable.pipeTo(writable);

WritableStream

WritableStreams represent a destination for data:

Creating Writable Streams

// A sink with one handler per lifecycle event: each write, close, and abort.
const stream = new WritableStream({
  write: (chunk) => console.log("Writing:", chunk),
  close: () => console.log("Stream closed"),
  abort: (reason) => console.error("Stream aborted:", reason),
});

Writing to Streams

// Each write is awaited before the next one is issued; close() ends the stream.
const writer = stream.getWriter();

for (const chunk of ["Hello", "World"]) {
  await writer.write(chunk);
}
await writer.close();

Using with Files

// Write to a file incrementally through the writer obtained from Bun.file().
const file = Bun.file("./output.txt");
const writer = file.writer();

for (const line of ["Line 1\n", "Line 2\n"]) {
  writer.write(line);
}
await writer.end();

TransformStream

TransformStreams convert data as it flows through:

Creating Transform Streams

// Transformer that upper-cases every chunk passing through it.
const uppercase = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(chunk.toString().toUpperCase());
  },
});

// Pipe through transformer.
// pipeTo() returns a promise; awaiting it waits for the pipeline to finish and
// surfaces a stream failure here instead of as an unhandled rejection.
await readable
  .pipeThrough(uppercase)
  .pipeTo(writable);

Multiple Transforms

// First stage: upper-case every chunk.
const uppercase = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(chunk.toString().toUpperCase());
  },
});

// Second stage: prepend a fixed label to every chunk.
const addPrefix = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(`[PREFIX] ${chunk}`);
  },
});

// Stages run in the order they are chained. Awaiting the final pipeTo() waits
// for the whole chain and surfaces an error from any stage as a rejection here
// rather than as an unhandled rejection.
await readable
  .pipeThrough(uppercase)
  .pipeThrough(addPrefix)
  .pipeTo(writable);

Compression Streams

CompressionStream

const input = new ReadableStream({
  start(controller) {
    // CompressionStream consumes bytes, not strings, so encode the text first.
    controller.enqueue(new TextEncoder().encode("Some data to compress"));
    controller.close();
  },
});

const compressed = input.pipeThrough(
  new CompressionStream("gzip")
);

// Write compressed data.
// file.writer() returns a Bun FileSink, which is not a WritableStream and so
// cannot be a pipeTo() destination. Wrapping the stream in a Response lets
// Bun.write() drain it into the file.
const file = Bun.file("./output.gz");
await Bun.write(file, new Response(compressed));

DecompressionStream

const file = Bun.file("./data.gz");
const decompressed = file.stream().pipeThrough(
  new DecompressionStream("gzip")
);

// Use ONE decoder for the whole stream. With { stream: true } it holds back the
// bytes of a multi-byte character that is split across two chunks, instead of
// emitting replacement characters the way a fresh decoder per chunk would.
const decoder = new TextDecoder();

for await (const chunk of decompressed) {
  console.log(decoder.decode(chunk, { stream: true }));
}

// Flush whatever the decoder is still holding after the last chunk.
const tail = decoder.decode();
if (tail) console.log(tail);

Supported Formats

  • "gzip" - GZIP compression
  • "deflate" - DEFLATE compression
  • "deflate-raw" - Raw DEFLATE (no headers)

Text Encoding/Decoding Streams

TextEncoderStream

// Text source: two string chunks followed by end-of-stream.
const textStream = new ReadableStream({
  start: (ctrl) => {
    ["Hello", "World"].forEach((word) => ctrl.enqueue(word));
    ctrl.close();
  },
});

const encoder = new TextEncoderStream();

// After passing through the encoder, the chunks are Uint8Array values.
const bytes = textStream.pipeThrough(encoder);

TextDecoderStream

// Stream a file's bytes through a UTF-8 decoder and log each decoded piece.
const byteStream = Bun.file("./file.txt").stream();
const decoder = new TextDecoderStream("utf-8");
const textStream = byteStream.pipeThrough(decoder);

for await (const text of textStream) console.log(text);

File Streams

Reading Files

// Read the file as a stream and log it chunk by chunk.
const file = Bun.file("./large.txt");
const stream = file.stream();

for await (const bytes of stream) {
  // Uint8Array
  console.log(bytes);
}

Writing Files

// Queue the lines on the file's writer, then end it and wait for completion.
const file = Bun.file("./output.txt");
const writer = file.writer();

["Line 1\n", "Line 2\n"].forEach((line) => {
  writer.write(line);
});
await writer.end();

Piping Files

const input = Bun.file("./input.txt");
const output = Bun.file("./output.txt");

// output.writer() returns a Bun FileSink, which is not a WritableStream, so it
// cannot be handed to pipeTo(). Bun.write() accepts a BunFile as both the
// destination and the input and copies the contents across.
await Bun.write(output, input);

HTTP Response Streams

Streaming Response

Bun.serve({
  fetch(req) {
    // The response body is produced incrementally: ten chunks, with a 100 ms
    // pause after each one, then the stream is closed.
    const body = new ReadableStream({
      async start(ctrl) {
        let i = 0;
        while (i < 10) {
          ctrl.enqueue(`Chunk ${i}\n`);
          await Bun.sleep(100);
          i += 1;
        }
        ctrl.close();
      },
    });

    return new Response(body);
  },
});

Server-Sent Events

Bun.serve({
  fetch(req) {
    // Headers sent with the event-stream response.
    const sseHeaders = {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache",
      "Connection": "keep-alive",
    };

    // Emits ten `data:` events, one per second, then closes the stream.
    const events = new ReadableStream({
      async start(ctrl) {
        const encoder = new TextEncoder();
        let count = 0;

        while (count < 10) {
          const payload = JSON.stringify({ count });
          // Each event is a `data:` line terminated by a blank line.
          ctrl.enqueue(encoder.encode(`data: ${payload}\n\n`));
          await Bun.sleep(1000);
          count += 1;
        }

        ctrl.close();
      },
    });

    return new Response(events, { headers: sseHeaders });
  },
});

Streaming File Upload

// POST the file using its stream as the request body.
const file = Bun.file("./large-file.dat");

const response = await fetch("https://example.com/upload", {
  method: "POST",
  headers: { "Content-Type": "application/octet-stream" },
  body: file.stream(),
});

Stream Utilities

Teeing Streams

// tee() splits one stream into two branches that each receive every chunk.
const [stream1, stream2] = readable.tee();

// Now you can read from both.
// Both pipes are started together and awaited, so the code waits for both
// branches to finish and a failure in either one rejects here instead of
// becoming an unhandled rejection.
await Promise.all([
  stream1.pipeTo(destination1),
  stream2.pipeTo(destination2),
]);

Canceling Streams

const reader = stream.getReader();

// Cancel after 5 seconds
const timeout = setTimeout(() => {
  reader.cancel("Timeout");
}, 5000);

try {
  while (true) {
    const { done, value } = await reader.read();
    // Cancelling does not throw: a cancelled stream is closed, so a pending
    // read() resolves with done === true and the loop ends here.
    if (done) break;
    console.log(value);
  }
} catch (err) {
  // Reached only when the stream itself errors, not on cancellation.
  console.error("Stream error:", err);
} finally {
  // If the stream finished early, do not leave the timer pending for the
  // remainder of the 5 seconds or call cancel() on a released reader.
  clearTimeout(timeout);
  reader.releaseLock();
}

Backpressure Handling

// Backpressure with a pull source is handled by the stream itself: pull() is
// only invoked while the consumer wants data (the internal queue is below the
// high-water mark, or a read is already waiting). Producing one chunk per call
// is therefore enough. Polling controller.desiredSize and sleeping inside
// pull() adds nothing: with the default strategy desiredSize is always
// positive here, and otherwise it only delays a read that is already pending.
const stream = new ReadableStream({
  pull(controller) {
    controller.enqueue(data);
  },
});

Direct Streams (Bun-specific)

Bun supports direct streams for zero-copy I/O:
// Direct stream example: the underlying source is declared with type "direct"
// and its pull() writes through the controller instead of calling enqueue().
// NOTE(review): `readInto` is not defined in this snippet; presumably a
// caller-supplied helper that fills the buffer and resolves to a byte count.
// NOTE(review): Bun's published direct-stream examples pass the chunk itself to
// controller.write(chunk). Confirm that write(1024) returning a writable buffer
// is a supported form before relying on this pattern.
const stream = new ReadableStream({
  type: "direct",
  async pull(controller) {
    // Get a buffer to write into
    const buffer = controller.write(1024);
    
    // Fill the buffer
    const bytesWritten = await readInto(buffer);
    
    // Zero bytes written is treated as end of data: close the stream.
    if (bytesWritten === 0) {
      controller.close();
    }
  },
});

Stream from Async Iterator

// Async generator yielding ten chunks, pausing 100 ms before each one.
async function* generate() {
  let i = 0;
  while (i < 10) {
    await Bun.sleep(100);
    yield `Chunk ${i}`;
    i += 1;
  }
}

// Build a stream from the generator's output.
const stream = ReadableStream.from(generate());

Error Handling

const stream = new ReadableStream({
  start: (ctrl) => {
    try {
      // Risky operation
      ctrl.enqueue(data);
    } catch (failure) {
      // Report the failure through the controller rather than letting it escape.
      ctrl.error(failure);
    }
  },
});

// A failure while iterating the stream is caught and logged here.
try {
  for await (const chunk of stream) console.log(chunk);
} catch (failure) {
  console.error("Stream error:", failure);
}

Type Signatures

/**
 * Shape of a readable stream whose chunks are of type `R`.
 * Members are grouped by use: state, consumption, piping, then cancellation.
 */
interface ReadableStream<R = any> {
  /** Lock state of the stream. */
  readonly locked: boolean;

  /** Returns a default reader for pulling chunks one at a time. */
  getReader(): ReadableStreamDefaultReader<R>;
  /** Supports `for await (const chunk of stream)`. */
  [Symbol.asyncIterator](): AsyncIterableIterator<R>;
  /** Splits the stream into two streams of the same chunk type. */
  tee(): [ReadableStream<R>, ReadableStream<R>];

  /** Pipes through a transform and returns a stream of the transform's output type. */
  pipeThrough<T>(transform: TransformStream<R, T>): ReadableStream<T>;
  /** Pipes into a writable destination; the returned promise settles when piping ends. */
  pipeTo(destination: WritableStream<R>): Promise<void>;

  /** Cancels the stream, optionally passing a reason. */
  cancel(reason?: any): Promise<void>;
}

/** Shape of a writable stream that accepts chunks of type `W`. */
interface WritableStream<W = any> {
  /** Lock state of the stream. */
  readonly locked: boolean;

  /** Returns a default writer for sending chunks to the stream. */
  getWriter(): WritableStreamDefaultWriter<W>;
  /** Closes the stream. */
  close(): Promise<void>;
  /** Aborts the stream, optionally passing a reason. */
  abort(reason?: any): Promise<void>;
}

/**
 * A readable/writable pair: chunks of type `I` go into `writable`,
 * chunks of type `O` come out of `readable`.
 */
interface TransformStream<I = any, O = any> {
  /** Input side; accepts chunks of type `I`. */
  readonly writable: WritableStream<I>;
  /** Output side; yields chunks of type `O`. */
  readonly readable: ReadableStream<O>;
}

/** Controller used by a readable stream's source to produce chunks of type `R`. */
interface ReadableStreamDefaultController<R> {
  /** Desired size reported by the stream; may be `null`. */
  readonly desiredSize: number | null;

  /** Adds a chunk to the stream. */
  enqueue(chunk: R): void;
  /** Ends the stream. */
  close(): void;
  /** Errors the stream with the given value. */
  error(error: any): void;
}

/** Controller type for writable streams; it exposes only an error hook. */
interface WritableStreamDefaultController {
  /** Reports an error value through the controller. */
  error(error: any): void;
}

/** Controller passed to a transform; emits output chunks of type `O`. */
interface TransformStreamDefaultController<O> {
  /** Desired size reported for the output side; may be `null`. */
  readonly desiredSize: number | null;

  /** Emits an output chunk. */
  enqueue(chunk: O): void;
  /** Terminates the transform. */
  terminate(): void;
  /** Errors the transform with the given reason. */
  error(reason: any): void;
}

Build docs developers (and LLMs) love