Reactive programming with RxJS enables ScreenPulse to handle asynchronous data flows elegantly. This guide covers advanced patterns used throughout the application for managing state, handling user interactions, and coordinating multiple data streams.
Component Subscriptions
Components subscribe to observables to react to data changes and user interactions.
Pattern 1: Async Pipe for State Display
The most common pattern in ScreenPulse is using the async pipe to display observable data:
src/app/layout/navbar/navbar.component.ts
import { Component , ChangeDetectionStrategy } from '@angular/core' ;
import { Router } from '@angular/router' ;
import { AuthService } from 'src/app/core/services/auth.service' ;
import { combineLatest , Observable } from 'rxjs' ;
import { UserSessionData } from 'src/app/shared/models/user-session.model' ;
@ Component ({
selector: 'app-navbar' ,
templateUrl: './navbar.component.html' ,
styleUrls: [ './navbar.component.scss' ],
changeDetection: ChangeDetectionStrategy . OnPush
})
export class NavbarComponent {
// Combine multiple state streams into a single observable
readonly userData$ : Observable < UserSessionData > = combineLatest ({
email: this . authService . getUserMailObservable (),
isLoggedIn: this . authService . isLoggedInObservable ()
});
constructor (
private authService : AuthService ,
private router : Router
) { }
logOut () {
this . authService . logOut ();
this . router . navigate ([ '' ]);
}
}
< ng-container *ngIf = "userData$ | async as userData" >
< div *ngIf = "userData.isLoggedIn" class = "user-info" >
< span > {{ userData.email }} </ span >
< button (click) = "logOut()" > Log Out </ button >
</ div >
< div *ngIf = "!userData.isLoggedIn" class = "auth-links" >
< a routerLink = "/auth/login" > Login </ a >
< a routerLink = "/auth/register" > Register </ a >
</ div >
</ ng-container >
Using ChangeDetectionStrategy.OnPush with async pipe optimizes performance by only checking for changes when observable values emit.
Pattern 2: Imperative Subscriptions for Actions
For user actions that trigger side effects, subscribe imperatively in component methods:
src/app/pages/search/page/search.component.ts
import { Component } from '@angular/core' ;
import { EMPTY , switchMap , take } from 'rxjs' ;
import { MediaItem } from 'src/app/shared/models/movie.model' ;
addToFavorites ( mediaItem : MediaItem ) {
this . authService . isLoggedInObservable (). pipe (
take ( 1 ), // Complete after first emission
switchMap ( loggedIn => {
if ( ! loggedIn ) {
// Show warning and redirect
this . toastrService . warning (
'You must be logged in to add movies to your list' ,
'Error'
);
this . router . navigate ([ '/auth/login' ]);
return EMPTY ; // Empty observable - no further processing
}
// Authenticated - proceed with API call
return this . favoritesService . addToFavorites ( mediaItem );
})
). subscribe ({
next : () => this . toastrService . success (
mediaItem . title ,
'Added to favorites'
),
error : ( error ) => this . toastrService . warning ( error . message )
});
}
This pattern chains authentication checking with the API call. If not authenticated, it returns EMPTY to gracefully terminate the stream without calling the API.
Observable Chains
Chaining operators with pipe() creates powerful data transformation pipelines.
Authentication Flow
this . authService . isLoggedInObservable ()
. pipe (
take ( 1 ), // Get current value once
tap ( loggedIn => { // Side effect: log status
console . log ( 'Auth status:' , loggedIn );
}),
switchMap ( loggedIn => { // Transform to new observable
return loggedIn
? this . api . getData ()
: of ([]);
}),
catchError ( error => { // Handle errors
this . toastr . error ( error . message );
return of ([]);
})
)
. subscribe ( data => {
// Handle successful data
});
Loading State Management
src/app/pages/search/page/search.component.ts
import { finalize } from 'rxjs/operators' ;
openMediaItem ( mediaItem : MediaItem ): void {
this . loadingCard = true ;
this . dialogService
. openMediaItem ( window . innerWidth , mediaItem , false )
. pipe (
finalize (() => {
// Always execute when observable completes or errors
this . loadingCard = false ;
})
)
. subscribe ();
}
finalize() is perfect for cleanup operations like hiding loading spinners, as it runs whether the observable succeeds, fails, or is cancelled.
Combining Multiple Streams
ScreenPulse uses several operators to coordinate multiple observables.
combineLatest()
Emits whenever any source observable emits:
import { combineLatest } from 'rxjs' ;
// Combine authentication state and user preferences
readonly viewModel$ = combineLatest ({
isLoggedIn: this . authService . isLoggedInObservable (),
userEmail: this . authService . getUserMailObservable (),
favorites: this . favoritesService . getFavorites (),
searchResults: this . searchService . getResults ()
});
// Emits:
// { isLoggedIn: true, userEmail: '[email protected] ', favorites: [...], searchResults: [...] }
< div *ngIf = "viewModel$ | async as vm" >
< div *ngIf = "vm.isLoggedIn" >
< p > Welcome, {{ vm.userEmail }} </ p >
< app-favorites [favorites] = "vm.favorites" ></ app-favorites >
</ div >
< app-search-results [results] = "vm.searchResults" ></ app-search-results >
</ div >
switchMap()
Cancels previous inner observable when a new value arrives:
import { switchMap } from 'rxjs/operators' ;
// Search as user types - cancel previous search if new input arrives
searchControl . valueChanges . pipe (
debounceTime ( 300 ),
distinctUntilChanged (),
switchMap ( query => this . omdbService . search ( query )) // Cancel previous search
). subscribe ( results => {
this . searchResults = results ;
});
Use switchMap() for requests that should cancel previous requests (like search-as-you-type). Use mergeMap() if all requests should complete (like logging).
forkJoin()
Waits for all observables to complete, then emits combined result:
import { forkJoin } from 'rxjs' ;
// Fetch multiple resources in parallel
forkJoin ({
user: this . userService . getUser (),
favorites: this . favoritesService . getFavorites (),
recommendations: this . recommendationService . getRecommendations ()
}). subscribe (({ user , favorites , recommendations }) => {
// All three requests completed
this . initializeDashboard ( user , favorites , recommendations );
});
ScreenPulse uses reactive forms with observable value streams:
import { FormBuilder , FormGroup , Validators } from '@angular/forms' ;
import { debounceTime , distinctUntilChanged } from 'rxjs/operators' ;
export class SearchBarComponent implements OnInit {
searchForm : FormGroup ;
constructor ( private fb : FormBuilder ) {
this . searchForm = this . fb . group ({
title: [ '' , Validators . required ],
type: [ 'all' ],
year: [ '' ]
});
}
ngOnInit () {
// React to title changes with debouncing
this . searchForm . get ( 'title' )?. valueChanges . pipe (
debounceTime ( 300 ),
distinctUntilChanged (),
tap ( title => console . log ( 'Search for:' , title ))
). subscribe ();
}
onSubmit () {
if ( this . searchForm . valid ) {
const filters = this . searchForm . value ;
this . searchSubmitted . emit ( filters );
}
}
}
Error Handling Patterns
Global Error Interceptor
Handle errors at the HTTP layer:
src/app/core/interceptors/error.interceptor.ts
import { Injectable } from '@angular/core' ;
import { HttpInterceptor , HttpRequest , HttpHandler , HttpErrorResponse } from '@angular/common/http' ;
import { Observable , throwError } from 'rxjs' ;
import { catchError } from 'rxjs/operators' ;
import { ToastrService } from 'ngx-toastr' ;
@ Injectable ()
export class ErrorInterceptor implements HttpInterceptor {
constructor ( private toastr : ToastrService ) {}
intercept ( req : HttpRequest < any >, next : HttpHandler ) : Observable < any > {
return next . handle ( req ). pipe (
catchError (( error : HttpErrorResponse ) => {
let errorMessage = 'An error occurred' ;
if ( error . error instanceof ErrorEvent ) {
// Client-side error
errorMessage = error . error . message ;
} else {
// Server-side error
errorMessage = `Error ${ error . status } : ${ error . message } ` ;
}
this . toastr . error ( errorMessage );
return throwError (() => error );
})
);
}
}
Component-Level Error Handling
Handle specific errors in components:
this . omdbService . fetchMediaItems ( title , type , year , page )
. subscribe ({
next : ( response ) => {
if ( response . Response === "True" ) {
this . searchState . collection = response . Search || [];
this . searchState . collectionSize = Number ( response . totalResults ) || 0 ;
} else {
// API returned error
this . toastrService . warning ( response . Error , 'Try again!' );
this . searchState . collection = [];
}
},
error : ( error ) => {
// HTTP or network error
this . toastrService . error ( error . message );
this . searchState . collection = [];
},
complete : () => {
this . searchState . searchOnProcess = false ;
}
});
Memory Leak Prevention
Using Async Pipe (Recommended)
export class NavbarComponent {
// Observable exposed to template
userData$ = combineLatest ({
email: this . authService . getUserMailObservable (),
isLoggedIn: this . authService . isLoggedInObservable ()
});
}
<!-- Async pipe handles subscription/unsubscription -->
< div *ngIf = "userData$ | async as userData" >
{{ userData.email }}
</ div >
Using take() for One-Time Operations
import { take } from 'rxjs/operators' ;
checkAuth () {
this . authService . isLoggedInObservable ()
. pipe ( take ( 1 )) // Automatically completes after first emission
. subscribe ( loggedIn => {
// Handle authentication status
});
}
Using takeUntil() for Multiple Subscriptions
import { Subject } from 'rxjs' ;
import { takeUntil } from 'rxjs/operators' ;
export class MyComponent implements OnDestroy {
private destroy$ = new Subject < void >();
ngOnInit () {
// All subscriptions use takeUntil(this.destroy$)
this . service . getData ()
. pipe ( takeUntil ( this . destroy$ ))
. subscribe ( data => this . data = data );
this . service . getMoreData ()
. pipe ( takeUntil ( this . destroy$ ))
. subscribe ( data => this . moreData = data );
}
ngOnDestroy () {
this . destroy$ . next (); // Emit to complete all subscriptions
this . destroy$ . complete ();
}
}
Best Practices Summary
Prefer async pipe
Use async pipe in templates whenever possible for automatic subscription management
Chain operators with pipe()
Transform data with operators instead of nested subscriptions
Use switchMap for dependent calls
Chain API calls that depend on previous results
Always handle errors
Use catchError at appropriate levels (global interceptor + component handlers)
Clean up subscriptions
Use async pipe, take(), or takeUntil() to prevent memory leaks
Expose observables, not subjects
Services should return observables to prevent external modifications
Common Anti-Patterns
Nested Subscriptions // BAD - Nested subscriptions
this . service1 . getData (). subscribe ( data1 => {
this . service2 . getData ( data1 ). subscribe ( data2 => {
this . service3 . getData ( data2 ). subscribe ( data3 => {
// Do something
});
});
});
// GOOD - Use switchMap
this . service1 . getData (). pipe (
switchMap ( data1 => this . service2 . getData ( data1 )),
switchMap ( data2 => this . service3 . getData ( data2 ))
). subscribe ( data3 => {
// Do something
});
Manual Subscription Without Cleanup // BAD - Memory leak
ngOnInit () {
this . service . getData (). subscribe ( data => this . data = data );
}
// GOOD - Use async pipe or takeUntil
data$ = this . service . getData ();
// Or with takeUntil
private destroy$ = new Subject < void >();
ngOnInit () {
this . service . getData ()
. pipe ( takeUntil ( this . destroy$ ))
. subscribe ( data => this . data = data );
}
ngOnDestroy () {
this . destroy$ . next ();
this . destroy$ . complete ();
}
Next Steps
RxJS Patterns Learn foundational RxJS concepts
Services Understand service architecture
HTTP Interceptors Transform requests globally
State Management Advanced state management strategies