Integrate Meros with RxJS to create reactive streams from multipart responses
Meros integrates with RxJS: you can turn a multipart HTTP response into an observable stream and compose it with other RxJS operators.
RxJS provides powerful operators for transforming, filtering, and combining asynchronous data streams. By converting Meros’s async generator to an RxJS observable, you can:
Apply RxJS operators like map, filter, debounce, and merge
Combine multiple multipart streams
Handle backpressure and errors declaratively
Integrate with reactive frameworks like Angular
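As a rough illustration of combining multiple multipart streams, the sketch below merges two part streams into one observable. It assumes RxJS 7 or later. `fakeParts` and `LetterPart` are hypothetical stand-ins for the parts Meros would yield, so the example runs without a network call.

```typescript
import { from, merge, lastValueFrom } from 'rxjs';
import { map, toArray } from 'rxjs/operators';

// Hypothetical stand-in for the parts Meros would yield from one response.
type LetterPart = { json: true; body: { letter: string } };

async function* fakeParts(letters: string[]): AsyncGenerator<LetterPart> {
  for (const letter of letters) {
    yield { json: true, body: { letter } };
  }
}

// Merge two part streams into a single observable and collect the letters.
function collectMerged(): Promise<string[]> {
  const first$ = from(fakeParts(['a', 'b']));
  const second$ = from(fakeParts(['c', 'd']));
  return lastValueFrom(
    merge(first$, second$).pipe(
      map((part) => part.body.letter),
      toArray(),
    ),
  );
}
```

`merge` interleaves parts in arrival order, so the relative order across the two sources is not guaranteed. If one stream must finish before the next starts, `concat` is the usual alternative.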
Meros returns an async generator when the response is multipart (otherwise it hands back the original response), and RxJS 7 or later can consume an async generator directly using the from function.
The simplest way to use Meros with RxJS is to pass the parts to from:
import { meros } from 'meros';
import { from } from 'rxjs';

const parts = await fetch('/api').then(meros);

// Convert async generator to observable
from(parts).subscribe((part) => {
  // Do something with each part
  console.log(part.body);
});
Start by fetching your multipart endpoint and converting it to an observable stream:
import { meros } from 'meros';
import { from } from 'rxjs';

async function createStream() {
  const response = await fetch('/data');
  const parts = await meros<{ letter: string }>(response);
  return from(parts);
}
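Because Meros only yields parts for multipart responses, it can help to check the result before handing it to `from`. The following is a minimal sketch under that assumption. `isAsyncIterable` and `partsToObservable` are hypothetical helpers, not part of Meros or RxJS, and `result` stands in for whatever `await meros(response)` resolved to.

```typescript
import { from, Observable } from 'rxjs';

// Runtime check: does the value implement the async iteration protocol?
function isAsyncIterable<T>(value: unknown): value is AsyncIterable<T> {
  return (
    typeof value === 'object' &&
    value !== null &&
    Symbol.asyncIterator in value
  );
}

// Hypothetical helper: convert the result to an observable, or fail loudly
// when the response was not multipart.
function partsToObservable<P>(result: unknown): Observable<P> {
  if (!isAsyncIterable<P>(result)) {
    throw new TypeError('Expected a multipart response');
  }
  return from(result);
}
```

Throwing is only one option. Depending on the endpoint, you might instead read the plain response body and emit it as a single value.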
2. Apply RxJS operators
Transform the stream using RxJS operators:
import { map, filter, tap } from 'rxjs/operators';

const stream = await createStream();

stream.pipe(
  // Only process JSON parts
  filter(part => part.json),
  // Extract just the body
  map(part => part.body),
  // Log each value
  tap(body => console.log('Received:', body)),
  // Transform the data
  map(body => body.letter.toLowerCase())
).subscribe({
  next: (letter) => {
    // Handle each letter
    console.log(letter);
  },
  error: (err) => {
    console.error('Stream error:', err);
  },
  complete: () => {
    console.log('Stream complete');
  }
});
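In TypeScript, a plain boolean predicate in `filter` does not narrow the part type, so a type guard can be useful before reading `body.letter`. The sketch below assumes a simplified part shape modeled on the `json` and `body` fields used here. The `Part`, `JsonPart`, and `RawPart` types and the `fakeParts` generator are illustrative, not Meros's own definitions.

```typescript
import { from, lastValueFrom } from 'rxjs';
import { filter, map, toArray } from 'rxjs/operators';

// Assumed part shapes, modeled on the `json` and `body` fields used above.
type Letter = { letter: string };
type JsonPart = { json: true; body: Letter };
type RawPart = { json: false; body: string };
type Part = JsonPart | RawPart;

// Hypothetical stand-in for a part stream mixing JSON and raw parts.
async function* fakeParts(): AsyncGenerator<Part> {
  yield { json: true, body: { letter: 'A' } };
  yield { json: false, body: 'plain text' };
  yield { json: true, body: { letter: 'B' } };
}

function lowerCaseLetters(): Promise<string[]> {
  return lastValueFrom(
    from(fakeParts()).pipe(
      // The type guard narrows the stream to JSON parts only.
      filter((part): part is JsonPart => part.json),
      map((part) => part.body.letter.toLowerCase()),
      toArray(),
    ),
  );
}
```

With the guard in place, `part.body` is typed as `Letter` downstream, so no cast is needed.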
3. Handle the subscription
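Remember to unsubscribe when you’re done to prevent memory leaks. Keep the Subscription returned by subscribe() and call its unsubscribe() method once the stream is no longer needed.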
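A minimal sketch of explicit cleanup, assuming RxJS 7 or later. `fakeParts` and `readTwoThenStop` are hypothetical names, and the generator stands in for a Meros part stream. The subscriber stops listening after two parts even though the source would produce four.

```typescript
import { from, Subscription } from 'rxjs';

// Hypothetical stand-in for a Meros part stream that yields slowly.
async function* fakeParts(): AsyncGenerator<{ body: { letter: string } }> {
  for (const letter of ['a', 'b', 'c', 'd']) {
    await new Promise((resolve) => setTimeout(resolve, 10));
    yield { body: { letter } };
  }
}

// Read two parts, unsubscribe, then wait to confirm that nothing else arrives.
function readTwoThenStop(): Promise<string[]> {
  return new Promise((resolve) => {
    const seen: string[] = [];
    const subscription: Subscription = from(fakeParts()).subscribe((part) => {
      seen.push(part.body.letter);
      if (seen.length === 2) {
        // Stop receiving parts and release the subscription.
        subscription.unsubscribe();
        setTimeout(() => resolve(seen), 100);
      }
    });
  });
}
```

In a component-based framework, the same `unsubscribe()` call would typically go in the component's teardown hook. Operators such as `take` or `takeUntil` are a declarative alternative.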
To process parts in batches, use bufferCount:
import { bufferCount } from 'rxjs/operators';

stream.pipe(
  bufferCount(5)
).subscribe((parts) => {
  // Receive 5 parts at a time
  console.log('Batch of 5:', parts);
});
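When processing is slower than data arrival, use operators like throttle or sample. These operators are lossy: parts that arrive inside the time window are dropped, so use them only when skipping parts is acceptable: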
import { throttleTime } from 'rxjs/operators';

stream.pipe(
  // Process at most once per 100ms
  throttleTime(100)
).subscribe((part) => {
  // Expensive operation
  processData(part);
});
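When every part must be processed, a different technique from throttling is to queue the work with `concatMap`, which runs one handler at a time and preserves order. This is a sketch under stated assumptions rather than a recommendation from Meros. `fakeParts` and this `processData` are hypothetical stand-ins for a part stream and a slow handler.

```typescript
import { from, lastValueFrom } from 'rxjs';
import { concatMap, toArray } from 'rxjs/operators';

type LetterPart = { body: { letter: string } };

// Hypothetical stand-in for a Meros part stream.
async function* fakeParts(): AsyncGenerator<LetterPart> {
  for (const letter of ['a', 'b', 'c']) {
    yield { body: { letter } };
  }
}

// Hypothetical slow handler: simulates expensive asynchronous work.
async function processData(part: LetterPart): Promise<string> {
  await new Promise((resolve) => setTimeout(resolve, 5));
  return part.body.letter.toUpperCase();
}

// Process parts strictly one after another, without dropping any.
function processAll(): Promise<string[]> {
  return lastValueFrom(
    from(fakeParts()).pipe(
      concatMap((part) => processData(part)),
      toArray(),
    ),
  );
}
```

The trade-off is memory: `concatMap` holds pending parts in an unbounded queue while the handler catches up, so it suits streams with a bounded number of parts.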
Meros itself is highly performant, processing over 800,000 ops/sec in browsers, so bottlenecks typically occur in your processing logic.