Overview
concatWith emits all values from the source Observable, and once it completes, subscribes to each provided Observable in sequence, emitting their values. It waits for each Observable to complete before moving to the next one.
concat(a$, b$, c$) is equivalent to a$.pipe(concatWith(b$, c$)). The operator version is more convenient for chaining.
Type Signature
export function concatWith < T , A extends readonly unknown []>(
... otherSources : [ ... ObservableInputTuple < A >]
) : OperatorFunction < T , T | A [ number ]>
Parameters
otherSources
ObservableInputTuple<A>
required
One or more Observable sources to subscribe to sequentially after the source Observable completes. Each will be subscribed to only after the previous completes.
Returns
OperatorFunction<T, T | A[number]> - An operator function that returns an Observable concatenating subscriptions to the source and provided Observables, subscribing to the next only once the current subscription completes.
Usage Examples
Basic Example: Mouse Click then Mouse Moves
Basic Usage
Multiple Sequences
import { fromEvent , map , take , concatWith } from 'rxjs' ;
const clicks$ = fromEvent ( document , 'click' );
const moves$ = fromEvent ( document , 'mousemove' );
clicks$ . pipe (
map (() => 'click' ),
take ( 1 ),
concatWith (
moves$ . pipe (
map (() => 'move' )
)
)
). subscribe ( x => console . log ( x ));
// Output:
// 'click'
// 'move'
// 'move'
// 'move'
// ...
Real-World Example: Multi-Step Onboarding Flow
import { fromEvent , map , take , concatWith , delay } from 'rxjs' ;
interface OnboardingStep {
step : number ;
title : string ;
message : string ;
}
function showOnboardingStep ( step : OnboardingStep ) {
return new Observable < string >( subscriber => {
const dialog = document . createElement ( 'div' );
dialog . innerHTML = `
<h2> ${ step . title } </h2>
<p> ${ step . message } </p>
<button id="next- ${ step . step } ">Next</button>
` ;
document . body . appendChild ( dialog );
const button = document . getElementById ( `next- ${ step . step } ` );
fromEvent ( button ! , 'click' ). pipe ( take ( 1 )). subscribe (() => {
document . body . removeChild ( dialog );
subscriber . next ( `Step ${ step . step } completed` );
subscriber . complete ();
});
});
}
const step1$ = showOnboardingStep ({
step: 1 ,
title: 'Welcome!' ,
message: 'Let \' s get you started'
});
const step2$ = showOnboardingStep ({
step: 2 ,
title: 'Profile Setup' ,
message: 'Tell us about yourself'
});
const step3$ = showOnboardingStep ({
step: 3 ,
title: 'All Set!' ,
message: 'You \' re ready to go'
});
step1$ . pipe (
concatWith ( step2$ , step3$ )
). subscribe ({
next : msg => console . log ( msg ),
complete : () => console . log ( 'Onboarding completed!' )
});
Sequential API Operations
import { from , concatWith , tap } from 'rxjs' ;
import { ajax } from 'rxjs/ajax' ;
interface CreateUserRequest {
name : string ;
email : string ;
}
interface User {
id : string ;
name : string ;
email : string ;
}
function createUser ( userData : CreateUserRequest ) {
return ajax . post < User >( '/api/users' , userData ). pipe (
tap ( response => console . log ( 'User created:' , response . response . id ))
);
}
function sendWelcomeEmail ( userId : string ) {
return ajax . post ( '/api/emails/welcome' , { userId }). pipe (
tap (() => console . log ( 'Welcome email sent' ))
);
}
function assignDefaultPermissions ( userId : string ) {
return ajax . post ( '/api/permissions/assign-defaults' , { userId }). pipe (
tap (() => console . log ( 'Default permissions assigned' ))
);
}
function setupNewUser ( userData : CreateUserRequest ) {
let userId : string ;
return createUser ( userData ). pipe (
tap ( response => userId = response . response . id ),
concatWith (
sendWelcomeEmail ( userId ),
assignDefaultPermissions ( userId )
)
);
}
setupNewUser ({
name: 'John Doe' ,
email: 'john@example.com'
}). subscribe ({
complete : () => console . log ( 'User setup complete' )
});
Video Player Playlist
import { fromEvent , map , take , concatWith , defer } from 'rxjs' ;
interface Video {
id : string ;
title : string ;
url : string ;
duration : number ;
}
function playVideo ( video : Video ) {
return new Observable < string >( subscriber => {
console . log ( `Playing: ${ video . title } ` );
const videoElement = document . querySelector ( 'video' ) as HTMLVideoElement ;
videoElement . src = video . url ;
videoElement . play ();
const onEnded = () => {
console . log ( `Finished: ${ video . title } ` );
subscriber . next ( `Completed: ${ video . title } ` );
subscriber . complete ();
videoElement . removeEventListener ( 'ended' , onEnded );
};
videoElement . addEventListener ( 'ended' , onEnded );
return () => {
videoElement . pause ();
videoElement . removeEventListener ( 'ended' , onEnded );
};
});
}
const playlist : Video [] = [
{ id: '1' , title: 'Intro' , url: '/videos/intro.mp4' , duration: 30 },
{ id: '2' , title: 'Tutorial 1' , url: '/videos/tutorial1.mp4' , duration: 300 },
{ id: '3' , title: 'Tutorial 2' , url: '/videos/tutorial2.mp4' , duration: 450 },
];
const [ first , ... rest ] = playlist ;
playVideo ( first ). pipe (
concatWith ( ... rest . map ( video => playVideo ( video )))
). subscribe ({
next : msg => console . log ( msg ),
complete : () => console . log ( 'Playlist finished' )
});
Practical Scenarios
Use concatWith when you need to ensure a specific sequence of operations completes in order, such as initialization sequences, sequential workflows, or ordered event handlers.
Scenario 1: Application Initialization
import { of , delay , concatWith , tap } from 'rxjs' ;
function loadConfiguration () {
return ajax . getJSON ( '/api/config' ). pipe (
delay ( 500 ),
tap ( config => console . log ( 'Configuration loaded:' , config ))
);
}
function authenticateUser () {
return ajax . post ( '/api/auth/validate' , {}). pipe (
delay ( 300 ),
tap ( response => console . log ( 'User authenticated:' , response ))
);
}
function loadUserPreferences () {
return ajax . getJSON ( '/api/user/preferences' ). pipe (
delay ( 200 ),
tap ( prefs => console . log ( 'Preferences loaded:' , prefs ))
);
}
function initializeWebSocket () {
return new Observable ( subscriber => {
console . log ( 'WebSocket connecting...' );
const ws = new WebSocket ( 'ws://localhost:8080' );
ws . onopen = () => {
console . log ( 'WebSocket connected' );
subscriber . next ( 'connected' );
subscriber . complete ();
};
});
}
loadConfiguration (). pipe (
concatWith (
authenticateUser (),
loadUserPreferences (),
initializeWebSocket ()
)
). subscribe ({
complete : () => {
console . log ( 'Application initialized successfully' );
showMainUI ();
}
});
import { fromEvent , take , concatWith , switchMap , tap } from 'rxjs' ;
const submitButton = document . getElementById ( 'submit' ) as HTMLButtonElement ;
const confirmButton = document . getElementById ( 'confirm' ) as HTMLButtonElement ;
function showConfirmationDialog () {
return new Observable < boolean >( subscriber => {
const dialog = document . getElementById ( 'confirmation-dialog' ) ! ;
dialog . style . display = 'block' ;
const confirmClick = fromEvent ( confirmButton , 'click' ). pipe (
take ( 1 ),
tap (() => dialog . style . display = 'none' )
);
confirmClick . subscribe (() => {
subscriber . next ( true );
subscriber . complete ();
});
});
}
function submitForm ( data : any ) {
return ajax . post ( '/api/form/submit' , data ). pipe (
tap (() => console . log ( 'Form submitted successfully' ))
);
}
function showSuccessMessage () {
return of ( 'Success!' ). pipe (
tap ( msg => {
const alert = document . createElement ( 'div' );
alert . textContent = msg ;
document . body . appendChild ( alert );
setTimeout (() => document . body . removeChild ( alert ), 3000 );
})
);
}
fromEvent ( submitButton , 'click' ). pipe (
switchMap (() =>
showConfirmationDialog (). pipe (
concatWith (
submitForm ({ data: 'form data' }),
showSuccessMessage ()
)
)
)
). subscribe ();
Behavior Details
Subscription Timing
The source Observable runs to completion first
Each additional Observable is subscribed to only after the previous one completes
Values are emitted in the order they occur across all Observables
Completion and Error Handling
If any Observable in the sequence errors, the error is immediately propagated and subsequent Observables are not subscribed to.
import { of , throwError , concatWith , catchError } from 'rxjs' ;
const source$ = of ( 1 , 2 , 3 );
const error$ = throwError (() => new Error ( 'Failed' ));
const never$ = of ( 4 , 5 , 6 ); // Never subscribed to
source$ . pipe (
concatWith ( error$ , never$ ),
catchError ( err => {
console . error ( 'Error caught:' , err . message );
return of ( 'recovered' );
})
). subscribe ( console . log );
// Output: 1, 2, 3, recovered
Comparison with Other Operators
Operator Behavior concatWithSubscribes sequentially, waits for each to complete mergeWithSubscribes to all immediately, emits concurrently switchMapCancels previous when new arrives raceTakes the first to emit, unsubscribes from others
concat - Static creation operator for concatenating Observables
concatAll - Flattens a higher-order Observable sequentially
concatMap - Maps and concatenates in one operator
mergeWith - Merges Observables concurrently instead of sequentially
startWith - Prepends values before source emissions
endWith - Appends values after source completes