ScreenPulse uses RxJS (Reactive Extensions for JavaScript) to manage asynchronous data streams throughout the application. RxJS provides a powerful way to handle events, HTTP requests, and state changes using observable patterns.
RxJS is a library for reactive programming using Observables, which represent streams of data that can emit multiple values over time.
Think of an Observable as a “lazy Promise” that can emit multiple values. While a Promise resolves once with a single value, an Observable can emit many values over its lifetime.
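To make the "lazy" part concrete, here is a minimal sketch. The names `numbers$` and `log` are illustrative and not part of ScreenPulse. It records the order in which things happen, showing that the producer does not run until `subscribe()` is called and that one Observable can deliver several values before completing.

```typescript
import { Observable } from 'rxjs';

const log: string[] = [];

// The function passed to the constructor is the producer.
// It runs once per subscription, not when the Observable is created.
const numbers$ = new Observable<number>(subscriber => {
  log.push('producer started');
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});

log.push('before subscribe');

numbers$.subscribe({
  next: value => log.push(`value ${value}`),
  complete: () => log.push('complete'),
});

console.log(log);
```

A Promise, by contrast, starts its work as soon as it is constructed and settles with at most one value.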
BehaviorSubject always has a current value. New subscribers immediately receive the latest value:
    import { BehaviorSubject } from 'rxjs';

    const subject = new BehaviorSubject(false);
    subject.next(true);

    // New subscriber gets current value (true) immediately
    subject.subscribe(value => console.log(value)); // Logs: true
Multicast
Multiple components can subscribe to the same BehaviorSubject and receive updates:
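A minimal sketch of this behavior, with two plain arrays standing in for two components. The names `loggedIn$`, `headerSeen`, and `sidebarSeen` are hypothetical and chosen only for illustration.

```typescript
import { BehaviorSubject } from 'rxjs';

// Shared state, initially "not logged in".
const loggedIn$ = new BehaviorSubject<boolean>(false);

// Each array records what one hypothetical component has received.
const headerSeen: boolean[] = [];
const sidebarSeen: boolean[] = [];

// Both subscribers receive the current value (false) on subscribe.
loggedIn$.subscribe(value => headerSeen.push(value));
loggedIn$.subscribe(value => sidebarSeen.push(value));

// A single next() call reaches every subscriber.
loggedIn$.next(true);

console.log(headerSeen, sidebarSeen);
```

Both arrays end up with the same sequence, because every subscriber shares the one underlying subject.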
By exposing Observables instead of Subjects, we prevent external code from calling next():
    // Good: Only the service can modify state
    isLoggedInObservable(): Observable<boolean> {
      return this.userLoggedInSubject.asObservable();
    }

    // Bad: External code could call next() and corrupt state
    // isLoggedInSubject: BehaviorSubject<boolean>;
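For context, the encapsulation pattern might look like this as a complete class. This is a sketch, not the actual ScreenPulse service: the class name `AuthStateSketch` and the methods `markLoggedIn()` and `markLoggedOut()` are hypothetical, and only `userLoggedInSubject` and `isLoggedInObservable()` come from the snippet above.

```typescript
import { BehaviorSubject, Observable } from 'rxjs';

class AuthStateSketch {
  // Private: nothing outside the class can reach the subject.
  private readonly userLoggedInSubject = new BehaviorSubject<boolean>(false);

  // Read-only view of the state for consumers.
  isLoggedInObservable(): Observable<boolean> {
    return this.userLoggedInSubject.asObservable();
  }

  // Hypothetical mutators: the only places where next() is called.
  markLoggedIn(): void {
    this.userLoggedInSubject.next(true);
  }

  markLoggedOut(): void {
    this.userLoggedInSubject.next(false);
  }
}

const auth = new AuthStateSketch();
const seen: boolean[] = [];
auth.isLoggedInObservable().subscribe(value => seen.push(value));
auth.markLoggedIn();

// The exposed stream is a plain Observable, so it has no next() method.
const exposed = auth.isLoggedInObservable();
const hasNext = 'next' in exposed;

console.log(seen, hasNext);
```

Consumers still see every state change, but the only way to change the state is through the service's own methods.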
The take operator completes the observable after it has emitted a specified number of values:
src/app/core/guards/auth.guard.ts
    import { take, tap } from 'rxjs/operators';

    canActivate(): Observable<boolean> {
      return this.authService.isLoggedInObservable().pipe(
        take(1), // Complete after first emission
        tap(loggedIn => {
          if (!loggedIn) {
            this.toastService.warning(
              'You must be logged in to access this page.',
              'Access Denied'
            );
            this.router.navigate(['/auth/login']);
          }
        })
      );
    }
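take(1) matters in guards because the BehaviorSubject behind isLoggedInObservable() never completes on its own. With take(1), the guard reads the current login state once and the stream then completes, so no subscription is left open to leak memory.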
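The effect of take(1) can be checked outside of a guard. The sketch below uses a hypothetical `state$` subject in place of the auth service and compares a plain subscription with one piped through `take(1)`.

```typescript
import { BehaviorSubject } from 'rxjs';
import { take } from 'rxjs/operators';

// Hypothetical stand-in for the login state; it never completes by itself.
const state$ = new BehaviorSubject<boolean>(true);
const events: string[] = [];

// Plain subscription: stays open until someone unsubscribes.
const withoutTake = state$.subscribe({
  next: value => events.push(`plain next ${value}`),
  complete: () => events.push('plain complete'),
});

// take(1): receives the current value, then completes immediately.
const withTake = state$.pipe(take(1)).subscribe({
  next: value => events.push(`take next ${value}`),
  complete: () => events.push('take complete'),
});

const takeClosed = withTake.closed;      // already closed
const plainClosed = withoutTake.closed;  // still open

// Manual cleanup is needed only for the plain subscription.
withoutTake.unsubscribe();

console.log(events, takeClosed, plainClosed);
```

Only the `take(1)` subscription reports completion; the plain one stays open until it is unsubscribed by hand.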
The switchMap operator maps each value to an inner observable and cancels the previous inner observable when a new value arrives:
src/app/pages/search/page/search.component.ts
    import { switchMap, take } from 'rxjs/operators';
    import { EMPTY } from 'rxjs';

    addToFavorites(mediaItem: MediaItem) {
      this.authService.isLoggedInObservable().pipe(
        take(1),
        switchMap(loggedIn => {
          if (!loggedIn) {
            this.toastrService.warning(
              'You must be logged in to add movies to your list',
              'Error'
            );
            this.router.navigate(['/auth/login']);
            return EMPTY; // Return empty observable - no further processing
          }
          // Switch to the favorites service observable
          return this.favoritesService.addToFavorites(mediaItem);
        })
      ).subscribe({
        next: () => this.toastrService.success(mediaItem.title, 'Added to favorites'),
        error: (error) => this.toastrService.warning(error.message)
      });
    }
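Because take(1) lets only one value through, the example above never actually triggers the cancellation. The sketch below isolates it. The names `queries$`, `responses`, and `results` are hypothetical, and the subjects in `responses` stand in for pending requests so that the timing can be controlled by hand.

```typescript
import { Subject } from 'rxjs';
import { switchMap } from 'rxjs/operators';

// Outer stream of hypothetical search queries.
const queries$ = new Subject<string>();

// One manually controlled "pending request" per query.
const responses: Record<string, Subject<string>> = {
  a: new Subject<string>(),
  ab: new Subject<string>(),
};

const results: string[] = [];

queries$
  .pipe(switchMap(query => responses[query]))
  .subscribe(result => results.push(result));

queries$.next('a');   // subscribes to responses.a
queries$.next('ab');  // unsubscribes from responses.a, subscribes to responses.ab

responses.a.next('result for a');    // dropped: that inner observable was cancelled
responses.ab.next('result for ab');  // delivered

console.log(results);
```

Only the response for the latest query reaches the subscriber, which is why switchMap suits cases where stale results should be discarded.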
The catchError operator intercepts an error and substitutes a fallback observable:

    import { catchError } from 'rxjs/operators';
    import { of } from 'rxjs';

    this.http.get('/api/data').pipe(
      catchError(error => {
        console.error('Error:', error);
        return of([]); // Return empty array as fallback
      })
    ).subscribe();
Debouncing User Input
    import { debounceTime, distinctUntilChanged, switchMap } from 'rxjs/operators';

    searchControl.valueChanges.pipe(
      debounceTime(300),      // Wait 300ms after user stops typing
      distinctUntilChanged(), // Only emit if value changed
      switchMap(query => this.searchService.search(query))
    ).subscribe();
Retry Failed Requests
    import { retry } from 'rxjs/operators';

    this.http.get('/api/data').pipe(
      retry(3) // Retry up to 3 times on failure
    ).subscribe();
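To see what retry does without a real backend, the sketch below replaces the HTTP call with a hypothetical `flaky$` observable that fails on its first two subscriptions and succeeds on the third. The `attempts` counter and `out` array are illustrative names.

```typescript
import { Observable, defer, of } from 'rxjs';
import { retry } from 'rxjs/operators';

let attempts = 0;

// defer() builds a fresh observable for each subscription,
// so every retry counts as a new attempt.
const flaky$ = defer(() => {
  attempts += 1;
  return attempts < 3
    ? new Observable<string>(subscriber =>
        subscriber.error(new Error(`attempt ${attempts} failed`)))
    : of('data');
});

const out: string[] = [];

flaky$.pipe(retry(3)).subscribe({
  next: value => out.push(value),
  error: (err: Error) => out.push(`error: ${err.message}`),
});

console.log(attempts, out);
```

retry(3) resubscribes up to three times after the initial attempt, so the source can be subscribed to at most four times in total. Here the third attempt succeeds, and the subscriber never sees the earlier errors.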