ReadableStream
ReadableStreams represent a source of data that can be read chunk by chunk.
Creating Readable Streams
// Create a ReadableStream that produces all of its chunks eagerly in start().
const stream = new ReadableStream({
  // start() runs once, when the stream is constructed.
  start(controller) {
    controller.enqueue("Hello "); // queue a chunk for readers
    controller.enqueue("World");
    controller.close(); // signal end-of-stream; readers see done: true
  },
});
With Pull
// Lazily produce chunks on demand: pull() is called whenever the consumer
// needs more data, so nothing is generated ahead of time.
let i = 0;
const stream = new ReadableStream({
  pull(controller) {
    if (i < 10) {
      controller.enqueue(`Chunk ${i++}`); // emit the next chunk
    } else {
      controller.close(); // all 10 chunks emitted; end the stream
    }
  },
});
With Cancel
// Periodically emit a timestamp, and stop the timer when the consumer
// cancels the stream.
//
// BUG FIX: the original stored the timer id on the controller
// (controller.interval = interval) but read it back in cancel() via
// this.interval — `this` there is the underlying-source object, not the
// controller, so the id was undefined and the interval was never cleared
// (it kept firing after cancellation). A closure variable shared by
// start() and cancel() fixes that, and also removes the TypeScript error
// from assigning an unknown property to the controller.
let interval: ReturnType<typeof setInterval>;
const stream = new ReadableStream({
  start(controller) {
    interval = setInterval(() => {
      controller.enqueue(Date.now());
    }, 1000);
  },
  cancel(reason) {
    clearInterval(interval); // stop producing once the consumer gives up
    console.log("Stream cancelled:", reason);
  },
});
Reading from Streams
Using getReader()
// Manually drain a stream with a reader.
const stream = new ReadableStream({
  start(controller) {
    controller.enqueue("Hello");
    controller.enqueue("World");
    controller.close();
  },
});
// getReader() locks the stream to this reader until releaseLock().
const reader = stream.getReader();
while (true) {
  // read() resolves with { done, value }; done is true after close().
  const { done, value } = await reader.read();
  if (done) break;
  console.log(value);
}
reader.releaseLock(); // release the lock so another reader can attach
Using for-await-of
// ReadableStreams are async iterable, so for-await-of reads each chunk
// until the stream closes (assumes `stream` was created as above).
for await (const chunk of stream) {
  console.log(chunk);
}
Using pipeTo()
// Connect a readable source directly to a writable sink.
const readable = new ReadableStream({
  start(controller) {
    controller.enqueue("data");
    controller.close();
  },
});
const writable = new WritableStream({
  // write() is invoked once per chunk arriving from the readable side.
  write(chunk) {
    console.log(chunk);
  },
});
// pipeTo() resolves once the readable is drained and the writable closed.
await readable.pipeTo(writable);
WritableStream
WritableStreams represent a destination for data.
Creating Writable Streams
// A WritableStream sink: the callbacks describe what to do with incoming data.
const stream = new WritableStream({
  write(chunk) {
    console.log("Writing:", chunk); // called once per chunk written
  },
  close() {
    console.log("Stream closed"); // called when the writer closes normally
  },
  abort(reason) {
    console.error("Stream aborted:", reason); // called on abnormal termination
  },
});
Writing to Streams
// Acquire a writer (locks the stream), push chunks, then close.
const writer = stream.getWriter();
await writer.write("Hello"); // resolves when the sink accepts the chunk
await writer.write("World");
await writer.close(); // triggers the sink's close() callback
Using with Files
// Bun's incremental file writer (FileSink): buffer writes, flush with end().
const file = Bun.file("./output.txt");
const writer = file.writer();
writer.write("Line 1\n");
writer.write("Line 2\n");
await writer.end(); // flush buffered writes and close the file
TransformStream
TransformStreams convert data as it flows through.
Creating Transform Streams
// A TransformStream that upper-cases each chunk as it passes through.
const uppercase = new TransformStream({
  transform(chunk, controller) {
    // toString() normalizes the chunk before upper-casing
    controller.enqueue(chunk.toString().toUpperCase());
  },
});
// Pipe through transformer
// (assumes `readable` and `writable` are defined as in earlier examples)
readable
  .pipeThrough(uppercase)
  .pipeTo(writable);
Multiple Transforms
// Transforms compose: each pipeThrough() returns a new readable, so
// stages chain left to right.
const uppercase = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(chunk.toString().toUpperCase());
  },
});
const addPrefix = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(`[PREFIX] ${chunk}`);
  },
});
// Order matters: chunks are upper-cased first, then prefixed.
readable
  .pipeThrough(uppercase)
  .pipeThrough(addPrefix)
  .pipeTo(writable);
Compression Streams
CompressionStream
// Compress an in-memory stream with gzip and persist it to disk.
const input = new ReadableStream({
  start(controller) {
    controller.enqueue("Some data to compress");
    controller.close();
  },
});
const compressed = input.pipeThrough(
  new CompressionStream("gzip")
);
// Write compressed data.
// BUG FIX: the original piped into Bun.file(...).writer(), but FileSink
// is not a WritableStream, so pipeTo() rejects at runtime. Bun.write()
// accepts a Response wrapping the stream and writes its body to the file.
await Bun.write(Bun.file("./output.gz"), new Response(compressed));
DecompressionStream
// Decompress a gzip file and decode the output as UTF-8 text.
const file = Bun.file("./data.gz");
const decompressed = file.stream().pipeThrough(
  new DecompressionStream("gzip")
);
// BUG FIX: the original created a new TextDecoder for every chunk, which
// corrupts multi-byte UTF-8 sequences that straddle a chunk boundary.
// Reuse one decoder and pass { stream: true } so a partial sequence is
// buffered until the next chunk arrives.
const decoder = new TextDecoder();
for await (const chunk of decompressed) {
  console.log(decoder.decode(chunk, { stream: true }));
}
Supported Formats
- "gzip" — GZIP compression
- "deflate" — DEFLATE compression
- "deflate-raw" — Raw DEFLATE (no headers)
Text Encoding/Decoding Streams
TextEncoderStream
// TextEncoderStream transforms string chunks into UTF-8 bytes.
const encoder = new TextEncoderStream();
const textStream = new ReadableStream({
  start(controller) {
    controller.enqueue("Hello");
    controller.enqueue("World");
    controller.close();
  },
});
const bytes = textStream.pipeThrough(encoder);
// Now contains Uint8Array chunks
TextDecoderStream
// TextDecoderStream turns byte chunks into strings, correctly handling
// multi-byte sequences that span chunk boundaries.
const decoder = new TextDecoderStream("utf-8");
const byteStream = Bun.file("./file.txt").stream();
const textStream = byteStream.pipeThrough(decoder);
for await (const text of textStream) {
  console.log(text);
}
File Streams
Reading Files
// Stream a file instead of loading it into memory all at once.
const file = Bun.file("./large.txt");
const stream = file.stream();
for await (const chunk of stream) {
  console.log(chunk); // Uint8Array
}
Writing Files
// Write a file incrementally via Bun's FileSink.
const file = Bun.file("./output.txt");
const writer = file.writer();
writer.write("Line 1\n");
writer.write("Line 2\n");
await writer.end(); // flush buffered writes and close the file
Piping Files
// Copy one file to another without buffering the whole contents.
// BUG FIX: the original piped input.stream() into output.writer(), but
// FileSink (the return type of Bun.file(...).writer()) is not a
// WritableStream, so pipeTo() rejects. Bun.write() copies a BunFile to a
// destination directly (using fast OS copy paths where available).
const input = Bun.file("./input.txt");
const output = Bun.file("./output.txt");
await Bun.write(output, input);
HTTP Response Streams
Streaming Response
// Return a ReadableStream as a Response body to stream HTTP responses:
// each enqueued chunk is flushed to the client as it is produced.
Bun.serve({
  fetch(req) {
    const stream = new ReadableStream({
      async start(controller) {
        for (let i = 0; i < 10; i++) {
          controller.enqueue(`Chunk ${i}\n`);
          await Bun.sleep(100); // simulate slow chunk production
        }
        controller.close(); // ends the HTTP response body
      },
    });
    return new Response(stream);
  },
});
Server-Sent Events
// Server-Sent Events: stream "data: ...\n\n" frames with the
// text/event-stream content type so clients can consume them (e.g. via
// EventSource in browsers).
Bun.serve({
  fetch(req) {
    const stream = new ReadableStream({
      async start(controller) {
        const encoder = new TextEncoder();
        for (let i = 0; i < 10; i++) {
          // each SSE frame is "data: <payload>" terminated by a blank line
          const message = `data: ${JSON.stringify({ count: i })}\n\n`;
          controller.enqueue(encoder.encode(message));
          await Bun.sleep(1000);
        }
        controller.close();
      },
    });
    return new Response(stream, {
      headers: {
        "Content-Type": "text/event-stream",
        "Cache-Control": "no-cache", // keep intermediaries from buffering
        "Connection": "keep-alive",
      },
    });
  },
});
Streaming File Upload
// Stream a file as an HTTP request body instead of buffering it in memory.
// NOTE(review): the WHATWG fetch spec requires duplex: "half" when a
// request body is a ReadableStream — confirm whether the target runtime
// needs that option here.
const file = Bun.file("./large-file.dat");
const response = await fetch("https://example.com/upload", {
  method: "POST",
  body: file.stream(),
  headers: {
    "Content-Type": "application/octet-stream",
  },
});
Stream Utilities
Teeing Streams
// tee() splits one stream into two independent branches; the source
// stream becomes locked and each branch can be consumed separately.
const [stream1, stream2] = readable.tee();
// Now you can read from both
stream1.pipeTo(destination1);
stream2.pipeTo(destination2);
Canceling Streams
// Cancel a stream from the reader side; the underlying source's cancel()
// callback runs and pending reads settle.
const reader = stream.getReader();
// Cancel after 5 seconds
setTimeout(() => {
  reader.cancel("Timeout");
}, 5000);
try {
  while (true) {
    const { done, value } = await reader.read();
    if (done) break; // cancellation resolves pending reads with done: true
    console.log(value);
  }
} catch (err) {
  console.error("Stream cancelled", err);
}
Backpressure Handling
// Respect backpressure: controller.desiredSize reports how much the
// internal queue can still accept; non-positive means the consumer is
// falling behind.
const stream = new ReadableStream({
  async pull(controller) {
    // FIX: the original slept once and then enqueued unconditionally, so
    // a persistently slow consumer still had data pushed at it. It also
    // ignored that desiredSize is null once the stream errors. Wait in a
    // loop until the queue has capacity, and stop producing on error.
    while (controller.desiredSize !== null && controller.desiredSize <= 0) {
      await Bun.sleep(100);
    }
    if (controller.desiredSize === null) return; // stream errored
    controller.enqueue(data);
  },
});
Direct Streams (Bun-specific)
Bun supports direct streams for zero-copy I/O:
const stream = new ReadableStream({
  type: "direct", // Bun extension: bypass the internal chunk queue
  async pull(controller) {
    // Get a buffer to write into
    // NOTE(review): in Bun's documented direct-stream API,
    // controller.write() takes data to write and returns the number of
    // bytes written — verify this buffer-returning usage against the
    // targeted Bun version.
    const buffer = controller.write(1024);
    // Fill the buffer
    // (readInto is assumed to be defined elsewhere; returns bytes read)
    const bytesWritten = await readInto(buffer);
    if (bytesWritten === 0) {
      controller.close(); // source exhausted
    }
  },
});
Stream from Async Iterator
// Any async (or sync) iterable can be adapted into a ReadableStream.
async function* generate() {
  for (let i = 0; i < 10; i++) {
    await Bun.sleep(100);
    yield `Chunk ${i}`;
  }
}
// ReadableStream.from() pulls from the iterator as consumers read.
const stream = ReadableStream.from(generate());
Error Handling
// Propagate producer-side failures: controller.error() puts the stream
// in an errored state, which rejects pending and future reads.
const stream = new ReadableStream({
  start(controller) {
    try {
      // Risky operation
      controller.enqueue(data);
    } catch (err) {
      controller.error(err); // consumers observe this as a rejection
    }
  },
});
// An errored stream rejects async iteration, so wrap consumption in try/catch.
try {
  for await (const chunk of stream) {
    console.log(chunk);
  }
} catch (err) {
  console.error("Stream error:", err);
}
Type Signatures
// Simplified type signatures for the Web Streams API as used above.
interface ReadableStream<R = any> {
  readonly locked: boolean; // true while a reader or pipe holds the lock
  cancel(reason?: any): Promise<void>;
  getReader(): ReadableStreamDefaultReader<R>;
  pipeThrough<T>(transform: TransformStream<R, T>): ReadableStream<T>;
  pipeTo(destination: WritableStream<R>): Promise<void>;
  tee(): [ReadableStream<R>, ReadableStream<R>];
  // streams are async iterable, enabling for-await-of
  [Symbol.asyncIterator](): AsyncIterableIterator<R>;
}
interface WritableStream<W = any> {
  readonly locked: boolean;
  abort(reason?: any): Promise<void>;
  close(): Promise<void>;
  getWriter(): WritableStreamDefaultWriter<W>;
}
interface TransformStream<I = any, O = any> {
  readonly readable: ReadableStream<O>; // output side
  readonly writable: WritableStream<I>; // input side
}
interface ReadableStreamDefaultController<R> {
  readonly desiredSize: number | null; // remaining queue capacity; null when errored
  close(): void;
  enqueue(chunk: R): void;
  error(error: any): void;
}
interface WritableStreamDefaultController {
  error(error: any): void;
}
interface TransformStreamDefaultController<O> {
  readonly desiredSize: number | null;
  enqueue(chunk: O): void;
  error(reason: any): void;
  terminate(): void; // close the readable side and error the writable side
}