Overview

endWith emits all values from the source Observable and, when the source completes, synchronously emits one or more specified values before the completion notification.
It is useful for signaling that a stream has ended (for example, when paired with takeUntil) or for appending completion markers to a data stream. The values are fixed when the operator is created, and they are not emitted if the source errors.

Type Signature

export function endWith<T, A extends readonly unknown[] = T[]>(
  ...values: A
): OperatorFunction<T, T | ValueFromArray<A>>

Parameters

values
...A
required
One or more values to emit synchronously after the source Observable completes. These values are emitted in the order provided.

Returns

OperatorFunction<T, T | ValueFromArray<A>> - An operator function that returns an Observable that emits all source values, then synchronously emits the provided values after the source completes.
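
The widened return type is easiest to see with end values whose types differ from the source. The following is a minimal sketch, not taken from the library's documentation; the variable names mixed$ and received are illustrative only.

```typescript
import { of, endWith, Observable } from 'rxjs';

// The source emits numbers; endWith appends a string and a boolean,
// in the order they are passed.
const mixed$: Observable<number | string | boolean> = of(1, 2).pipe(
  endWith('done', true)
);

const received: Array<number | string | boolean> = [];
mixed$.subscribe(value => received.push(value));

console.log(received); // [1, 2, 'done', true]
```

The element type of the result is the union of the source type and the types of the appended values, which is why consumers usually need a type guard before using an item.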

Usage Examples

Basic Example: Interval with End Notification

import { interval, map, fromEvent, startWith, takeUntil, endWith } from 'rxjs';

const ticker$ = interval(5000).pipe(
  map(() => 'tick')
);

const documentClicks$ = fromEvent(document, 'click');

ticker$.pipe(
  startWith('interval started'),
  takeUntil(documentClicks$),
  endWith('interval ended by click')
).subscribe(x => console.log(x));

// Output (assuming a click shortly after 15 seconds):
// 'interval started'
// 'tick' (5s)
// 'tick' (10s)
// 'tick' (15s)
// 'interval ended by click' (on click)
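
The same pattern can be tried without a browser or timers. This sketch replaces the interval and the click stream with Subjects; ticks$, stop$, and log are hypothetical stand-ins, not part of the example above.

```typescript
import { Subject, startWith, takeUntil, endWith } from 'rxjs';

// Hypothetical stand-ins for the ticker and the click stream.
const ticks$ = new Subject<string>();
const stop$ = new Subject<void>();
const log: string[] = [];

ticks$.pipe(
  startWith('interval started'),
  takeUntil(stop$),
  endWith('interval ended by click')
).subscribe(x => log.push(x));

ticks$.next('tick');
ticks$.next('tick');
stop$.next();        // takeUntil completes the stream, then endWith appends its value
ticks$.next('tick'); // ignored: the stream has already completed

console.log(log);
// ['interval started', 'tick', 'tick', 'interval ended by click']
```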

Real-World Example: API Request with Status Messages

import { ajax } from 'rxjs/ajax';
import { Observable, map, endWith, startWith, catchError, of } from 'rxjs';

interface StatusMessage {
  type: 'status' | 'data' | 'error';
  message: string;
  data?: any;
}

function fetchUserData(userId: string): Observable<StatusMessage> {
  return ajax.getJSON(`/api/users/${userId}`).pipe(
    map(user => ({
      type: 'data' as const,
      message: 'User data loaded',
      data: user
    })),
    startWith({
      type: 'status' as const,
      message: 'Loading user data...'
    }),
    endWith({
      type: 'status' as const,
      message: 'Request completed'
    }),
    catchError(err => of({
      type: 'error' as const,
      message: `Failed to load user: ${err.message}`
    }))
  );
}

fetchUserData('123').subscribe((status: StatusMessage) => {
  console.log(`[${status.type}] ${status.message}`);
  if (status.data) {
    displayUser(status.data);
  }
});

// Output:
// [status] Loading user data...
// [data] User data loaded
// [status] Request completed

File Upload with Progress and Completion

import { Observable, endWith } from 'rxjs';

interface UploadProgress {
  type: 'progress' | 'complete';
  filename: string;
  progress: number;
  url?: string;
}

function uploadFile(file: File): Observable<UploadProgress> {
  return new Observable<UploadProgress>(subscriber => {
    const xhr = new XMLHttpRequest();
    const formData = new FormData();
    formData.append('file', file);

    xhr.upload.addEventListener('progress', (e) => {
      if (e.lengthComputable) {
        const progress = (e.loaded / e.total) * 100;
        subscriber.next({
          type: 'progress',
          filename: file.name,
          progress
        });
      }
    });

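    xhr.addEventListener('load', () => {
      if (xhr.status === 200) {
        subscriber.next({
          type: 'progress',
          filename: file.name,
          progress: 100
        });
        subscriber.complete();
      } else {
        subscriber.error(new Error('Upload failed'));
      }
    });

    // Network-level failures never fire 'load', so report them as errors too
    xhr.addEventListener('error', () => {
      subscriber.error(new Error('Upload failed'));
    });

    xhr.open('POST', '/api/upload');
    xhr.send(formData);

    // Abort the request if the subscriber unsubscribes early
    return () => xhr.abort();
  }).pipe(
    // endWith values are fixed when the pipeline is built, so this URL is
    // derived from the file name rather than read from the server response
    endWith({
      type: 'complete' as const,
      filename: file.name,
      progress: 100,
      url: '/uploads/' + file.name
    })
  );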
}

const fileInput = document.getElementById('file') as HTMLInputElement;
const selectedFile = fileInput.files![0];

uploadFile(selectedFile).subscribe((progress: UploadProgress) => {
  if (progress.type === 'progress') {
    updateProgressBar(progress.filename, progress.progress);
  } else if (progress.type === 'complete') {
    console.log(`Upload complete: ${progress.url}`);
    showSuccessMessage(progress.filename);
  }
});

Data Stream with End Summary

import { from, endWith } from 'rxjs';

interface DataItem {
  id: number;
  value: string;
}

interface Summary {
  total: number;
  timestamp: number;
  message: string;
}

const data: DataItem[] = [
  { id: 1, value: 'Item 1' },
  { id: 2, value: 'Item 2' },
  { id: 3, value: 'Item 3' }
];

// endWith arguments are evaluated when the pipeline is built, not when the
// source completes, so the total is computed from the array up front.
const summary: Summary = {
  total: data.length,
  timestamp: Date.now(),
  message: 'End of data stream'
};

from(data).pipe(
  endWith(summary) // Output type is DataItem | Summary
).subscribe(item => {
  if ('message' in item) {
    // Narrowed to Summary by the 'message' check
    console.log(`\n--- Summary ---`);
    console.log(`Total items: ${item.total}`);
    console.log(`Summary created at: ${new Date(item.timestamp).toISOString()}`);
    console.log(item.message);
  } else {
    console.log(`Processing: ${item.value}`);
  }
});

Practical Scenarios

endWith is synchronous - all end values are emitted immediately when the source completes, before the complete notification is sent to observers.
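
The synchronous behavior can be observed by completing a Subject by hand and recording what happens around the complete() call. This is a small sketch under that assumption; source$ and events are illustrative names.

```typescript
import { Subject, endWith } from 'rxjs';

const source$ = new Subject<string>();
const events: string[] = [];

source$.pipe(endWith('end-1', 'end-2')).subscribe({
  next: value => events.push(`next:${value}`),
  complete: () => events.push('complete')
});

source$.next('a');
events.push('before complete()');
source$.complete(); // end values and the complete notification arrive during this call
events.push('after complete()');

console.log(events);
// ['next:a', 'before complete()', 'next:end-1', 'next:end-2', 'complete', 'after complete()']
```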

Scenario 1: Log Processing with Summary

import { from, endWith } from 'rxjs';

interface LogEntry {
  level: 'info' | 'warn' | 'error';
  message: string;
  timestamp: number;
}

interface LogSummary {
  totalEntries: number;
  errorCount: number;
  warnCount: number;
  infoCount: number;
}

const logEntries: LogEntry[] = [
  { level: 'info', message: 'App started', timestamp: Date.now() },
  { level: 'warn', message: 'Low memory', timestamp: Date.now() },
  { level: 'error', message: 'Connection failed', timestamp: Date.now() },
  { level: 'info', message: 'Retrying...', timestamp: Date.now() }
];

// endWith arguments are evaluated when the pipeline is built, so the summary
// is computed from the array before the stream is created. Counting inside
// the pipeline would leave every total at 0 in the messages below.
const countLevel = (level: LogEntry['level']): number =>
  logEntries.filter(entry => entry.level === level).length;

const summary: LogSummary = {
  totalEntries: logEntries.length,
  errorCount: countLevel('error'),
  warnCount: countLevel('warn'),
  infoCount: countLevel('info')
};

from(logEntries).pipe(
  endWith(
    { level: 'info', message: '=== Log Summary ===', timestamp: Date.now() } as LogEntry,
    { level: 'info', message: `Total: ${summary.totalEntries}`, timestamp: Date.now() } as LogEntry,
    { level: 'info', message: `Errors: ${summary.errorCount}`, timestamp: Date.now() } as LogEntry,
    { level: 'info', message: `Warnings: ${summary.warnCount}`, timestamp: Date.now() } as LogEntry
  )
).subscribe(entry => {
  console.log(`[${entry.level.toUpperCase()}] ${entry.message}`);
});

Scenario 2: Animation Sequence with Cleanup

import { Observable, interval, take, map, endWith } from 'rxjs';

type AnimationFrame = { frame: number; position: number } | { cleanup: true };

function animateElement(element: HTMLElement): Observable<AnimationFrame> {
  return interval(16).pipe( // ~60fps
    take(60), // 1 second animation
    map(frame => ({
      frame,
      position: easeOutCubic(frame / 60) * 300 // Move 300px
    })),
    endWith({ cleanup: true } as AnimationFrame)
  );
}

function easeOutCubic(t: number): number {
  return 1 - Math.pow(1 - t, 3);
}

const box = document.querySelector('.box') as HTMLElement;

animateElement(box).subscribe((frame: AnimationFrame) => {
  if ('cleanup' in frame) {
    // Animation complete, cleanup
    box.style.transition = 'none';
    console.log('Animation cleanup complete');
  } else {
    box.style.transform = `translateX(${frame.position}px)`;
  }
});

Scenario 3: CSV Export with Summary Rows

import { Observable, from, map, endWith, reduce } from 'rxjs';

interface Transaction {
  id: string;
  amount: number;
  date: string;
}

type ExportRow = string;

function exportTransactionsToCSV(transactions: Transaction[]): Observable<ExportRow> {
  // endWith arguments are evaluated when the pipeline is built, so the totals
  // are computed from the array up front rather than accumulated inside map.
  const count = transactions.length;
  const totalAmount = transactions.reduce((sum, t) => sum + t.amount, 0);

  return from(transactions).pipe(
    map(transaction =>
      `${transaction.id},${transaction.amount},${transaction.date}`
    ),
    endWith(
      '', // Empty line
      `Total Transactions,${count}`,
      `Total Amount,${totalAmount.toFixed(2)}`,
      `Export Date,${new Date().toISOString()}`
    )
  );
}

const transactions: Transaction[] = [
  { id: 'TXN001', amount: 100.50, date: '2024-01-01' },
  { id: 'TXN002', amount: 250.00, date: '2024-01-02' },
  { id: 'TXN003', amount: 75.25, date: '2024-01-03' }
];

exportTransactionsToCSV(transactions).pipe(
  reduce((csv, row) => csv + row + '\n', 'ID,Amount,Date\n')
).subscribe(csv => {
  downloadCSV('transactions.csv', csv);
  console.log('Export complete');
});

Scenario 4: WebSocket Stream with Disconnect Message

import { webSocket } from 'rxjs/webSocket';
import { endWith, catchError, of } from 'rxjs';

interface Message {
  type: 'data' | 'system';
  content: string;
  timestamp: number;
}

const ws$ = webSocket<Message>('ws://localhost:8080/chat');

ws$.pipe(
  endWith({
    type: 'system' as const,
    content: 'Disconnected from server',
    timestamp: Date.now()
  }),
  catchError(err => of({
    type: 'system' as const,
    content: `Connection error: ${err.message}`,
    timestamp: Date.now()
  }))
).subscribe((message: Message) => {
  if (message.type === 'system') {
    console.log(`[SYSTEM] ${message.content}`);
    showSystemMessage(message.content);
  } else {
    console.log(`[MESSAGE] ${message.content}`);
    displayChatMessage(message);
  }
});

Behavior Details

Emission Timing

  • All source values are emitted normally
  • When source completes, endWith values are emitted synchronously
  • Complete notification is sent after all endWith values

import { of, endWith, tap } from 'rxjs';

of(1, 2, 3).pipe(
  endWith(4, 5),
  tap({
    next: x => console.log('Value:', x),
    complete: () => console.log('Complete!')
  })
).subscribe();

// Output:
// Value: 1
// Value: 2
// Value: 3
// Value: 4
// Value: 5
// Complete!
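
Timing of emission is separate from timing of evaluation. Because endWith takes plain values rather than a factory, its arguments are evaluated when the operator is created, before the source has emitted anything. The sketch below contrasts that with concatWith plus defer, which is a swapped-in alternative (not endWith itself) for values that depend on state accumulated during the stream. The names count, eager, and lazy are illustrative.

```typescript
import { of, tap, endWith, concatWith, defer } from 'rxjs';

let count = 0;
const eager: Array<number | string> = [];
const lazy: Array<number | string> = [];

// endWith: the template string is evaluated here, while count is still 0.
of(10, 20, 30).pipe(
  tap(() => count++),
  endWith(`count=${count}`)
).subscribe(v => eager.push(v));

count = 0; // reset before the second run

// concatWith + defer: the factory runs only after the source completes.
of(10, 20, 30).pipe(
  tap(() => count++),
  concatWith(defer(() => of(`count=${count}`)))
).subscribe(v => lazy.push(v));

console.log(eager); // [10, 20, 30, 'count=0']
console.log(lazy);  // [10, 20, 30, 'count=3']
```

When the end value is known up front, endWith is the simpler choice; the deferred form is only worth the extra operator when the value must be computed at completion time.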

Error Handling

If the source Observable errors, endWith values are NOT emitted. The error is propagated immediately.

import { throwError, endWith, catchError, of } from 'rxjs';

throwError(() => new Error('Failed')).pipe(
  endWith('This will not be emitted'),
  catchError(err => {
    console.error('Error:', err.message);
    return of('Recovered').pipe(
      endWith('Now this will be emitted')
    );
  })
).subscribe(console.log);

// Output:
// Error: Failed
// Recovered
// Now this will be emitted
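
Operator order decides whether the end values survive an error. As a sketch, assuming the error is simply swallowed with EMPTY, placing endWith after catchError makes the end value appear on both the success and the recovered path. The array names are illustrative.

```typescript
import { throwError, endWith, catchError, EMPTY } from 'rxjs';

const beforeCatch: string[] = [];
const afterCatch: string[] = [];

// endWith before catchError: skipped, because the source errors and never completes.
throwError(() => new Error('Failed')).pipe(
  endWith('done'),
  catchError(() => EMPTY)
).subscribe(v => beforeCatch.push(v));

// endWith after catchError: runs, because the recovered stream completes normally.
throwError(() => new Error('Failed')).pipe(
  catchError(() => EMPTY),
  endWith('done')
).subscribe(v => afterCatch.push(v));

console.log(beforeCatch); // []
console.log(afterCatch);  // ['done']
```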

Comparison with Similar Operators

Operator     When values are emitted
startWith    Before the source starts
endWith      After the source completes
concat       After the source completes (subscribes to a new Observable)
finalize     On complete, error, or unsubscribe (side effects only)

Related Operators

  • startWith - Prepends values before source emissions
  • concat - Concatenates Observables sequentially
  • finalize - Executes a callback when the Observable completes, errors, or is unsubscribed
  • concatWith - Appends Observables after source completes
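
The relative ordering of these operators can be checked by combining them in one pipeline. This is a minimal sketch with a synchronous source; the string labels and the order array are illustrative only.

```typescript
import { of, startWith, endWith, concatWith, finalize } from 'rxjs';

const order: string[] = [];

of('source').pipe(
  startWith('startWith'),        // emitted before the source value
  endWith('endWith'),            // emitted after the source completes
  concatWith(of('concatWith')),  // subscribed after everything above completes
  finalize(() => order.push('finalize')) // side effect only, emits nothing
).subscribe({
  next: v => order.push(v),
  complete: () => order.push('complete')
});

console.log(order);
// ['startWith', 'source', 'endWith', 'concatWith', 'complete', 'finalize']
```

Note that finalize runs after the complete notification has been delivered, and its callback never adds a value to the stream.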