Overview
endWith emits all values from the source Observable, and when the source completes, it synchronously emits one or more specified values. This is useful for knowing when an Observable ends or for appending cleanup/completion values.
It is particularly useful when paired with takeUntil, to signal that a stream has ended, or for appending completion markers to data streams.
Type Signature
/**
 * Creates an operator that re-emits every value from the source Observable and,
 * once the source completes, synchronously emits the given values in order.
 *
 * @param values One or more values to emit after the source completes.
 * @returns An operator function whose output emits all source values followed by `values`.
 */
export function endWith < T , A extends readonly unknown [] = T []>(
... values : A
) : OperatorFunction < T , T | ValueFromArray < A >>
Parameters
values (A): One or more values to emit synchronously after the source Observable completes. These values are emitted in the order provided.
Returns
OperatorFunction<T, T | ValueFromArray<A>> - An operator function that returns an Observable that emits all source values, then synchronously emits the provided values after the source completes.
Usage Examples
Basic Example: Interval with End Notification
Basic Usage
Multiple End Values
import { interval , map , fromEvent , startWith , takeUntil , endWith } from 'rxjs' ;
// Emits the string 'tick' once every five seconds.
const ticker$ = interval(5000).pipe(map(() => 'tick'));

// Any click on the document acts as the stop signal.
const documentClicks$ = fromEvent(document, 'click');

// Announce the start, forward ticks until the first click, then announce the end.
const announcedTicker$ = ticker$.pipe(
  startWith('interval started'),
  takeUntil(documentClicks$),
  endWith('interval ended by click'),
);

announcedTicker$.subscribe((message) => {
  console.log(message);
});
// Output (assuming click after 15 seconds):
// 'interval started'
// 'tick' (5s)
// 'tick' (10s)
// 'tick' (15s)
// 'interval ended by click' (on click)
Real-World Example: API Request with Status Messages
import { ajax } from 'rxjs/ajax';
import { Observable, map, endWith, startWith, catchError, of } from 'rxjs';

/** One notification in the user-loading stream: a status update, the loaded payload, or a failure. */
interface StatusMessage {
  type: 'status' | 'data' | 'error';
  message: string;
  // The payload shape is not specified by this example.
  data?: any;
}

/**
 * Loads a user and reports the request lifecycle as a stream of messages.
 *
 * Emits a "loading" status first, then the loaded data, then a "completed"
 * status. If the request fails, a single `error` message is emitted instead
 * of the "completed" status.
 *
 * @param userId Identifier of the user to load.
 * @returns An Observable of lifecycle messages that never errors.
 */
function fetchUserData(userId: string): Observable<StatusMessage> {
  // Encode the id so reserved characters cannot change the request path.
  const url = `/api/users/${encodeURIComponent(userId)}`;

  return ajax.getJSON(url).pipe(
    map((user) => ({
      type: 'data' as const,
      message: 'User data loaded',
      data: user,
    })),
    startWith({
      type: 'status' as const,
      message: 'Loading user data...',
    }),
    // endWith only fires on successful completion; it is skipped when the request errors.
    endWith({
      type: 'status' as const,
      message: 'Request completed',
    }),
    catchError((err: unknown) =>
      of({
        type: 'error' as const,
        message: `Failed to load user: ${err instanceof Error ? err.message : String(err)}`,
      }),
    ),
  );
}

fetchUserData('123').subscribe((status: StatusMessage) => {
  console.log(`[${status.type}] ${status.message}`);
  if (status.data) {
    displayUser(status.data);
  }
});
// Output:
// [status] Loading user data...
// [data] User data loaded
// [status] Request completed
File Upload with Progress and Completion
import { Observable , endWith } from 'rxjs' ;
/** Notification emitted while a file upload is in flight or has finished. */
interface UploadProgress {
  type: 'progress' | 'complete';
  filename: string;
  progress: number;
  url?: string;
}

/**
 * Uploads `file` to `/api/upload` with XMLHttpRequest and reports progress.
 *
 * Emits `progress` notifications (0-100) while uploading, a final `progress`
 * of 100 when the server answers with a 2xx status, and then one `complete`
 * notification appended by `endWith`. Unsubscribing aborts the request.
 *
 * @param file The file to upload.
 * @returns An Observable of upload notifications; it errors with
 *          `Error('Upload failed')` on a non-2xx status, a network failure,
 *          or a timeout.
 */
function uploadFile(file: File): Observable<UploadProgress> {
  return new Observable<UploadProgress>((subscriber) => {
    const xhr = new XMLHttpRequest();
    const formData = new FormData();
    formData.append('file', file);

    const fail = () => subscriber.error(new Error('Upload failed'));

    xhr.upload.addEventListener('progress', (e) => {
      // Guard against a zero total so the percentage is never NaN.
      if (e.lengthComputable && e.total > 0) {
        subscriber.next({
          type: 'progress',
          filename: file.name,
          progress: (e.loaded / e.total) * 100,
        });
      }
    });

    xhr.addEventListener('load', () => {
      // Any 2xx status (e.g. 201 Created) counts as success.
      if (xhr.status >= 200 && xhr.status < 300) {
        // NOTE(review): the response body is not used here; if the server
        // returns the stored URL, confirm whether it should replace the
        // URL assembled in endWith below.
        subscriber.next({
          type: 'progress',
          filename: file.name,
          progress: 100,
        });
        subscriber.complete();
      } else {
        fail();
      }
    });

    // Without these listeners a network failure would leave the stream open forever.
    xhr.addEventListener('error', fail);
    xhr.addEventListener('timeout', fail);

    xhr.open('POST', '/api/upload');
    xhr.send(formData);

    // Teardown: cancel the request when the subscriber goes away.
    // Aborting an already finished request has no effect.
    return () => xhr.abort();
  }).pipe(
    // The completion marker only depends on values known up front, so endWith is safe here.
    endWith({
      type: 'complete' as const,
      filename: file.name,
      progress: 100,
      url: '/uploads/' + file.name,
    }),
  );
}
// Look up the file input; both the element and its selection may be missing.
const fileInput = document.getElementById('file') as HTMLInputElement | null;
const selectedFile = fileInput?.files?.[0];

if (selectedFile) {
  uploadFile(selectedFile).subscribe({
    next: (progress: UploadProgress) => {
      if (progress.type === 'progress') {
        updateProgressBar(progress.filename, progress.progress);
      } else if (progress.type === 'complete') {
        console.log(`Upload complete: ${progress.url}`);
        showSuccessMessage(progress.filename);
      }
    },
    // uploadFile signals failures through the error channel; report them instead of letting them go unhandled.
    error: (err: unknown) => {
      console.error('Upload error:', err);
    },
  });
} else {
  console.warn('No file selected for upload');
}
import { from, map, scan, endWith, tap, concatWith, defer, of } from 'rxjs';

/** A single record flowing through the stream. */
interface DataItem {
  id: number;
  value: string;
}

/** Trailer appended after the last record. */
interface Summary {
  total: number;
  timestamp: number;
  message: string;
}

const data: DataItem[] = [
  { id: 1, value: 'Item 1' },
  { id: 2, value: 'Item 2' },
  { id: 3, value: 'Item 3' },
];

let itemCount = 0;

from(data)
  .pipe(
    // Count items as they pass through.
    tap(() => {
      itemCount++;
    }),
    // The arguments of endWith() are evaluated while the pipeline is being
    // built, when itemCount is still 0. defer() postpones building the
    // summary until the source has completed, so the total and the
    // timestamp reflect the finished stream.
    concatWith(
      defer(() => {
        const summary: Summary = {
          total: itemCount,
          timestamp: Date.now(),
          message: 'End of data stream',
        };
        return of(summary);
      }),
    ),
  )
  .subscribe((item) => {
    // The stream type is DataItem | Summary; only Summary has `message`.
    if ('message' in item) {
      console.log('\n--- Summary ---');
      console.log(`Total items: ${item.total}`);
      console.log(`Completed at: ${new Date(item.timestamp).toISOString()}`);
      console.log(item.message);
    } else {
      console.log(`Processing: ${item.value}`);
    }
  });
Practical Scenarios
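endWith is synchronous: all end values are emitted immediately when the source completes, before the complete notification is sent to observers. Note that the values passed to endWith are ordinary function arguments, so they are evaluated once, when the pipeline is built, not when the source completes. For values that must be computed at completion time (running totals, timestamps), use concatWith(defer(() => of(value))) instead.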
Scenario 1: Log Processing with Summary
import { from, map, scan, endWith, tap, concatWith, defer } from 'rxjs';

/** One line of application log output. */
interface LogEntry {
  level: 'info' | 'warn' | 'error';
  message: string;
  timestamp: number;
}

/** Running counters collected while the log entries stream through. */
interface LogSummary {
  totalEntries: number;
  errorCount: number;
  warnCount: number;
  infoCount: number;
}

const logEntries: LogEntry[] = [
  { level: 'info', message: 'App started', timestamp: Date.now() },
  { level: 'warn', message: 'Low memory', timestamp: Date.now() },
  { level: 'error', message: 'Connection failed', timestamp: Date.now() },
  { level: 'info', message: 'Retrying...', timestamp: Date.now() },
];

const summary: LogSummary = {
  totalEntries: 0,
  errorCount: 0,
  warnCount: 0,
  infoCount: 0,
};

// Maps each log level to the counter it increments, keeping the index access type-safe.
const counterByLevel: Record<LogEntry['level'], keyof LogSummary> = {
  info: 'infoCount',
  warn: 'warnCount',
  error: 'errorCount',
};

from(logEntries)
  .pipe(
    tap((entry) => {
      summary.totalEntries++;
      summary[counterByLevel[entry.level]]++;
    }),
    // endWith() would read the counters while the pipeline is being built,
    // when they are all still 0. defer() builds the summary lines only after
    // the source has completed.
    concatWith(
      defer(() => {
        const timestamp = Date.now();
        const lines = [
          '=== Log Summary ===',
          `Total: ${summary.totalEntries}`,
          `Errors: ${summary.errorCount}`,
          `Warnings: ${summary.warnCount}`,
        ];
        return from(lines.map((message): LogEntry => ({ level: 'info', message, timestamp })));
      }),
    ),
  )
  .subscribe((entry) => {
    console.log(`[${entry.level.toUpperCase()}] ${entry.message}`);
  });
Scenario 2: Animation Sequence with Cleanup
import { Observable, interval, take, map, endWith } from 'rxjs';

/** Either one step of the animation or the trailing cleanup marker. */
type AnimationFrame = { frame: number; position: number } | { cleanup: true };

/**
 * Produces a one-second, 60-step slide animation followed by a cleanup marker.
 *
 * Positions follow a cubic ease-out from 0 to 300 (pixels). After the last
 * step, `endWith` appends `{ cleanup: true }` so the consumer knows the
 * animation is over.
 *
 * @param element The element being animated. It is not read here; the
 *                subscriber applies the positions.
 * @returns An Observable of animation steps terminated by the cleanup marker.
 */
function animateElement(element: HTMLElement): Observable<AnimationFrame> {
  const frameIntervalMs = 16; // ~60fps
  const frameCount = 60; // 1 second animation
  const distancePx = 300; // Move 300px

  // Typed constant instead of an `as` assertion so the marker shape is checked.
  const cleanupMarker: AnimationFrame = { cleanup: true };

  return interval(frameIntervalMs).pipe(
    take(frameCount),
    map((frame): AnimationFrame => ({
      frame,
      // Frames run 0..frameCount-1; dividing by the last index makes the
      // final frame land exactly on the target distance.
      position: easeOutCubic(frame / (frameCount - 1)) * distancePx,
    })),
    endWith(cleanupMarker),
  );
}
/**
 * Cubic ease-out curve: starts fast and decelerates towards the end.
 *
 * @param t Progress of the animation, where 0 is the start and 1 is the end.
 * @returns The eased progress, `1 - (1 - t)^3`.
 */
function easeOutCubic(t: number): number {
  const remaining = 1 - t;
  const easedRemainder = Math.pow(remaining, 3);
  return 1 - easedRemainder;
}
// querySelector returns null when nothing matches, so guard before animating.
const box = document.querySelector<HTMLElement>('.box');

if (box) {
  animateElement(box).subscribe((frame: AnimationFrame) => {
    if ('cleanup' in frame) {
      // Animation complete, cleanup
      box.style.transition = 'none';
      console.log('Animation cleanup complete');
    } else {
      // No whitespace between the number and its unit: "12 px" is not a valid CSS length.
      box.style.transform = `translateX(${frame.position}px)`;
    }
  });
}
import { Observable, from, map, endWith, reduce, concatWith, defer, of } from 'rxjs';

/** A single financial transaction to export. */
interface Transaction {
  id: string;
  amount: number;
  date: string;
}

/** One line of CSV output, without the trailing newline. */
type ExportRow = string;

/**
 * Streams `transactions` as CSV rows followed by a blank line and summary rows.
 *
 * @param transactions The transactions to export, in output order.
 * @returns An Observable emitting one `id,amount,date` row per transaction,
 *          then an empty row, then the transaction count, total amount and
 *          export date rows.
 */
function exportTransactionsToCSV(transactions: Transaction[]): Observable<ExportRow> {
  // The outer defer gives every subscription its own counters, so
  // subscribing twice does not double the totals.
  return defer(() => {
    let totalAmount = 0;
    let count = 0;

    return from(transactions).pipe(
      map((transaction) => {
        totalAmount += transaction.amount;
        count++;
        return `${transaction.id},${transaction.amount},${transaction.date}`;
      }),
      // The blank separator line is a constant, so endWith is the right tool for it.
      endWith(''),
      // The totals are only known once every row has been produced. endWith()
      // evaluates its arguments up front (while they are still 0), so the
      // summary rows are built lazily after the source completes.
      concatWith(
        defer(() =>
          of(
            `Total Transactions,${count}`,
            `Total Amount,${totalAmount.toFixed(2)}`,
            `Export Date,${new Date().toISOString()}`,
          ),
        ),
      ),
    );
  });
}
const transactions: Transaction[] = [
  { id: 'TXN001', amount: 100.50, date: '2024-01-01' },
  { id: 'TXN002', amount: 250.00, date: '2024-01-02' },
  { id: 'TXN003', amount: 75.25, date: '2024-01-03' },
];

exportTransactionsToCSV(transactions)
  .pipe(
    // Fold all rows into one CSV document, starting from the header line.
    // Rows are joined with a bare newline; padding around it would end up inside the cells.
    reduce((csv, row) => csv + row + '\n', 'ID,Amount,Date\n'),
  )
  .subscribe((csv) => {
    downloadCSV('transactions.csv', csv);
    console.log('Export complete');
  });
Scenario 4: WebSocket Stream with Disconnect Message
import { webSocket } from 'rxjs/webSocket';
import { endWith, catchError, of, concatWith, defer } from 'rxjs';

/** A chat payload or a locally generated system notice. */
interface Message {
  type: 'data' | 'system';
  content: string;
  timestamp: number;
}

/** Builds a system notice stamped with the time at which it is created. */
function systemMessage(content: string): Message {
  return { type: 'system', content, timestamp: Date.now() };
}

/** Extracts readable text from an error value, which may not be an `Error` instance. */
function describeError(err: unknown): string {
  if (err instanceof Error) {
    return err.message;
  }
  return typeof err === 'string' ? err : 'unknown error';
}

const ws$ = webSocket<Message>('ws://localhost:8080/chat');

ws$
  .pipe(
    // endWith() would stamp the notice with the time the pipeline was built.
    // defer() creates it when the socket actually closes, so the timestamp
    // is the disconnect time.
    concatWith(defer(() => of(systemMessage('Disconnected from server')))),
    catchError((err: unknown) => of(systemMessage(`Connection error: ${describeError(err)}`))),
  )
  .subscribe((message: Message) => {
    if (message.type === 'system') {
      console.log(`[SYSTEM] ${message.content}`);
      showSystemMessage(message.content);
    } else {
      console.log(`[MESSAGE] ${message.content}`);
      displayChatMessage(message);
    }
  });
Behavior Details
Emission Timing
All source values are emitted normally
When source completes, endWith values are emitted synchronously
Complete notification is sent after all endWith values
import { of , endWith , tap } from 'rxjs' ;
// Observer used only for logging: prints each value, then the completion.
const timingLogger = {
  next: (x: number) => console.log('Value:', x),
  complete: () => console.log('Complete!'),
};

// Source values come first, then the endWith values, then the complete notification.
const numbersWithTail$ = of(1, 2, 3).pipe(endWith(4, 5), tap(timingLogger));

numbersWithTail$.subscribe();
// Output:
// Value: 1
// Value: 2
// Value: 3
// Value: 4
// Value: 5
// Complete!
Error Handling
If the source Observable errors, endWith values are NOT emitted. The error is propagated immediately.
import { throwError , endWith , catchError , of } from 'rxjs' ;
// A source that fails immediately without emitting anything.
const failing$ = throwError(() => new Error('Failed'));

// Replacement stream used after the failure; it completes normally, so its endWith value is delivered.
const recovery$ = of('Recovered').pipe(endWith('Now this will be emitted'));

failing$
  .pipe(
    // Skipped: the source errors instead of completing.
    endWith('This will not be emitted'),
    catchError((err) => {
      console.error('Error:', err.message);
      return recovery$;
    }),
  )
  .subscribe((value) => console.log(value));
// Output:
// Error: Failed
// Recovered
// Now this will be emitted
Comparison with Similar Operators
| Operator | When values are emitted |
| --- | --- |
| startWith | Before source starts |
| endWith | After source completes |
| concat | After source completes (subscribes to new Observable) |
| finalize | After complete/error/unsubscribe (for side effects only) |
startWith - Prepends values before source emissions
concat - Concatenates Observables sequentially
finalize - Executes a callback when the Observable completes, errors, or is unsubscribed
concatWith - Appends Observables after source completes