Meros seamlessly integrates with RxJS, allowing you to transform multipart HTTP responses into reactive observable streams that can be composed with other RxJS operators.

Why use Meros with RxJS?

RxJS provides powerful operators for transforming, filtering, and combining asynchronous data streams. By converting Meros’s async generator to an RxJS observable, you can:
  • Apply RxJS operators like map, filter, debounce, and merge
  • Combine multiple multipart streams
  • Throttle or buffer fast streams and handle errors declaratively
  • Integrate with reactive frameworks like Angular
For a multipart response, Meros resolves to an async generator, which RxJS 7 and later can consume directly with the from creation function.

Basic integration

The simplest way to use Meros with RxJS is to pass the parts to from:
import { meros } from 'meros';
import { from } from 'rxjs';

const parts = await fetch('/api').then(meros);

// Convert async generator to observable
from(parts).subscribe((part) => {
  // Do something with each part
  console.log(part.body);
});
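Meros resolves to the original response object when the response is not multipart, so passing the result straight to from can fail for plain responses. A minimal sketch of a guard follows. The helper name isAsyncIterable and the stand-in values are assumptions for illustration, not part of Meros or RxJS.

```typescript
// Hypothetical helper: true when the value can be iterated with `for await`,
// which is what from() needs in order to treat it as a stream of parts.
function isAsyncIterable<T>(value: unknown): value is AsyncIterable<T> {
  return (
    typeof value === 'object' &&
    value !== null &&
    typeof (value as { [Symbol.asyncIterator]?: unknown })[Symbol.asyncIterator] === 'function'
  );
}

// Stand-in for a multipart result (an async generator of parts).
async function* fakeParts() {
  yield { json: true, body: { letter: 'A' } };
}

// Stand-in for a non-multipart result (a plain response-like object).
const fakeResponse = { ok: true, status: 200 };

const multipartCheck = isAsyncIterable(fakeParts()); // true
const plainCheck = isAsyncIterable(fakeResponse); // false
```

In real code you would call the guard on the value returned by meros and fall back to reading the response body directly when it returns false.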

Complete example

1. Fetch and convert to observable

Start by fetching your multipart endpoint and converting it to an observable stream:
import { meros } from 'meros';
import { from } from 'rxjs';

async function createStream() {
  const response = await fetch('/data');
  const parts = await meros<{ letter: string }>(response);
  
  return from(parts);
}
2. Apply RxJS operators

Transform the stream using RxJS operators:
import { map, filter, tap } from 'rxjs/operators';

const stream = await createStream();

stream.pipe(
  // Only process JSON parts
  filter(part => part.json),
  
  // Extract just the body
  map(part => part.body),
  
  // Log each value
  tap(body => console.log('Received:', body)),
  
  // Transform the data
  map(body => body.letter.toLowerCase())
).subscribe({
  next: (letter) => {
    // Handle each letter
    console.log(letter);
  },
  error: (err) => {
    console.error('Stream error:', err);
  },
  complete: () => {
    console.log('Stream complete');
  }
});
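In TypeScript, a part's body is the parsed object only when part.json is true; otherwise it is the raw payload. A plain filter(part => part.json) may not narrow the type, so body.letter can fail to type-check. The sketch below uses a user-defined type guard. The Part shape here is a simplified assumption rather than the type exported by Meros, and isJsonPart is a hypothetical name.

```typescript
// Simplified assumption of the part shape: JSON parts carry a parsed body,
// other parts carry the raw text.
type JsonPart<T> = { json: true; headers: Record<string, string>; body: T };
type RawPart = { json: false; headers: Record<string, string>; body: string };
type Part<T> = JsonPart<T> | RawPart;

// Hypothetical type guard; usable as filter(isJsonPart) in an RxJS pipe
// or with Array.prototype.filter as shown here.
function isJsonPart<T>(part: Part<T>): part is JsonPart<T> {
  return part.json;
}

const sampleParts: Part<{ letter: string }>[] = [
  { json: true, headers: {}, body: { letter: 'A' } },
  { json: false, headers: {}, body: 'plain text' },
  { json: true, headers: {}, body: { letter: 'B' } },
];

// After the guard, body is typed as { letter: string }.
const letters = sampleParts
  .filter(isJsonPart)
  .map((part) => part.body.letter.toLowerCase());
```

The same guard can be passed to the RxJS filter operator, which also accepts type-guard predicates.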
3. Handle the subscription

Remember to unsubscribe when you’re done to prevent memory leaks:
const subscription = stream.pipe(
  map(part => part.body)
).subscribe((body) => {
  console.log(body);
});

// Later, when done
subscription.unsubscribe();
In Angular, use the async pipe or takeUntil operator to automatically handle unsubscription.

Advanced patterns

Combining multiple streams

Merge multiple multipart responses into a single stream:
import { from, merge } from 'rxjs';
import { map } from 'rxjs/operators';
import { meros } from 'meros';

const stream1 = from(await fetch('/api/users').then(meros));
const stream2 = from(await fetch('/api/posts').then(meros));

merge(stream1, stream2)
  .pipe(
    map(part => part.body)
  )
  .subscribe((data) => {
    console.log('Data from either stream:', data);
  });

Buffering and debouncing

Control the rate at which you process parts. For example, bufferCount groups parts into fixed-size batches:
import { bufferCount } from 'rxjs/operators';

stream.pipe(
  bufferCount(5)
).subscribe((parts) => {
  // Receive up to 5 parts at a time; the final batch may be smaller
  console.log('Batch:', parts);
});
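bufferCount emits its last buffer when the source completes, even if that buffer holds fewer items than requested. The following sketch uses a plain array in place of a Meros stream to show the resulting batch sizes; the numbers are placeholder data.

```typescript
import { from } from 'rxjs';
import { bufferCount } from 'rxjs/operators';

// Placeholder data standing in for 12 parts from a multipart response.
const fakeStream = from([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]);

const batches: number[][] = [];

// An array source emits synchronously, so `batches` is filled
// by the time subscribe() returns.
fakeStream.pipe(bufferCount(5)).subscribe((batch) => batches.push(batch));

// batches is [[1, 2, 3, 4, 5], [6, 7, 8, 9, 10], [11, 12]]
```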

Error handling

Handle errors gracefully using RxJS error operators:
import { catchError, retry } from 'rxjs/operators';
import { of } from 'rxjs';

stream.pipe(
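  // Retry up to 3 times on error. retry resubscribes to the source, and an
  // async generator can only be consumed once, so this only helps when the
  // source re-runs fetch and meros on each subscription (for example via defer)
  retry(3),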
  
  // Catch errors and provide fallback
  catchError((error) => {
    console.error('Stream failed:', error);
    return of({ error: true, message: error.message });
  })
).subscribe({
  next: (data) => console.log(data),
  error: (err) => console.error('Unrecoverable error:', err)
});
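Because retry works by resubscribing, the source has to open a fresh response on every subscription for a retry to do anything useful. One way to arrange that is defer, sketched below. The names streamWithRetry and flakyOpen are hypothetical; flakyOpen stands in for something like () => fetch('/data').then(meros) and fails twice before succeeding.

```typescript
import { defer, lastValueFrom, Observable } from 'rxjs';
import { mergeMap, retry, toArray } from 'rxjs/operators';

type Letter = { letter: string };

// Wraps a factory so that every subscription (including each retry)
// opens a fresh async iterable instead of reusing a consumed one.
function streamWithRetry(
  open: () => Promise<AsyncIterable<Letter>>,
  retries: number
): Observable<Letter> {
  return defer(open).pipe(
    // Flatten the async iterable into individual values
    mergeMap((parts) => parts),
    retry(retries)
  );
}

// Hypothetical stand-in for a fetch + meros call:
// rejects on the first two attempts, then yields two values.
let attempts = 0;
async function flakyOpen(): Promise<AsyncIterable<Letter>> {
  attempts++;
  if (attempts < 3) throw new Error('network error');
  return (async function* () {
    yield { letter: 'A' };
    yield { letter: 'B' };
  })();
}

// Resolves with every value emitted by the successful attempt.
const retried = lastValueFrom(streamWithRetry(flakyOpen, 3).pipe(toArray()));
```

Values already emitted by a failed attempt are not rolled back, so a consumer may see duplicates if a stream fails midway and is retried.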

Angular integration

Meros works seamlessly with Angular’s reactive patterns:
import { Component, OnInit } from '@angular/core';
import { Observable, from } from 'rxjs';
import { map, scan } from 'rxjs/operators';
import { meros } from 'meros';

interface DataPart {
  letter: string;
}

@Component({
  selector: 'app-stream',
  template: `
    <div *ngFor="let item of data$ | async">
      {{ item.letter }}
    </div>
  `
})
export class StreamComponent implements OnInit {
  // Emits the growing list of items, because *ngFor needs an iterable
  data$?: Observable<DataPart[]>;

  async ngOnInit() {
    const response = await fetch('/data');
    const parts = await meros<DataPart>(response);

    this.data$ = from(parts).pipe(
      map(part => part.body as DataPart),
      // Accumulate each part into an array for the template
      scan((items, item) => [...items, item], [] as DataPart[])
    );
  }
}
The async pipe automatically subscribes and unsubscribes, preventing memory leaks.

GraphQL and RxJS

Combine Meros with RxJS to consume incremental GraphQL responses, such as those produced by @defer, as a reactive stream:
import { meros } from 'meros';
import { from } from 'rxjs';
import { map, filter } from 'rxjs/operators';

interface GraphQLPart {
  data: any;
  path?: Array<string | number>;
  errors?: any[];
}

async function createGraphQLStream(query: string) {
  const response = await fetch('/graphql', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ query }),
  });

  const parts = await meros<GraphQLPart>(response);
  
  return from(parts).pipe(
    filter(part => part.json),
    map(part => part.body),
    filter(body => !body.errors)
  );
}

// Usage
const stream = await createGraphQLStream(`
  query {
    user {
      id
      ... @defer {
        name
      }
    }
  }
`);

stream.subscribe((result) => {
  console.log('GraphQL result:', result.data);
});
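Deferred payloads usually need to be merged back into the initial result at the location given by path. Here is a minimal sketch of such a merge, assuming each later payload carries data plus a path made of object keys or array indices. applyPatch is a hypothetical helper, and the exact payload shape depends on your GraphQL server.

```typescript
type PathSegment = string | number;

// Hypothetical helper: merges `data` into `target` at `path` and returns a
// new value. Objects and arrays along the path are shallow-copied, so the
// original target is left untouched.
function applyPatch(target: unknown, path: PathSegment[], data: object): unknown {
  if (path.length === 0) {
    // Reached the destination: merge the deferred fields into the object
    return { ...(target as object), ...data };
  }
  const [head, ...rest] = path;
  if (Array.isArray(target)) {
    const copy = [...target];
    copy[head as number] = applyPatch(copy[head as number], rest, data);
    return copy;
  }
  const obj = (target ?? {}) as Record<string, unknown>;
  return { ...obj, [head]: applyPatch(obj[head], rest, data) };
}

// Initial payload, followed by a deferred payload for the `user` field.
const initial = { user: { id: '1' } };
const merged = applyPatch(initial, ['user'], { name: 'Ada' });
// merged is { user: { id: '1', name: 'Ada' } }

// Paths may also contain list indices.
const list = { users: [{ id: '1' }, { id: '2' }] };
const mergedList = applyPatch(list, ['users', 1], { name: 'Bo' });
// mergedList is { users: [{ id: '1' }, { id: '2', name: 'Bo' }] }
```

In an RxJS pipe, a helper like this could be used inside scan so that each emission is the progressively completed result.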

Performance considerations

Use shareReplay for multiple subscribers

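An async generator can only be consumed once, so if multiple components need the same stream, share a single subscription with shareReplay instead of subscribing to the source repeatedly. With bufferSize: 1, late subscribers are replayed only the most recent part: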
import { shareReplay } from 'rxjs/operators';

const sharedStream = stream.pipe(
  shareReplay({ bufferSize: 1, refCount: true })
);

// Multiple subscribers share the same source
sharedStream.subscribe(/* subscriber 1 */);
sharedStream.subscribe(/* subscriber 2 */);
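To make the sharing behaviour concrete, the sketch below replaces the Meros-backed stream with a Subject so values can be pushed by hand, and counts how often the underlying source is subscribed to. All names here are illustrative.

```typescript
import { defer, Subject } from 'rxjs';
import { shareReplay } from 'rxjs/operators';

// A Subject stands in for the Meros-backed stream.
const source = new Subject<number>();

// Count how many times the underlying source is actually subscribed to.
let sourceSubscriptions = 0;
const counted = defer(() => {
  sourceSubscriptions++;
  return source;
});

const shared = counted.pipe(shareReplay({ bufferSize: 1, refCount: true }));

const early: number[] = [];
const late: number[] = [];

shared.subscribe((value) => early.push(value));
source.next(1);
source.next(2);

// The late subscriber is replayed only the most recent value (2).
shared.subscribe((value) => late.push(value));
source.next(3);

// early is [1, 2, 3], late is [2, 3], sourceSubscriptions is 1
```

If late subscribers need every part, a larger bufferSize is one option, at the cost of keeping those parts in memory.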

Backpressure handling

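When processing is slower than data arrival, use rate-limiting operators such as throttleTime or sampleTime. These operators drop intermediate values, so use them only when skipping parts is acceptable: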
import { throttleTime } from 'rxjs/operators';

stream.pipe(
  // Emit at most one part per 100 ms; parts arriving in between are dropped
  throttleTime(100)
).subscribe((part) => {
  // Expensive operation (processData is your own handler)
  processData(part);
});
Meros itself is highly performant, processing over 800,000 ops/sec in browsers, so bottlenecks typically occur in your processing logic.
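When every part must be processed, one alternative to throttling is concatMap, which queues incoming values and runs an asynchronous handler for one value at a time, in order. The sketch below uses an array in place of a Meros stream and a hypothetical processData that simulates slow work with a timer. The queue is unbounded, so this approach trades dropped values for memory.

```typescript
import { from, lastValueFrom } from 'rxjs';
import { concatMap, toArray } from 'rxjs/operators';

// Track how many handlers run at the same time.
let active = 0;
let maxActive = 0;

// Hypothetical slow handler: takes about 5 ms per part.
async function processData(part: number): Promise<number> {
  active++;
  maxActive = Math.max(maxActive, active);
  await new Promise((resolve) => setTimeout(resolve, 5));
  active--;
  return part * 10;
}

// The array emits all values at once; concatMap holds the later ones
// until the handler for the previous value has finished.
const processed = lastValueFrom(
  from([1, 2, 3]).pipe(
    concatMap((part) => processData(part)),
    toArray()
  )
);
```

Here processed resolves with the results in source order, and no two handlers overlap.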

Next steps

Basic usage

Learn the fundamentals of Meros

GraphQL integration

Use Meros with GraphQL directives
