ScreenPulse uses RxJS (Reactive Extensions for JavaScript) to manage asynchronous data streams throughout the application. RxJS provides a powerful way to handle events, HTTP requests, and state changes using observable patterns.

What is RxJS?

RxJS is a library for reactive programming using Observables, which represent streams of data that can emit multiple values over time.
Think of an Observable as a “lazy Promise” that can emit multiple values. While a Promise resolves once with a single value, an Observable can emit many values over its lifetime.
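The two properties described above, laziness and multiple emissions, can be seen in a minimal sketch. The `numbers$` stream and the `log` array are illustrative names, not part of ScreenPulse:

```typescript
import { Observable } from 'rxjs';

const log: string[] = [];

// The producer function runs only when someone subscribes (lazy).
const numbers$ = new Observable<number>(subscriber => {
  log.push('producer started');
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});

// Nothing has run yet: creating the Observable does not start the producer.
log.push('before subscribe');

numbers$.subscribe({
  next: value => log.push(`value ${value}`),
  complete: () => log.push('complete'),
});

console.log(log);
// ['before subscribe', 'producer started', 'value 1', 'value 2', 'value 3', 'complete']
```

A Promise executor, by contrast, starts running as soon as the Promise is constructed and settles only once.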

BehaviorSubject for State Management

ScreenPulse uses BehaviorSubject to manage application state, particularly for authentication:
src/app/core/services/auth.service.ts
import { Injectable } from '@angular/core';
import { BehaviorSubject, Observable } from 'rxjs';
import { AuthUser } from 'src/app/shared/models/auth.model';

@Injectable({
  providedIn: 'root'
})
export class AuthService {
  private readonly authTokenKey = 'authToken';
  private readonly userMailKey = 'userMail';

  // BehaviorSubjects hold current state and emit to new subscribers
  private userMailSubject = new BehaviorSubject<string | null>(null);
  private userLoggedInSubject = new BehaviorSubject<boolean>(false);

  constructor() {
    // Initialize state from sessionStorage
    this.userMailSubject.next(sessionStorage.getItem(this.userMailKey));
    this.userLoggedInSubject.next(
      sessionStorage.getItem(this.authTokenKey) !== null
    );
  }

  // Expose Observables (not Subjects) to prevent external modifications
  isLoggedInObservable(): Observable<boolean> {
    return this.userLoggedInSubject.asObservable();
  }

  getUserMailObservable(): Observable<string | null> {
    return this.userMailSubject.asObservable();
  }

  // Update state by calling next()
  setAuthToken(token: string) {
    sessionStorage.setItem(this.authTokenKey, token);
    this.userLoggedInSubject.next(true);
  }

  setUserMail(userMail: string) {
    sessionStorage.setItem(this.userMailKey, userMail);
    this.userMailSubject.next(userMail);
  }

  logOut() {
    sessionStorage.removeItem(this.authTokenKey);
    sessionStorage.removeItem(this.userMailKey);
    this.userMailSubject.next(null);
    this.userLoggedInSubject.next(false);
  }
}

Why BehaviorSubject?

BehaviorSubject always has a current value. New subscribers immediately receive the latest value:
const subject = new BehaviorSubject(false);
subject.next(true);

// New subscriber gets current value (true) immediately
subject.subscribe(value => console.log(value)); // Logs: true
Multiple components can subscribe to the same BehaviorSubject and receive updates:
// Navbar component
this.authService.isLoggedInObservable().subscribe(loggedIn => {
  this.showLoginButton = !loggedIn;
});

// Auth guard
this.authService.isLoggedInObservable().subscribe(loggedIn => {
  if (!loggedIn) this.router.navigate(['/auth/login']);
});
By exposing Observables instead of Subjects, we prevent external code from calling next():
// Good: Only the service can modify state
isLoggedInObservable(): Observable<boolean> {
  return this.userLoggedInSubject.asObservable();
}

// Bad: External code could call next() and corrupt state
// isLoggedInSubject: BehaviorSubject<boolean>;
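As a rough illustration of that encapsulation outside Angular, the sketch below uses a hypothetical `CounterStore` class (not part of ScreenPulse). The object returned by `asObservable()` has no `next()` method, so consumers can only subscribe:

```typescript
import { BehaviorSubject, Observable } from 'rxjs';

// Hypothetical store used only for this illustration.
class CounterStore {
  private readonly countSubject = new BehaviorSubject<number>(0);

  // Consumers get a read-only view of the state.
  readonly count$: Observable<number> = this.countSubject.asObservable();

  increment(): void {
    this.countSubject.next(this.countSubject.value + 1);
  }
}

const store = new CounterStore();
const seen: number[] = [];
store.count$.subscribe(value => seen.push(value));

store.increment();
store.increment();

// store.count$.next(5); // Would not compile: 'next' does not exist on Observable<number>

// Even at runtime, the exposed object has no next() method.
const hasNext =
  typeof (store.count$ as unknown as { next?: unknown }).next === 'function';

console.log(seen);    // [0, 1, 2]
console.log(hasNext); // false
```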

Observable Streams in Components

Components subscribe to observables to react to data changes:
src/app/layout/navbar/navbar.component.ts
import { ChangeDetectionStrategy, Component } from '@angular/core';
import { Router } from '@angular/router';
import { AuthService } from 'src/app/core/services/auth.service';
import { combineLatest, Observable } from 'rxjs';
import { UserSessionData } from 'src/app/shared/models/user-session.model';

@Component({
  selector: 'app-navbar',
  templateUrl: './navbar.component.html',
  styleUrls: ['./navbar.component.scss'],
  changeDetection: ChangeDetectionStrategy.OnPush
})
export class NavbarComponent {
  // Combine multiple observables into a single stream
  readonly userData$: Observable<UserSessionData> = combineLatest({
    email: this.authService.getUserMailObservable(),
    isLoggedIn: this.authService.isLoggedInObservable() 
  });

  constructor(
    private authService: AuthService,
    private router: Router
  ) { }

  logOut() {
    this.authService.logOut();
    this.router.navigate(['']);
  }
}
navbar.component.html
<!-- Use async pipe to subscribe in template -->
<ng-container *ngIf="userData$ | async as userData">
  <div *ngIf="userData.isLoggedIn">
    <span>{{ userData.email }}</span>
    <button (click)="logOut()">Log Out</button>
  </div>
  <div *ngIf="!userData.isLoggedIn">
    <a routerLink="/auth/login">Login</a>
  </div>
</ng-container>
The $ suffix is a naming convention to indicate that a variable is an Observable. It’s not required, but it’s widely adopted in the Angular community.

RxJS Operators

Operators transform, filter, and combine observable streams. ScreenPulse uses several common operators:

take()

Completes the observable after emitting a specified number of values:
src/app/core/guards/auth.guard.ts
import { take, tap } from 'rxjs/operators';

canActivate(): Observable<boolean> {
  return this.authService.isLoggedInObservable().pipe(
    take(1),  // Complete after first emission
    tap(loggedIn => {
      if (!loggedIn) {
        this.toastService.warning(
          'You must be logged in to access this page.',
          'Access Denied'
        );
        this.router.navigate(['/auth/login']);
      }
    })
  );
}
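take(1) makes the guard read the current login state once and then complete. Without it, the BehaviorSubject-backed stream never completes on its own, so any subscription to it stays open until it is explicitly unsubscribed.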
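The difference can be checked in isolation. This is a minimal sketch with a stand-in `loggedIn$` subject rather than the real AuthService:

```typescript
import { BehaviorSubject } from 'rxjs';
import { take } from 'rxjs/operators';

// Stand-in for the login state held by the auth service.
const loggedIn$ = new BehaviorSubject<boolean>(false);

// Without take(1): stays subscribed for as long as the subject lives.
const withoutTake = loggedIn$.subscribe();

// With take(1): receives the current value, then completes.
const events: string[] = [];
const withTake = loggedIn$.pipe(take(1)).subscribe({
  next: value => events.push(`next ${value}`),
  complete: () => events.push('complete'),
});

loggedIn$.next(true); // Not seen by the take(1) subscriber

console.log(events);             // ['next false', 'complete']
console.log(withTake.closed);    // true
console.log(withoutTake.closed); // false

withoutTake.unsubscribe(); // Manual cleanup is needed here
```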

switchMap()

Maps each value to an inner observable and unsubscribes from the previous inner observable when a new value arrives:
src/app/pages/search/page/search.component.ts
import { switchMap, take } from 'rxjs/operators';
import { EMPTY } from 'rxjs';

addToFavorites(mediaItem: MediaItem) {
  this.authService.isLoggedInObservable().pipe(
    take(1),
    switchMap(loggedIn => {
      if (!loggedIn) {
        this.toastrService.warning(
          'You must be logged in to add movies to your list',
          'Error'
        );
        this.router.navigate(['/auth/login']);
        return EMPTY;  // Return empty observable - no further processing
      }
      // Switch to the favorites service observable
      return this.favoritesService.addToFavorites(mediaItem);
    })
  ).subscribe({
    next: () => this.toastrService.success(mediaItem.title, 'Added to favorites'),
    error: (error) => this.toastrService.warning(error.message)
  });
}
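The cancellation behavior is easier to see with more than one source value. In this sketch, `queries$`, `responseA$`, and `responseB$` are hypothetical stand-ins for a stream of requests and their pending responses:

```typescript
import { Subject } from 'rxjs';
import { switchMap } from 'rxjs/operators';

const queries$ = new Subject<string>();

// Hypothetical pending responses, one per query.
const responseA$ = new Subject<string>();
const responseB$ = new Subject<string>();

const results: string[] = [];

queries$
  .pipe(switchMap(query => (query === 'a' ? responseA$ : responseB$)))
  .subscribe(result => results.push(result));

queries$.next('a');
queries$.next('b'); // switchMap unsubscribes from responseA$ here

responseA$.next('result for a'); // Dropped: no longer subscribed
responseB$.next('result for b'); // Delivered

console.log(results); // ['result for b']
```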

tap()

Performs side effects without modifying the stream:
this.http.get('/api/movies').pipe(
  tap(movies => console.log('Received movies:', movies)),
  tap(() => this.loadingService.hide())
).subscribe();
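A small sketch, using `of(1, 2, 3)` in place of an HTTP call, shows that values pass through tap unchanged, even when the callback returns something:

```typescript
import { of } from 'rxjs';
import { map, tap } from 'rxjs/operators';

const sideEffects: string[] = [];
const output: number[] = [];

of(1, 2, 3)
  .pipe(
    tap(n => sideEffects.push(`saw ${n}`)),
    tap(n => n * 100), // Return value is ignored by tap
    map(n => n * 2),   // map is what actually transforms values
  )
  .subscribe(n => output.push(n));

console.log(sideEffects); // ['saw 1', 'saw 2', 'saw 3']
console.log(output);      // [2, 4, 6]
```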

finalize()

Executes code when the observable completes, errors, or is unsubscribed:
src/app/pages/search/page/search.component.ts
import { finalize } from 'rxjs/operators';

openMediaItem(mediaItem: MediaItem): void {
  this.loadingCard = true;
  this.dialogService
    .openMediaItem(window.innerWidth, mediaItem, false)
    .pipe(
      finalize(() => (this.loadingCard = false))  // Always hide loader
    )
    .subscribe();
}
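The sketch below exercises each way a stream can end. It assumes RxJS 7, where `throwError` accepts a factory function, and the `calls` array is only there to record what ran:

```typescript
import { NEVER, of, throwError } from 'rxjs';
import { finalize } from 'rxjs/operators';

const calls: string[] = [];

// 1. Source completes normally.
of('done')
  .pipe(finalize(() => calls.push('after complete')))
  .subscribe();

// 2. Source errors.
throwError(() => new Error('boom'))
  .pipe(finalize(() => calls.push('after error')))
  .subscribe({ error: () => { /* handled elsewhere */ } });

// 3. Consumer unsubscribes before the source ends.
const subscription = NEVER
  .pipe(finalize(() => calls.push('after unsubscribe')))
  .subscribe();
subscription.unsubscribe();

console.log(calls);
// ['after complete', 'after error', 'after unsubscribe']
```

This is why finalize suits loader cleanup: the flag is reset whichever way the stream ends.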

combineLatest()

Combines multiple observables. Once every source has emitted at least one value, it emits the latest value from each whenever any source emits:
src/app/layout/navbar/navbar.component.ts
import { combineLatest } from 'rxjs';

readonly userData$ = combineLatest({
  email: this.authService.getUserMailObservable(),
  isLoggedIn: this.authService.isLoggedInObservable()
});

// Emits, for example: { email: 'user@example.com', isLoggedIn: true }
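The timing of the first emission is worth a small sketch. Plain Subjects are used here (instead of the BehaviorSubjects in AuthService) so that the sources start without a value; the email address is a placeholder, and the object form of combineLatest assumes RxJS 7:

```typescript
import { combineLatest, Subject } from 'rxjs';

const email$ = new Subject<string | null>();
const isLoggedIn$ = new Subject<boolean>();

const snapshots: string[] = [];

combineLatest({ email: email$, isLoggedIn: isLoggedIn$ })
  .subscribe(state => snapshots.push(JSON.stringify(state)));

email$.next('user@example.com'); // No emission yet: isLoggedIn$ has not emitted
isLoggedIn$.next(false);         // First combined emission
isLoggedIn$.next(true);          // Emits again with the latest email
email$.next(null);               // Emits again with the latest isLoggedIn

console.log(snapshots);
// [
//   '{"email":"user@example.com","isLoggedIn":false}',
//   '{"email":"user@example.com","isLoggedIn":true}',
//   '{"email":null,"isLoggedIn":true}'
// ]
```

With BehaviorSubject sources, as in the navbar, every source already holds a value, so the first combined emission arrives immediately on subscription.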

Subscription Management

Unmanaged subscriptions can cause memory leaks. ScreenPulse uses several strategies to prevent this:

Strategy 1: Async Pipe

The async pipe automatically subscribes and unsubscribes:
export class NavbarComponent {
  userData$ = combineLatest({
    email: this.authService.getUserMailObservable(),
    isLoggedIn: this.authService.isLoggedInObservable()
  });
}
<div *ngIf="userData$ | async as userData">
  {{ userData.email }}
</div>
Use the async pipe whenever possible. It’s the cleanest way to handle subscriptions and prevent memory leaks.

Strategy 2: take() Operator

For one-time operations, use take(1) to auto-complete:
this.authService.isLoggedInObservable()
  .pipe(take(1))
  .subscribe(loggedIn => {
    // This subscription completes automatically
  });

Strategy 3: Manual Unsubscribe

For complex scenarios, store and unsubscribe manually:
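import { OnDestroy, OnInit } from '@angular/core';
import { Subscription } from 'rxjs';

export class MyComponent implements OnInit, OnDestroy {
  // Optional because it is assigned in ngOnInit, not in the constructor
  private subscription?: Subscription;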
  
  ngOnInit() {
    this.subscription = this.service.getData().subscribe(data => {
      // Handle data
    });
  }
  
  ngOnDestroy() {
    this.subscription?.unsubscribe();
  }
}

Strategy 4: takeUntil()

Use a Subject to signal when to complete all subscriptions:
import { OnDestroy, OnInit } from '@angular/core';
import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

export class MyComponent implements OnInit, OnDestroy {
  private destroy$ = new Subject<void>();
  
  ngOnInit() {
    this.service.getData()
      .pipe(takeUntil(this.destroy$))
      .subscribe(data => {
        // Handle data
      });
    
    this.service.getMoreData()
      .pipe(takeUntil(this.destroy$))
      .subscribe(data => {
        // Handle more data
      });
  }
  
  ngOnDestroy() {
    this.destroy$.next();
    this.destroy$.complete();
  }
}
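The same mechanism can be tried outside a component. In this sketch, `data$` and `moreData$` are stand-ins for the service streams, and the two calls on `destroy$` mirror what ngOnDestroy does above:

```typescript
import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

const destroy$ = new Subject<void>();
const data$ = new Subject<number>();
const moreData$ = new Subject<string>();

const received: Array<number | string> = [];

const first = data$
  .pipe(takeUntil(destroy$))
  .subscribe(value => received.push(value));

const second = moreData$
  .pipe(takeUntil(destroy$))
  .subscribe(value => received.push(value));

data$.next(1);
moreData$.next('a');

// Equivalent of ngOnDestroy: one signal completes both subscriptions.
destroy$.next();
destroy$.complete();

data$.next(2);       // Ignored
moreData$.next('b'); // Ignored

console.log(received);                    // [1, 'a']
console.log(first.closed, second.closed); // true true
```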

HTTP with RxJS

Angular’s HttpClient returns observables for HTTP operations:
src/app/core/services/user.service.ts
import { HttpClient, HttpHeaders } from '@angular/common/http';
import { Injectable } from '@angular/core';
import { Observable } from 'rxjs';
import { LoginResponse, User } from 'src/app/shared/models/auth.model';
import { environment } from 'src/environments/environment.development';

@Injectable({
  providedIn: 'root'
})
export class UserService {
  private baseUrl = environment.serverUserURL;

  constructor(private http: HttpClient) { }

  login(formData: User): Observable<LoginResponse> {
    const httpOptions = {
      headers: new HttpHeaders({
        'Content-Type': 'application/json',
      }),
    };
    return this.http.post<LoginResponse>(
      `${this.baseUrl}/login`,
      formData,
      httpOptions
    );
  }

  register(formData: User): Observable<User> {
    const httpOptions = {
      headers: new HttpHeaders({
        'Content-Type': 'application/json',
      }),
    };
    return this.http.post<User>(
      `${this.baseUrl}/register`,
      formData,
      httpOptions
    );
  }
}

Common RxJS Patterns

Loading State

import { BehaviorSubject } from 'rxjs';
import { finalize } from 'rxjs/operators';

isLoading$ = new BehaviorSubject<boolean>(false);

fetchData() {
  this.isLoading$.next(true);
  this.http.get('/api/data')
    .pipe(finalize(() => this.isLoading$.next(false)))
    .subscribe();
}

Error Handling

import { catchError } from 'rxjs/operators';
import { of } from 'rxjs';

this.http.get('/api/data').pipe(
  catchError(error => {
    console.error('Error:', error);
    return of([]);  // Return empty array as fallback
  })
).subscribe();

Debounced Search

import { debounceTime, distinctUntilChanged, switchMap } from 'rxjs/operators';

searchControl.valueChanges.pipe(
  debounceTime(300),  // Wait 300ms after user stops typing
  distinctUntilChanged(),  // Only emit if value changed
  switchMap(query => this.searchService.search(query))
).subscribe();

Retry on Failure

import { retry } from 'rxjs/operators';

this.http.get('/api/data').pipe(
  retry(3)  // Retry up to 3 times on failure
).subscribe();
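Note that retry(3) means up to three resubscriptions after the initial attempt, so a request can run up to four times in total. The sketch below assumes RxJS 7 and uses two hypothetical sources built with `defer` in place of real HTTP calls:

```typescript
import { defer, of, throwError } from 'rxjs';
import { retry } from 'rxjs/operators';

// Hypothetical flaky request: fails twice, then succeeds.
let attempts = 0;
const flakyRequest$ = defer(() => {
  attempts += 1;
  return attempts < 3
    ? throwError(() => new Error(`attempt ${attempts} failed`))
    : of('ok');
});

const outcome: string[] = [];
flakyRequest$.pipe(retry(3)).subscribe({
  next: value => outcome.push(value),
  error: error => outcome.push(`error: ${error.message}`),
});

// Hypothetical request that never succeeds.
let failingAttempts = 0;
const alwaysFails$ = defer(() => {
  failingAttempts += 1;
  return throwError(() => new Error('service down'));
});

alwaysFails$.pipe(retry(3)).subscribe({
  error: error => outcome.push(`error: ${error.message}`),
});

console.log(attempts);        // 3
console.log(failingAttempts); // 4 (1 initial attempt + 3 retries)
console.log(outcome);         // ['ok', 'error: service down']
```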

Best Practices

1. Use async pipe in templates: let Angular handle the subscription lifecycle automatically.
2. Expose Observables, not Subjects: prevent external code from calling next() on your Subjects.
3. Always unsubscribe: use the async pipe, take(1), takeUntil(), or a manual unsubscribe in ngOnDestroy.
4. Use operators to transform data: chain operators with pipe() instead of nesting subscriptions.
5. Handle errors gracefully: use catchError to provide fallback values, and apply it to the inner observable (for example inside switchMap) when the outer stream must keep running after a failure.
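
Where catchError sits in the pipe determines whether a stream survives an error. The sketch below compares the two placements; it assumes RxJS 7, and `search` is a hypothetical function standing in for a real service call:

```typescript
import { Observable, of, Subject, throwError } from 'rxjs';
import { catchError, switchMap } from 'rxjs/operators';

// Hypothetical search call: fails for the query 'bad'.
function search(query: string): Observable<string[]> {
  return query === 'bad'
    ? throwError(() => new Error('search failed'))
    : of([`result for ${query}`]);
}

// Placement 1: catchError on the outer stream.
// The fallback replaces the whole stream, which then completes.
const outerQueries$ = new Subject<string>();
const outerEvents: string[] = [];
outerQueries$
  .pipe(
    switchMap(query => search(query)),
    catchError(() => of([] as string[])),
  )
  .subscribe({
    next: results => outerEvents.push(JSON.stringify(results)),
    complete: () => outerEvents.push('complete'),
  });

outerQueries$.next('bad');
outerQueries$.next('movies'); // Ignored: the stream has already completed

// Placement 2: catchError on the inner observable.
// Only the failed request is replaced; the outer stream keeps running.
const innerQueries$ = new Subject<string>();
const innerEvents: string[] = [];
innerQueries$
  .pipe(
    switchMap(query =>
      search(query).pipe(catchError(() => of([] as string[]))),
    ),
  )
  .subscribe({
    next: results => innerEvents.push(JSON.stringify(results)),
    complete: () => innerEvents.push('complete'),
  });

innerQueries$.next('bad');
innerQueries$.next('movies');

console.log(outerEvents); // ['[]', 'complete']
console.log(innerEvents); // ['[]', '["result for movies"]']
```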

Next Steps

Services

Learn about service architecture and dependency injection

Reactive Patterns

Advanced reactive programming patterns

HTTP Interceptors

Intercept and transform HTTP requests

State Management

Alternative state management approaches
