
Async generator pattern

Meros uses async generators to provide a streaming API that works seamlessly with modern JavaScript:
const parts = await fetch('/api').then(meros);

// Consume as an async iterator
for await (const part of parts) {
  console.log(part.body);
}
Async generators allow you to process parts as they arrive, rather than waiting for the entire response to complete. This is crucial for real-time applications and reducing time-to-first-byte latency.
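As a minimal illustration of this property (not meros itself), an async generator hands each value to the consumer as soon as it is produced, instead of buffering the whole sequence:

```typescript
// Illustration only: the consumer's loop body runs once per value, as each
// value becomes available, not after the producer has finished.
async function* slowParts(): AsyncGenerator<string> {
  for (const body of ['first', 'second', 'third']) {
    await new Promise((resolve) => setTimeout(resolve, 10)); // simulated network delay
    yield body;
  }
}

async function main() {
  const seen: string[] = [];
  for await (const part of slowParts()) {
    seen.push(part); // runs ~10 ms apart, as each value arrives
  }
  console.log(seen); // ['first', 'second', 'third']
}

main();
```

With a real multipart response, each `yield` corresponds to one parsed part arriving over the network.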

The generate function

At the core of Meros is the generate async generator function:
async function* generate<T>(
  stream: ReadableStream<Uint8Array>,
  boundary: string,
  options?: Options,
): AsyncGenerator<Arrayable<Part<T, string>>>
This function yields parts as they become available from the stream, implementing a streaming parser that:
  1. Reads chunks from the underlying stream
  2. Maintains a buffer for incomplete parts
  3. Yields complete parts immediately
  4. Handles boundary detection across chunk boundaries
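The four steps can be sketched with a simplified generator — strings instead of `Uint8Array`, no header parsing, and a plain iterable standing in for the stream. The names here are illustrative, not the meros source:

```typescript
// Simplified sketch of the streaming parse loop (not the real generate).
async function* generateSketch(
  chunks: Iterable<string> | AsyncIterable<string>,
  boundary: string,
): AsyncGenerator<string> {
  let buffer = '';
  for await (const chunk of chunks) {
    buffer += chunk; // 1. read a chunk; 2. incomplete data stays buffered
    let idx: number;
    while (~(idx = buffer.indexOf(boundary))) {
      const part = buffer.slice(0, idx); // 3. a complete part is available
      buffer = buffer.slice(idx + boundary.length); // 4. keep the remainder
      if (part) yield part; // yield immediately, before the stream ends
    }
  }
  if (buffer) yield buffer; // trailing data after the final boundary
}
```

Splitting on a bare delimiter glosses over real multipart details (part headers, CRLF handling, the closing boundary), all of which the real `generate` handles.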

Incremental parsing

Meros implements an intelligent buffering strategy to handle boundaries that might be split across chunks:
let buffer = '';
let idx_boundary = buffer.length; // end of the data buffered so far
buffer += chunk;

// `~i` is 0 (falsy) only when i === -1, so `!!~idx_chunk` reads
// "the boundary was found inside the new chunk".
let idx_chunk = chunk.indexOf(boundary);
!!~idx_chunk
  ? (idx_boundary += idx_chunk) // fast path: boundary lies in the new chunk
  : (idx_boundary = buffer.indexOf(boundary)); // it may straddle the seam

How it works

  1. Chunk arrival: When a new chunk arrives, it’s appended to the buffer
  2. Boundary search: Meros searches for the boundary in both the new chunk and the accumulated buffer
  3. Part extraction: Once a boundary is found, everything before it is processed as a complete part
  4. Buffer reset: The buffer is updated to contain only the remaining unparsed data
This approach ensures that Meros can handle any chunk size, even if boundaries are split across multiple chunks. The buffer only grows to the size of the largest incomplete part.
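The two-step search can be isolated as a small helper (a hypothetical name; the real code operates on the variables shown in the excerpt above):

```typescript
// Hypothetical helper: search the newly arrived chunk first, and fall back
// to scanning the whole buffer only when the boundary may straddle the seam
// between previously buffered data and the new chunk.
function findBoundary(buffer: string, chunk: string, boundary: string): number {
  // Precondition: `buffer` already ends with `chunk` appended.
  const idx_chunk = chunk.indexOf(boundary);
  return ~idx_chunk
    ? buffer.length - chunk.length + idx_chunk // found entirely in the chunk
    : buffer.indexOf(boundary);                // straddles the seam, or -1
}
```

Both branches return the boundary's index within the full buffer, so the caller can slice off the completed part the same way in either case.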

Yield strategies

Meros offers two yield strategies controlled by the multiple option:

Default mode (eager yielding)

By default, Meros yields each part as soon as it’s parsed:
let is_eager = !options || !options.multiple;
// ...
tmp = { headers, body, json: is_json } as Part<T, string>;
is_eager ? yield tmp : payloads.push(tmp);
This means:
  • Lower latency - parts are available immediately
  • One yield per part
  • Ideal for streaming UIs that update incrementally

Batch mode (multiple: true)

When multiple: true is set, Meros collects all parts from a chunk and yields them as an array:
const chunks = await fetch('/api').then(res => meros(res, { multiple: true }));

for await (const parts of chunks) {
  // parts is an array of Part objects
  for (const part of parts) {
    // Process part
  }
}
This is implemented by accumulating parts:
let payloads = [];
// ... parse parts ...
is_eager ? yield tmp : payloads.push(tmp);
// ...
if (payloads.length) yield payloads;
With multiple: true, the return type changes from AsyncGenerator<Part> to AsyncGenerator<Part[]>. Make sure to handle arrays when using this mode.
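When the `multiple` flag is decided at runtime, a consumer can normalize both yield shapes. This is a hedged sketch — `Part` is reduced to its body, and `consume` is a hypothetical helper, not part of the meros API:

```typescript
// Hypothetical helper that accepts either yield shape.
type Part<T> = { body: T };

async function consume<T>(
  parts: AsyncIterable<Part<T> | Part<T>[]>,
  onPart: (p: Part<T>) => void,
): Promise<void> {
  for await (const item of parts) {
    if (Array.isArray(item)) item.forEach(onPart); // multiple: true → arrays
    else onPart(item);                             // default → single parts
  }
}
```

In typed code you rarely need this: the published typings resolve the yield type from the options you pass, so the compiler already knows which shape you get.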

When to use batch mode

Batch mode is an optimization for scenarios where you need to process multiple parts synchronously:
  • GraphQL stores: Update the store once with all patches from a chunk
  • Bulk operations: Aggregate data before committing to a database
  • Reduced overhead: Fewer async iterations mean less event loop overhead
As the meros README notes, batch mode is particularly useful for GraphQL, where you can commit multiple payloads to the store synchronously rather than across multiple process ticks.
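The GraphQL case can be sketched as follows. Everything here is hypothetical — a `Map` stands in for the store, and `commitBatch`/`notify` are not a real store API:

```typescript
// Hedged sketch: apply every patch from one chunk, then notify subscribers
// once, so the UI renders a single consistent update per chunk.
type Patch = { path: string[]; data: unknown };

function commitBatch(
  store: Map<string, unknown>,
  patches: Patch[],
  notify: () => void,
): void {
  for (const patch of patches) {
    store.set(patch.path.join('.'), patch.data); // apply all patches first
  }
  notify(); // one notification per chunk, not one per part
}
```

In eager mode the equivalent loop would call `notify` once per part, which can trigger one re-render per patch.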

Integration with reactive libraries

Because Meros returns an async generator, it integrates seamlessly with reactive programming libraries:

RxJS example

import { from } from 'rxjs';
import { meros } from 'meros';

const parts = await fetch('/api').then(meros);

from(parts).subscribe(part => {
  // React to each part
  console.log(part.body);
});
The from operator converts the async iterable into an Observable stream.

Relay integration

For GraphQL applications using Relay, Meros can be used directly in the network layer:
const parts = await fetch('/graphql', {
  method: 'POST',
  body: JSON.stringify({ query, variables })
}).then(meros);

for await (const part of parts) {
  if (part.json) {
    // Update Relay store with incremental data
    environment.commitPayload(operation, part.body);
  }
}

Cleanup and resource management

Meros properly cleans up resources using a try/finally block:
try {
  let result: ReadableStreamReadResult<Uint8Array>;
  outer: while (!(result = await reader.read()).done) {
    // ... parse chunks ...
  }
} finally {
  if (payloads.length) yield payloads;
  await reader.cancel();
}
This ensures:
  • Any remaining buffered parts are yielded
  • The stream reader is properly cancelled
  • Resources are freed even if iteration stops early
If you break out of the for await loop early, the finally block ensures cleanup happens automatically.
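This behavior follows from how `for await` interacts with generators: breaking out of the loop calls the generator's `return()` method, which runs any pending `finally` block. A standalone illustration (not the meros source):

```typescript
// Illustration: `break` in a for-await loop resumes the generator at its
// current yield with a return, so the finally block executes.
async function* withCleanup(log: string[]): AsyncGenerator<number> {
  try {
    yield 1;
    yield 2;
    yield 3;
  } finally {
    log.push('cleaned up'); // runs even when the consumer breaks early
  }
}

async function main() {
  const log: string[] = [];
  for await (const n of withCleanup(log)) {
    if (n === 2) break; // stop early; in meros, reader.cancel() runs here
  }
  console.log(log); // ['cleaned up']
}

main();
```

In meros, that same mechanism is what guarantees `reader.cancel()` runs when you stop iterating before the response ends.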
