Reactive programming with RxJS enables ScreenPulse to handle asynchronous data flows elegantly. This guide covers advanced patterns used throughout the application for managing state, handling user interactions, and coordinating multiple data streams.

Component Subscriptions

Components subscribe to observables to react to data changes and user interactions.

Pattern 1: Async Pipe for State Display

The most common pattern in ScreenPulse is using the async pipe to display observable data:
src/app/layout/navbar/navbar.component.ts
import { Component, ChangeDetectionStrategy } from '@angular/core';
import { Router } from '@angular/router';
import { AuthService } from 'src/app/core/services/auth.service';
import { combineLatest, Observable } from 'rxjs';
import { UserSessionData } from 'src/app/shared/models/user-session.model';

@Component({
  selector: 'app-navbar',
  templateUrl: './navbar.component.html',
  styleUrls: ['./navbar.component.scss'],
  changeDetection: ChangeDetectionStrategy.OnPush
})
export class NavbarComponent {
  // Combine multiple state streams into a single observable
  readonly userData$: Observable<UserSessionData> = combineLatest({
    email: this.authService.getUserMailObservable(),
    isLoggedIn: this.authService.isLoggedInObservable() 
  });

  constructor(
    private authService: AuthService,
    private router: Router
  ) { }

  logOut() {
    this.authService.logOut();
    this.router.navigate(['']);
  }
}
navbar.component.html
<ng-container *ngIf="userData$ | async as userData">
  <div *ngIf="userData.isLoggedIn" class="user-info">
    <span>{{ userData.email }}</span>
    <button (click)="logOut()">Log Out</button>
  </div>
  <div *ngIf="!userData.isLoggedIn" class="auth-links">
    <a routerLink="/auth/login">Login</a>
    <a routerLink="/auth/register">Register</a>
  </div>
</ng-container>
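Pairing ChangeDetectionStrategy.OnPush with the async pipe reduces change-detection work: the component is re-checked only when an input reference changes, an event fires in its template, or the async pipe receives a new value and marks the view for check.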

Pattern 2: Imperative Subscriptions for Actions

For user actions that trigger side effects, subscribe imperatively in component methods:
src/app/pages/search/page/search.component.ts
import { Component } from '@angular/core';
import { EMPTY, switchMap, take } from 'rxjs';
import { MediaItem } from 'src/app/shared/models/movie.model';

addToFavorites(mediaItem: MediaItem) {
  this.authService.isLoggedInObservable().pipe(
    take(1),  // Complete after first emission
    switchMap(loggedIn => {
      if (!loggedIn) {
        // Show warning and redirect
        this.toastrService.warning(
          'You must be logged in to add movies to your list',
          'Error'
        );
        this.router.navigate(['/auth/login']);
        return EMPTY;  // Empty observable - no further processing
      }
      // Authenticated - proceed with API call
      return this.favoritesService.addToFavorites(mediaItem);
    })
  ).subscribe({
    next: () => this.toastrService.success(
      mediaItem.title,
      'Added to favorites'
    ),
    error: (error) => this.toastrService.warning(error.message)
  });
}
This pattern chains authentication checking with the API call. If not authenticated, it returns EMPTY to gracefully terminate the stream without calling the API.
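
To see why returning EMPTY skips both the success and the error callback, here is a minimal standalone sketch. The `loggedIn$` subject and the `fakeAddToFavorites` helper are hypothetical stand-ins for the ScreenPulse services, not the real ones.

```typescript
import { BehaviorSubject, EMPTY, Observable, of } from 'rxjs';
import { switchMap, take } from 'rxjs/operators';

// Hypothetical stand-ins for AuthService and FavoritesService.
const loggedIn$ = new BehaviorSubject<boolean>(false);
const fakeAddToFavorites = (title: string): Observable<string> => of(`saved:${title}`);

// Runs one "add to favorites" attempt and records what the subscriber observed.
function tryAdd(title: string): string[] {
  const events: string[] = [];
  loggedIn$.pipe(
    take(1),
    switchMap(loggedIn => (loggedIn ? fakeAddToFavorites(title) : EMPTY))
  ).subscribe({
    next: value => events.push(`next:${value}`),
    error: () => events.push('error'),
    complete: () => events.push('complete')
  });
  return events;
}

const anonymousAttempt = tryAdd('Alien'); // logged out: EMPTY completes without a value
loggedIn$.next(true);
const loggedInAttempt = tryAdd('Alien');  // logged in: the inner call emits, then completes
```

In the logged-out case only `complete` fires, so the success toast in `next` never appears and no request is made.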

Observable Chains

Chaining operators with pipe() creates powerful data transformation pipelines.

Authentication Flow

import { of } from 'rxjs';
import { catchError, switchMap, take, tap } from 'rxjs/operators';

this.authService.isLoggedInObservable()
  .pipe(
    take(1),              // Get current value once
    tap(loggedIn => {     // Side effect: log status
      console.log('Auth status:', loggedIn);
    }),
    switchMap(loggedIn => {  // Transform to new observable
      return loggedIn 
        ? this.api.getData() 
        : of([]);
    }),
    catchError(error => {    // Handle errors
      this.toastr.error(error.message);
      return of([]);
    })
  )
  .subscribe(data => {
    // Handle successful data
  });
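
Because catchError sits after switchMap in this chain, it also catches failures from the inner API call and swaps in a fallback value. The sketch below illustrates that behavior in isolation; `failingApi` is a hypothetical call that always fails, and the `events` array stands in for the toast and the subscriber.

```typescript
import { Observable, of, throwError } from 'rxjs';
import { catchError, switchMap, take } from 'rxjs/operators';

// Hypothetical API call that always fails, standing in for this.api.getData().
const failingApi = (): Observable<string[]> =>
  throwError(() => new Error('network down'));

const events: string[] = [];

of(true).pipe(
  take(1),
  switchMap(loggedIn => (loggedIn ? failingApi() : of([] as string[]))),
  catchError((error: Error) => {
    events.push(`handled:${error.message}`); // stands in for toastr.error(...)
    return of([] as string[]);               // fallback replaces the failed stream
  })
).subscribe({
  next: data => events.push(`next:${data.length} items`),
  error: () => events.push('error'),
  complete: () => events.push('complete')
});
```

The subscriber receives the empty fallback list and a normal completion; its own error callback is never invoked.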

Loading State Management

src/app/pages/search/page/search.component.ts
import { finalize } from 'rxjs/operators';

openMediaItem(mediaItem: MediaItem): void {
  this.loadingCard = true;
  
  this.dialogService
    .openMediaItem(window.innerWidth, mediaItem, false)
    .pipe(
      finalize(() => {
        // Runs when the observable completes, errors, or is unsubscribed
        this.loadingCard = false;
      })
    )
    .subscribe();
}
finalize() is perfect for cleanup operations like hiding loading spinners, as it runs whether the observable succeeds, fails, or is cancelled.
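
The three termination paths can be checked with a small standalone sketch. The sources here are hypothetical placeholders, not ScreenPulse services.

```typescript
import { Subject, of, throwError } from 'rxjs';
import { finalize } from 'rxjs/operators';

const finalized: string[] = [];

// 1. Source completes normally.
of('done')
  .pipe(finalize(() => finalized.push('completed')))
  .subscribe();

// 2. Source errors.
throwError(() => new Error('boom'))
  .pipe(finalize(() => finalized.push('errored')))
  .subscribe({ error: () => { /* error handled elsewhere */ } });

// 3. Subscriber unsubscribes before the source finishes.
const neverEnding = new Subject<string>();
const subscription = neverEnding
  .pipe(finalize(() => finalized.push('unsubscribed')))
  .subscribe();
subscription.unsubscribe();
```

After this runs, `finalized` contains one entry per case, which is why a loading flag reset inside finalize() cannot get stuck.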

Combining Multiple Streams

ScreenPulse uses several operators to coordinate multiple observables.

combineLatest()

Waits until every source observable has emitted at least once, then emits the latest value from each source whenever any of them emits:
import { combineLatest } from 'rxjs';

// Combine authentication state and user preferences
readonly viewModel$ = combineLatest({
  isLoggedIn: this.authService.isLoggedInObservable(),
  userEmail: this.authService.getUserMailObservable(),
  favorites: this.favoritesService.getFavorites(),
  searchResults: this.searchService.getResults()
});

// Emits: 
// { isLoggedIn: true, userEmail: 'user@example.com', favorites: [...], searchResults: [...] }
<div *ngIf="viewModel$ | async as vm">
  <div *ngIf="vm.isLoggedIn">
    <p>Welcome, {{ vm.userEmail }}</p>
    <app-favorites [favorites]="vm.favorites"></app-favorites>
  </div>
  <app-search-results [results]="vm.searchResults"></app-search-results>
</div>
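
One consequence worth seeing in isolation: combineLatest stays silent until every source has produced a value. The sketch below uses two hypothetical sources, one with an initial value and one without.

```typescript
import { BehaviorSubject, Subject, combineLatest } from 'rxjs';

// Hypothetical sources: a BehaviorSubject has an initial value, a plain Subject does not.
const isLoggedIn$ = new BehaviorSubject<boolean>(false);
const searchResults$ = new Subject<string[]>();

const snapshots: Array<{ isLoggedIn: boolean; searchResults: string[] }> = [];
combineLatest({ isLoggedIn: isLoggedIn$, searchResults: searchResults$ })
  .subscribe(vm => snapshots.push(vm));

const beforeFirstSearch = snapshots.length; // nothing yet: searchResults$ has not emitted

searchResults$.next(['Alien']); // first combined emission
isLoggedIn$.next(true);         // later changes re-emit with the latest value of each source
```

In a template guarded by `*ngIf="viewModel$ | async as vm"`, this means nothing renders until the slowest source has emitted; a source without an initial value can be given one with startWith().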

switchMap()

Cancels previous inner observable when a new value arrives:
import { debounceTime, distinctUntilChanged, switchMap } from 'rxjs/operators';

// Search as user types - cancel previous search if new input arrives
searchControl.valueChanges.pipe(
  debounceTime(300),
  distinctUntilChanged(),
  switchMap(query => this.omdbService.search(query))  // Cancel previous search
).subscribe(results => {
  this.searchResults = results;
});
Use switchMap() for requests that should cancel previous requests (like search-as-you-type). Use mergeMap() if all requests should complete (like logging).
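
The difference between the two operators can be sketched with hand-resolved responses. `fakeSearch` and the two response subjects are hypothetical; they simulate requests whose answers arrive after a newer query was already issued.

```typescript
import { Subject } from 'rxjs';
import { mergeMap, switchMap } from 'rxjs/operators';

// Hypothetical in-flight requests that we resolve by hand.
const alResponse$ = new Subject<string>();
const alienResponse$ = new Subject<string>();
const fakeSearch = (query: string): Subject<string> =>
  query === 'al' ? alResponse$ : alienResponse$;

const queries$ = new Subject<string>();
const switched: string[] = [];
const merged: string[] = [];
queries$.pipe(switchMap(fakeSearch)).subscribe(result => switched.push(result));
queries$.pipe(mergeMap(fakeSearch)).subscribe(result => merged.push(result));

queries$.next('al');    // first request starts
queries$.next('alien'); // switchMap unsubscribes from 'al'; mergeMap keeps both alive

alResponse$.next('results for al');       // late response to the stale query
alienResponse$.next('results for alien');
```

With switchMap the stale response never reaches the subscriber, so the UI cannot be overwritten by results for an outdated query; mergeMap delivers both.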

forkJoin()

Waits for all observables to complete, then emits the last value from each as a single combined result:
import { forkJoin } from 'rxjs';

// Fetch multiple resources in parallel
forkJoin({
  user: this.userService.getUser(),
  favorites: this.favoritesService.getFavorites(),
  recommendations: this.recommendationService.getRecommendations()
}).subscribe(({ user, favorites, recommendations }) => {
  // All three requests completed
  this.initializeDashboard(user, favorites, recommendations);
});
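
A minimal sketch of the "last value, emitted once" behavior, using hypothetical sources in place of HTTP calls:

```typescript
import { forkJoin, of } from 'rxjs';

// Hypothetical sources; ratings$ emits three values before completing.
const user$ = of({ name: 'Ripley' });
const ratings$ = of(3, 4, 5);

const results: Array<{ user: { name: string }; rating: number }> = [];
forkJoin({ user: user$, rating: ratings$ })
  .subscribe(combined => results.push(combined));
```

Only one combined value arrives, carrying the final rating. Angular HttpClient requests complete after a single response, so they fit forkJoin well; a long-lived stream such as a BehaviorSubject never completes on its own and would need take(1) before being passed in.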

Reactive Form Patterns

ScreenPulse uses reactive forms with observable value streams:
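import { EventEmitter, OnInit, Output } from '@angular/core';
import { FormBuilder, FormGroup, Validators } from '@angular/forms';
import { debounceTime, distinctUntilChanged, tap } from 'rxjs/operators';

export class SearchBarComponent implements OnInit {
  // Emits the filter values on a valid submit (payload type assumed from the form controls)
  @Output() searchSubmitted = new EventEmitter<{ title: string; type: string; year: string }>();
  searchForm: FormGroup;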
  
  constructor(private fb: FormBuilder) {
    this.searchForm = this.fb.group({
      title: ['', Validators.required],
      type: ['all'],
      year: ['']
    });
  }
  
  ngOnInit() {
    // React to title changes with debouncing
    this.searchForm.get('title')?.valueChanges.pipe(
      debounceTime(300),
      distinctUntilChanged(),
      tap(title => console.log('Search for:', title))
    ).subscribe();
  }
  
  onSubmit() {
    if (this.searchForm.valid) {
      const filters = this.searchForm.value;
      this.searchSubmitted.emit(filters);
    }
  }
}

Error Handling Patterns

Global Error Interceptor

Handle errors at the HTTP layer:
src/app/core/interceptors/error.interceptor.ts
import { Injectable } from '@angular/core';
import { HttpInterceptor, HttpRequest, HttpHandler, HttpEvent, HttpErrorResponse } from '@angular/common/http';
import { Observable, throwError } from 'rxjs';
import { catchError } from 'rxjs/operators';
import { ToastrService } from 'ngx-toastr';

@Injectable()
export class ErrorInterceptor implements HttpInterceptor {
  constructor(private toastr: ToastrService) {}

  intercept(req: HttpRequest<any>, next: HttpHandler): Observable<HttpEvent<any>> {
    return next.handle(req).pipe(
      catchError((error: HttpErrorResponse) => {
        let errorMessage = 'An error occurred';
        
        if (error.error instanceof ErrorEvent) {
          // Client-side error
          errorMessage = error.error.message;
        } else {
          // Server-side error
          errorMessage = `Error ${error.status}: ${error.message}`;
        }
        
        this.toastr.error(errorMessage);
        return throwError(() => error);
      })
    );
  }
}
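
Because the interceptor rethrows with throwError, the error continues downstream after the global toast. The following sketch reproduces that "notify, then rethrow" step in plain RxJS; `withGlobalHandler` is a hypothetical helper standing in for the interceptor, and `notifications` stands in for the toasts.

```typescript
import { Observable, throwError } from 'rxjs';
import { catchError } from 'rxjs/operators';

const notifications: string[] = [];

// Hypothetical stand-in for the interceptor's catchError: notify, then rethrow.
function withGlobalHandler<T>(source: Observable<T>): Observable<T> {
  return source.pipe(
    catchError((error: Error) => {
      notifications.push(`global: ${error.message}`);
      return throwError(() => error); // keep the stream in an errored state
    })
  );
}

withGlobalHandler(throwError(() => new Error('500'))).subscribe({
  error: (error: Error) => notifications.push(`component: ${error.message}`)
});
```

Both handlers run, in that order. If a component's error callback also shows a toast for the same failure, the user may see two messages, so it can be worth deciding which layer owns user-facing notifications.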

Component-Level Error Handling

Handle specific errors in components:
this.omdbService.fetchMediaItems(title, type, year, page)
  .subscribe({
    next: (response) => {
      if (response.Response === "True") {
        this.searchState.collection = response.Search || [];
        this.searchState.collectionSize = Number(response.totalResults) || 0;
      } else {
        // API returned error
        this.toastrService.warning(response.Error, 'Try again!');
        this.searchState.collection = [];
      }
    },
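    error: (error) => {
      // HTTP or network error
      this.toastrService.error(error.message);
      this.searchState.collection = [];
      // complete() is not called after an error, so reset the flag here too
      this.searchState.searchOnProcess = false;
    },
    complete: () => {
      this.searchState.searchOnProcess = false;
    }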
  });

Memory Leak Prevention

Using the Async Pipe

export class NavbarComponent {
  // Observable exposed to template
  userData$ = combineLatest({
    email: this.authService.getUserMailObservable(),
    isLoggedIn: this.authService.isLoggedInObservable()
  });
}
<!-- Async pipe handles subscription/unsubscription -->
<div *ngIf="userData$ | async as userData">
  {{ userData.email }}
</div>

Using take() for One-Time Operations

import { take } from 'rxjs/operators';

checkAuth() {
  this.authService.isLoggedInObservable()
    .pipe(take(1))  // Automatically completes after first emission
    .subscribe(loggedIn => {
      // Handle authentication status
    });
}
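
A short sketch of what take(1) does to the subscription, with a hypothetical `authState$` subject in place of the real auth service:

```typescript
import { BehaviorSubject } from 'rxjs';
import { take } from 'rxjs/operators';

// Hypothetical auth state; a BehaviorSubject replays its current value to new subscribers.
const authState$ = new BehaviorSubject<boolean>(true);

const seen: boolean[] = [];
const subscription = authState$
  .pipe(take(1))
  .subscribe(value => seen.push(value));

authState$.next(false); // not delivered: the subscription already completed
const closedAfterFirstValue = subscription.closed;
```

Note that take(1) only cleans up once a value arrives. If the source never emits, the subscription stays open, so this approach suits sources that are known to emit promptly.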

Using takeUntil() for Multiple Subscriptions

import { OnDestroy, OnInit } from '@angular/core';
import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

export class MyComponent implements OnInit, OnDestroy {
  private destroy$ = new Subject<void>();
  
  ngOnInit() {
    // All subscriptions use takeUntil(this.destroy$)
    this.service.getData()
      .pipe(takeUntil(this.destroy$))
      .subscribe(data => this.data = data);
    
    this.service.getMoreData()
      .pipe(takeUntil(this.destroy$))
      .subscribe(data => this.moreData = data);
  }
  
  ngOnDestroy() {
    this.destroy$.next();    // Emit to complete all subscriptions
    this.destroy$.complete();
  }
}
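
The effect of the destroy notifier can be observed outside Angular. In this sketch `data$` is a hypothetical long-lived source, and the `destroy$` calls mirror what ngOnDestroy does.

```typescript
import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

const destroy$ = new Subject<void>();
const data$ = new Subject<number>(); // hypothetical long-lived source

const received: number[] = [];
const subscription = data$
  .pipe(takeUntil(destroy$))
  .subscribe(value => received.push(value));

data$.next(1);

// Equivalent of ngOnDestroy()
destroy$.next();
destroy$.complete();

data$.next(2); // no longer delivered
const closedAfterDestroy = subscription.closed;
```

Placing takeUntil last in the pipe is the usual recommendation, so that operators after it cannot keep an inner subscription alive.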

Best Practices Summary

1. Prefer async pipe. Use the async pipe in templates whenever possible for automatic subscription management.

2. Chain operators with pipe(). Transform data with operators instead of nested subscriptions.

3. Use switchMap for dependent calls. Chain API calls that depend on previous results.

4. Always handle errors. Use catchError at appropriate levels (global interceptor + component handlers).

5. Clean up subscriptions. Use async pipe, take(), or takeUntil() to prevent memory leaks.

6. Expose observables, not subjects. Services should return observables to prevent external modifications.
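
The last practice, exposing observables rather than subjects, can be sketched as follows. `SessionStore` is a hypothetical service written for illustration; the real AuthService may be structured differently.

```typescript
import { BehaviorSubject, Observable } from 'rxjs';

// Hypothetical service; only it can push new state.
class SessionStore {
  private readonly loggedIn = new BehaviorSubject<boolean>(false);

  // Consumers get a read-only view with no next(), error(), or complete().
  readonly loggedIn$: Observable<boolean> = this.loggedIn.asObservable();

  logIn(): void {
    this.loggedIn.next(true);
  }

  logOut(): void {
    this.loggedIn.next(false);
  }
}

const store = new SessionStore();
const states: boolean[] = [];
store.loggedIn$.subscribe(state => states.push(state));

store.logIn();
store.logOut();
```

State changes go through `logIn()` and `logOut()` only, which keeps every mutation in one place and makes the flow easier to trace.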

Common Anti-Patterns

Nested Subscriptions
// BAD - Nested subscriptions
this.service1.getData().subscribe(data1 => {
  this.service2.getData(data1).subscribe(data2 => {
    this.service3.getData(data2).subscribe(data3 => {
      // Do something
    });
  });
});

// GOOD - Use switchMap
this.service1.getData().pipe(
  switchMap(data1 => this.service2.getData(data1)),
  switchMap(data2 => this.service3.getData(data2))
).subscribe(data3 => {
  // Do something
});
Manual Subscription Without Cleanup
// BAD - Memory leak
ngOnInit() {
  this.service.getData().subscribe(data => this.data = data);
}

// GOOD - Use async pipe or takeUntil
data$ = this.service.getData();

// Or with takeUntil
private destroy$ = new Subject<void>();
ngOnInit() {
  this.service.getData()
    .pipe(takeUntil(this.destroy$))
    .subscribe(data => this.data = data);
}
ngOnDestroy() {
  this.destroy$.next();
  this.destroy$.complete();
}

Next Steps

RxJS Patterns

Learn foundational RxJS concepts

Services

Understand service architecture

HTTP Interceptors

Transform requests globally

State Management

Advanced state management strategies
