Overview
concatAll flattens an Observable-of-Observables by subscribing to each inner Observable sequentially, one at a time. It subscribes to the next inner Observable only after the previous one completes. This ensures that values are emitted in strict order.
If the source emits inner Observables faster than they complete, the pending ones accumulate in an unbounded buffer. A fast source paired with slow inner Observables can therefore consume a lot of memory.
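The ordering described above can be observed directly. The following is a minimal sketch, assuming RxJS 7; it uses Subjects as hand-driven inner Observables so that no timers are needed. The names `track`, `first$`, and `second$` are illustrative only and are not part of the library.

```typescript
import { Observable, Subject, concatAll, defer } from 'rxjs';

const log: string[] = [];
const source$ = new Subject<Observable<string>>();
const first$ = new Subject<string>();
const second$ = new Subject<string>();

// defer runs its factory at subscription time, so it records the moment
// concatAll actually subscribes to each inner Observable.
const track = (name: string, inner$: Observable<string>): Observable<string> =>
  defer(() => {
    log.push(`subscribe ${name}`);
    return inner$;
  });

source$.pipe(concatAll()).subscribe(value => log.push(value));

source$.next(track('first', first$));   // subscribed immediately
source$.next(track('second', second$)); // queued, because first$ is still active
first$.next('a1');
first$.complete();                      // only now is second$ subscribed
second$.next('b1');

console.log(log);
// ['subscribe first', 'a1', 'subscribe second', 'b1']
```

The second inner Observable is not subscribed when it arrives, only when the first one completes.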
Type Signature
export function concatAll<O extends ObservableInput<any>>(): OperatorFunction<O, ObservedValueOf<O>>
concatAll is equivalent to mergeAll(1) - it’s mergeAll with a concurrency limit of 1.
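A small comparison can make the equivalence concrete. This is a sketch, assuming RxJS 7; the helper `run` and the type alias `Flatten` are hypothetical names introduced for illustration. The inner Observables are Subjects, which are hot, so a value pushed before the operator subscribes to them is simply missed.

```typescript
import { Observable, Subject, concatAll, mergeAll } from 'rxjs';
import type { OperatorFunction } from 'rxjs';

type Flatten = OperatorFunction<Observable<string>, string>;

// Drives the same hand-timed scenario through a given flattening operator.
function run(flatten: Flatten): string[] {
  const out: string[] = [];
  const source$ = new Subject<Observable<string>>();
  const a$ = new Subject<string>();
  const b$ = new Subject<string>();

  source$.pipe(flatten).subscribe(value => out.push(value));

  source$.next(a$);
  source$.next(b$);
  a$.next('a1');
  b$.next('b1'); // only seen if b$ is already subscribed
  a$.complete();
  b$.next('b2');
  b$.complete();
  return out;
}

const viaConcatAll = run(concatAll());
const viaMergeAllOne = run(mergeAll(1));
const viaMergeAll = run(mergeAll());

console.log(viaConcatAll);   // ['a1', 'b2']
console.log(viaMergeAllOne); // ['a1', 'b2']
console.log(viaMergeAll);    // ['a1', 'b1', 'b2']
```

With a limit of 1, `b$` is subscribed only after `a$` completes, so both forms skip `'b1'`. Unlimited `mergeAll()` subscribes to `b$` right away and interleaves it.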
Parameters
This operator takes no parameters.
Returns
OperatorFunction<O, ObservedValueOf<O>> - An operator function that returns an Observable emitting values from all the inner Observables concatenated in order.
Usage Examples
Basic Example: Sequential Intervals
import { fromEvent, map, interval, take, concatAll } from 'rxjs';
const clicks = fromEvent(document, 'click');
const higherOrder = clicks.pipe(
  map(() => interval(1000).pipe(take(4)))
);
const firstOrder = higherOrder.pipe(concatAll());
firstOrder.subscribe(x => console.log(x));
// Results (not concurrent):
// After first click: 0 -1s-> 1 -1s-> 2 -1s-> 3
// After second click: 0 -1s-> 1 -1s-> 2 -1s-> 3
// Each inner Observable completes before the next starts
Real-World Example: Sequential API Calls
import { from, map, concatAll, delay } from 'rxjs';
import { ajax } from 'rxjs/ajax';
interface User {
  id: string;
  name: string;
}
interface UserDetails {
  userId: string;
  profile: any;
  posts: any[];
}
const userIds = ['user1', 'user2', 'user3'];
function fetchUserWithDetails(userId: string) {
  return ajax.getJSON<User>(`/api/users/${userId}`).pipe(
    delay(1000), // Simulate API rate limiting
    map(user => ({
      userId: user.id,
      profile: user,
      posts: []
    }))
  );
}
// Process users sequentially to respect API rate limits
const users$ = from(
  userIds.map(id => fetchUserWithDetails(id))
);
users$.pipe(
  concatAll()
).subscribe((details: UserDetails) => {
  console.log('User details loaded:', details);
});
// Each user is fetched only after the previous completes
Sequential File Processing
import { from, map, concatAll } from 'rxjs';
import { ajax } from 'rxjs/ajax';
interface UploadResult {
  filename: string;
  status: 'success' | 'error';
  url?: string;
}
function uploadFile(file: File) {
  const formData = new FormData();
  formData.append('file', file);
  // Type the response body so that response.response.url type-checks
  return ajax<{ url: string }>({
    url: '/api/upload',
    method: 'POST',
    body: formData
  }).pipe(
    map(response => ({
      filename: file.name,
      status: 'success' as const,
      url: response.response.url
    }))
  );
}
function uploadFilesSequentially(files: File[]) {
  return from(files.map(file => uploadFile(file))).pipe(
    concatAll()
  );
}
const filesToUpload = [
  new File(['content1'], 'file1.txt'),
  new File(['content2'], 'file2.txt'),
  new File(['content3'], 'file3.txt')
];
// Pass an observer object; separate callback arguments are deprecated in RxJS 7
uploadFilesSequentially(filesToUpload).subscribe({
  next: (result: UploadResult) => console.log('Uploaded:', result.filename),
  error: err => console.error('Upload failed:', err),
  complete: () => console.log('All files uploaded successfully')
});
Database Migrations
import { Observable, from, map, concatAll, tap } from 'rxjs';
import { ajax } from 'rxjs/ajax';
interface Migration {
  version: number;
  description: string;
  execute: () => Observable<void>;
}
const migrations: Migration[] = [
  {
    version: 1,
    description: 'Create users table',
    execute: () => runSQL('CREATE TABLE users...')
  },
  {
    version: 2,
    description: 'Add email column',
    execute: () => runSQL('ALTER TABLE users ADD COLUMN email...')
  },
  {
    version: 3,
    description: 'Create indexes',
    execute: () => runSQL('CREATE INDEX idx_users_email...')
  }
];
// Discard the response body so the result matches Observable<void>
function runSQL(query: string): Observable<void> {
  return ajax.post('/api/db/execute', { query }).pipe(
    map(() => undefined)
  );
}
function runMigrations(migrations: Migration[]) {
  return from(
    migrations.map(migration =>
      migration.execute().pipe(
        tap(() => console.log(`✓ Migration ${migration.version}: ${migration.description}`))
      )
    )
  ).pipe(
    concatAll()
  );
}
runMigrations(migrations).subscribe({
  complete: () => console.log('All migrations completed successfully')
});
Practical Scenarios
Use concatAll when the order of operations matters and each operation must complete before the next begins, such as sequential API calls, ordered animations, or database transactions.
Scenario 1: Ordered Animation Sequence
import { Observable, from, concatAll } from 'rxjs';
function animateElement(element: HTMLElement, animation: string) {
  return new Observable<string>(subscriber => {
    element.style.animation = animation;
    const onEnd = () => {
      subscriber.next(animation);
      subscriber.complete();
    };
    element.addEventListener('animationend', onEnd);
    // Teardown runs on completion or unsubscribe, so the listener never leaks
    return () => element.removeEventListener('animationend', onEnd);
  });
}
const box = document.querySelector('.box') as HTMLElement;
const animations$ = from([
  animateElement(box, 'fadeIn 1s'),
  animateElement(box, 'slideRight 0.5s'),
  animateElement(box, 'bounce 0.3s'),
  animateElement(box, 'fadeOut 1s')
]);
animations$.pipe(concatAll()).subscribe({
  next: anim => console.log('Animation completed:', anim),
  complete: () => console.log('All animations finished')
});
Scenario 2: Queue Processing
import { Observable, Subject, of, concatAll, delay, tap } from 'rxjs';
interface Task {
  id: string;
  execute: () => Observable<any>;
}
class TaskQueue {
  private taskSubject = new Subject<Observable<any>>();
  constructor() {
    this.taskSubject.pipe(
      concatAll()
    ).subscribe({
      next: result => console.log('Task completed:', result),
      error: err => console.error('Task failed:', err)
    });
  }
  addTask(task: Task) {
    console.log('Adding task to queue:', task.id);
    this.taskSubject.next(
      task.execute().pipe(
        tap(result => console.log(`Task ${task.id} result:`, result))
      )
    );
  }
}
const queue = new TaskQueue();
queue.addTask({
  id: 'task-1',
  execute: () => of('Result 1').pipe(delay(1000))
});
queue.addTask({
  id: 'task-2',
  execute: () => of('Result 2').pipe(delay(500))
});
queue.addTask({
  id: 'task-3',
  execute: () => of('Result 3').pipe(delay(200))
});
// Tasks execute in order: task-1, then task-2, then task-3
Behavior Details
Subscription Strategy
- Subscribes to inner Observables one at a time, in order
- Waits for each inner Observable to complete before subscribing to the next
- Maintains a queue of pending inner Observables
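The three points above amount to a small queueing discipline. Below is a simplified, library-free model of it, offered only as a sketch: `Task` and `SequentialQueue` are hypothetical names, and this is not how RxJS implements concatAll internally, since it leaves out unsubscription, errors, and completion.

```typescript
// A task starts some work, reports values through emit, and calls done() once.
type Task<T> = (emit: (value: T) => void, done: () => void) => void;

class SequentialQueue<T> {
  private pending: Task<T>[] = [];
  private active = false;
  private emit: (value: T) => void;

  constructor(emit: (value: T) => void) {
    this.emit = emit;
  }

  // Start the task now if idle; otherwise hold it until earlier tasks finish.
  push(task: Task<T>): void {
    if (this.active) {
      this.pending.push(task);
    } else {
      this.start(task);
    }
  }

  get queued(): number {
    return this.pending.length;
  }

  private start(task: Task<T>): void {
    this.active = true;
    task(this.emit, () => {
      this.active = false;
      const next = this.pending.shift();
      if (next) this.start(next);
    });
  }
}

const out: string[] = [];
const queue = new SequentialQueue<string>(value => { out.push(value); });
let finishFirst = (): void => {};

// The first task runs at once and stays active until finishFirst() is called.
queue.push((emit, done) => { emit('a'); finishFirst = done; });
// The second task arrives while the first is active, so it waits in the queue.
queue.push((emit, done) => { emit('b'); done(); });

const queuedBefore = queue.queued; // 1
const outBefore = [...out];        // ['a']
finishFirst();                     // the first task ends, the second starts

console.log(outBefore, queuedBefore, out); // ['a'] 1 ['a', 'b']
```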
Completion and Errors
concatAll completes once the source has completed and the last inner Observable has completed. If any inner Observable errors, concatAll immediately propagates the error and unsubscribes from the source, leaving remaining inner Observables unprocessed.
import { from, of, throwError, concatAll, catchError } from 'rxjs';
const source$ = from([
  of(1, 2),
  throwError(() => new Error('Failed')),
  of(3, 4) // This will never be subscribed to
]);
source$.pipe(
  concatAll(),
  catchError(err => {
    console.error('Error caught:', err.message);
    return of('recovered');
  })
).subscribe(console.log);
// Output: 1, 2, recovered (with "Error caught: Failed" logged via console.error before "recovered")
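If the remaining inner Observables should still run after a failure, one option is to handle the error inside each inner Observable before flattening. The sketch below assumes RxJS 7 and assumes that replacing a failed inner Observable with a marker value is acceptable. The `'skipped: …'` marker and the `guarded` name are illustrative choices, not library conventions.

```typescript
import { Observable, from, of, throwError, concatAll, catchError } from 'rxjs';

const inners: Observable<number | string>[] = [
  of(1, 2),
  throwError(() => new Error('Failed')),
  of(3, 4),
];

// Catch per inner Observable, so a failure ends only that inner Observable.
const guarded = inners.map(inner$ =>
  inner$.pipe(catchError((err: Error) => of(`skipped: ${err.message}`)))
);

const out: Array<number | string> = [];
from(guarded).pipe(concatAll()).subscribe(value => out.push(value));

console.log(out); // [1, 2, 'skipped: Failed', 3, 4]
```

Because the error never reaches concatAll, the third inner Observable is subscribed as usual.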
Memory Considerations
- Inner Observables emitted before the current one completes are buffered
- With fast-emitting sources, this buffer can grow large
- Consider using mergeAll(n) if some concurrency is acceptable
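To illustrate the trade-off, the sketch below (assuming RxJS 7) feeds three never-completing inner Observables into `mergeAll(2)` and records which ones get subscribed. The helper `inner` is a hypothetical name used only here.

```typescript
import { NEVER, Observable, Subject, defer, mergeAll } from 'rxjs';

const started: string[] = [];
const source$ = new Subject<Observable<never>>();

// Each inner Observable records when it is subscribed, then never completes.
const inner = (name: string): Observable<never> =>
  defer(() => {
    started.push(name);
    return NEVER;
  });

const subscription = source$.pipe(mergeAll(2)).subscribe();

source$.next(inner('a'));
source$.next(inner('b'));
source$.next(inner('c')); // buffered, because two inner Observables are already active

console.log(started); // ['a', 'b']
subscription.unsubscribe();
```

A higher limit drains the buffer faster, but output from the active inner Observables may interleave, so this only fits cases where strict ordering is not required.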
Related Operators
mergeAll - Flattens with configurable concurrency (unlimited by default)
switchAll - Flattens but cancels previous inner Observable
exhaustAll - Flattens but ignores new inner Observables while one is active
concatMap - Maps and flattens sequentially in one operator
concatWith - Concatenates specific Observables after the source completes
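As a quick illustration of the concatMap entry, the sketch below (assuming RxJS 7) runs the same projection through `map` followed by `concatAll` and through `concatMap` alone. The `expand` function is a made-up projection for the example.

```typescript
import { from, of, map, concatAll, concatMap } from 'rxjs';

// Hypothetical projection: each number becomes a two-value inner Observable.
const expand = (n: number) => of(n, n * 10);

const viaConcatAll: number[] = [];
from([1, 2, 3])
  .pipe(map(expand), concatAll())
  .subscribe(value => viaConcatAll.push(value));

const viaConcatMap: number[] = [];
from([1, 2, 3])
  .pipe(concatMap(expand))
  .subscribe(value => viaConcatMap.push(value));

console.log(viaConcatAll); // [1, 10, 2, 20, 3, 30]
console.log(viaConcatMap); // [1, 10, 2, 20, 3, 30]
```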