A Scheduler controls when a subscription starts and when notifications are delivered. It determines the execution context for Observable operations.
What is a Scheduler?
A Scheduler has three key responsibilities:
- Data Structure - Stores and queues tasks based on priority
- Execution Context - Defines where and when tasks execute (e.g., immediately, setTimeout, requestAnimationFrame)
- Virtual Clock - Provides a notion of “time” via the now() method
Think of a Scheduler as a traffic controller that decides when and where Observable notifications should be delivered.
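The three parts can be seen together in a toy scheduler. The sketch below is illustrative only: `ToyScheduler`, `ToyTask`, and `flush()` are hypothetical names, and RxJS's own schedulers are implemented differently. It keeps a queue ordered by due time (data structure), runs tasks synchronously when flushed (execution context), and reports virtual time through `now()` (clock).

```typescript
// Toy scheduler for illustration; NOT the RxJS implementation.
type ToyTask = { due: number; id: number; work: () => void };

class ToyScheduler {
  private queue: ToyTask[] = []; // data structure: pending tasks
  private clock = 0;             // virtual clock, in milliseconds
  private nextId = 0;

  // Virtual clock: "time" is whatever the scheduler says it is.
  now(): number {
    return this.clock;
  }

  // Store a task ordered by due time (ties keep insertion order).
  // Returns a cancel function, loosely analogous to unsubscribing.
  schedule(work: () => void, delay = 0): () => void {
    const task: ToyTask = { due: this.clock + delay, id: this.nextId++, work };
    this.queue.push(task);
    this.queue.sort((a, b) => a.due - b.due || a.id - b.id);
    return () => {
      this.queue = this.queue.filter(t => t !== task);
    };
  }

  // Execution context: here, tasks run synchronously when flush() is called.
  flush(): void {
    let task: ToyTask | undefined;
    while ((task = this.queue.shift()) !== undefined) {
      this.clock = task.due; // jump the clock forward to the task's due time
      task.work();
    }
  }
}

const toy = new ToyScheduler();
const taskLog: string[] = [];
toy.schedule(() => { taskLog.push(`Task 1 at ${toy.now()}ms`); }, 1000);
toy.schedule(() => { taskLog.push(`Task 2 at ${toy.now()}ms`); }, 500);
const cancelTask3 = toy.schedule(() => { taskLog.push('Task 3'); }, 1500);
cancelTask3(); // cancelled before it ever runs
toy.flush();
console.log(taskLog); // [ 'Task 2 at 500ms', 'Task 1 at 1000ms' ]
```

Because the clock only moves when `flush()` runs, the whole sequence completes instantly, which is the same idea virtual-time testing relies on.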
Type Signature
interface SchedulerLike extends TimestampProvider {
now(): number;
schedule<T>(
work: (this: SchedulerAction<T>, state: T) => void,
delay: number,
state: T
): Subscription;
}
interface SchedulerAction<T> extends Subscription {
schedule(state?: T, delay?: number): Subscription;
}
interface TimestampProvider {
now(): number;
}
How Schedulers Work
Schedulers control the execution timing of Observables:
The first example below runs without a scheduler (synchronous); the second adds observeOn(asyncScheduler):
import { Observable } from 'rxjs';
const observable = new Observable(observer => {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
});
console.log('before subscribe');
observable.subscribe({
next: x => console.log('got value ' + x),
complete: () => console.log('done')
});
console.log('after subscribe');
// Output:
// before subscribe
// got value 1
// got value 2
// got value 3
// done
// after subscribe
import { Observable, asyncScheduler, observeOn } from 'rxjs';
const observable = new Observable(observer => {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
}).pipe(
observeOn(asyncScheduler)
);
console.log('before subscribe');
observable.subscribe({
next: x => console.log('got value ' + x),
complete: () => console.log('done')
});
console.log('after subscribe');
// Output:
// before subscribe
// after subscribe
// got value 1
// got value 2
// got value 3
// done
The asyncScheduler schedules work as a timer macrotask (like setTimeout), so even with a delay of 0 the work runs in a later event loop turn, after the current synchronous code has finished.
Built-in Schedulers
asyncScheduler
Schedules work using setTimeout or setInterval. Use for time-based operations.
import { asyncScheduler } from 'rxjs';
// Schedule work to run after delay
const subscription = asyncScheduler.schedule(
function(state) {
console.log('State:', state);
// Reschedule with incremented state
if (state < 3) {
this.schedule(state + 1, 1000); // Repeat after 1 second
}
},
1000, // Initial delay
0 // Initial state
);
// Cancel after 4 seconds
setTimeout(() => subscription.unsubscribe(), 4000);
Common uses:
- interval
- timer
- delay
- timeout
asapScheduler
Schedules work on the microtask queue (the same queue Promise.then uses). Work executes after the current synchronous code finishes but before the next macrotask.
import { asapScheduler } from 'rxjs';
console.log('start');
asapScheduler.schedule(() => {
console.log('asap');
});
Promise.resolve().then(() => {
console.log('promise');
});
console.log('end');
// Output:
// start
// end
// asap
// promise
// (the relative order of asap and promise is an implementation detail; do not rely on it)
Common uses:
- Asynchronous conversions
- Promise-like behavior
- After current job, before next job
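The difference between the microtask queue and a timer macrotask can be reproduced with plain JavaScript primitives. The sketch below does not use RxJS at all; the labels comparing each primitive to a scheduler are an analogy based on the descriptions in this section, not a statement about RxJS internals.

```typescript
// Plain event-loop ordering, without RxJS.
const order: string[] = [];

// Timer macrotask: runs in a later event loop turn (asyncScheduler-like).
setTimeout(() => { order.push('macrotask'); }, 0);

// Microtask: runs right after the current synchronous code (asapScheduler-like).
queueMicrotask(() => { order.push('microtask'); });

// Synchronous code: runs immediately (no scheduler).
order.push('synchronous');

// Inspect the result once everything above has had a chance to run.
setTimeout(() => {
  console.log(order); // [ 'synchronous', 'microtask', 'macrotask' ]
}, 10);
```

The microtask always runs before the zero-delay timer, even though the timer was registered first.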
queueScheduler
Schedules on a queue in the current event frame (trampolining). Prevents stack overflow for recursive operations.
import { queueScheduler } from 'rxjs';
let count = 0;
queueScheduler.schedule(function(state) {
count++;
if (count < 5) {
console.log('Iteration:', count);
this.schedule(state); // Recursive scheduling
}
});
console.log('Final count:', count);
// Output:
// Iteration: 1
// Iteration: 2
// Iteration: 3
// Iteration: 4
// Final count: 5
Common uses:
- Iteration operations
- Preventing stack overflow
- Recursive operations
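Trampolining can be sketched in a few lines. The `ToyTrampoline` class below is a hypothetical illustration of the idea, not the source of queueScheduler: work scheduled while another task is running is appended to a queue and executed by the outer loop, so the call stack does not grow with each "recursive" step.

```typescript
// Illustrative trampoline; a sketch of the idea, not RxJS code.
class ToyTrampoline {
  private pending: Array<() => void> = [];
  private draining = false;
  private depth = 0;
  maxDepth = 0; // deepest nesting of running tasks observed

  schedule(work: () => void): void {
    this.pending.push(work);
    if (this.draining) {
      return; // already inside a task: enqueue only, the outer loop runs it
    }
    this.draining = true;
    try {
      let next: (() => void) | undefined;
      while ((next = this.pending.shift()) !== undefined) {
        this.depth++;
        this.maxDepth = Math.max(this.maxDepth, this.depth);
        try {
          next();
        } finally {
          this.depth--;
        }
      }
    } finally {
      this.draining = false;
    }
  }
}

const trampoline = new ToyTrampoline();
let processed = 0;

function step(): void {
  processed++;
  if (processed < 100000) {
    trampoline.schedule(step); // queued, not nested
  }
}

trampoline.schedule(step);
console.log(processed, trampoline.maxDepth); // 100000 1
```

A plain recursive `step()` calling itself 100000 times would nest 100000 stack frames; here the nesting depth stays at 1.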
animationFrameScheduler
Schedules work before the next browser repaint. Perfect for animations.
import { animationFrameScheduler } from 'rxjs';
const element = document.getElementById('box');
let position = 0;
const subscription = animationFrameScheduler.schedule(function animate() {
position += 2;
element.style.left = position + 'px';
if (position < 500) {
this.schedule(); // Schedule next frame
}
});
// Cancel animation
// subscription.unsubscribe();
Common uses:
- Smooth browser animations
- Visual updates
- Game loops
Scheduler Comparison
| Scheduler | Execution Context | Use Case |
|---|---|---|
| null (default) | Synchronous, immediate | Constant-time operations |
| queueScheduler | Queue in current event frame | Iteration, recursive operations |
| asapScheduler | Microtask queue | Asynchronous conversions, promises |
| asyncScheduler | Macro task (setTimeout) | Time-based operations |
| animationFrameScheduler | Before browser repaint | Animations, visual updates |
Using Schedulers with Operators
observeOn
Control when notifications are delivered to observers:
import { of, asyncScheduler } from 'rxjs';
import { observeOn } from 'rxjs/operators';
console.log('before');
of(1, 2, 3)
.pipe(observeOn(asyncScheduler))
.subscribe(x => console.log(x));
console.log('after');
// Output:
// before
// after
// 1
// 2
// 3
subscribeOn
Control when the subscription happens:
import { of, asyncScheduler } from 'rxjs';
import { subscribeOn } from 'rxjs/operators';
console.log('before');
of(1, 2, 3)
.pipe(subscribeOn(asyncScheduler))
.subscribe(x => console.log(x));
console.log('after');
// Output:
// before
// after
// 1
// 2
// 3
subscribeOn only affects when the subscription is initialized. Use observeOn to control delivery of notifications.
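Both examples above print the same output, so the difference is easy to miss. The toy model below makes it visible by logging when the producer itself runs. All names (`ToySource`, `toySubscribeOn`, `toyObserveOn`, `defer`, `runDeferred`) are hypothetical, and a manually drained queue stands in for an asynchronous scheduler; this is a simplified model, not RxJS code.

```typescript
// Toy model of subscribeOn vs observeOn; not RxJS code.
type ToyObserver = (value: number) => void;
type ToySource = (observer: ToyObserver) => void;

// A manually drained queue standing in for an async scheduler.
const deferred: Array<() => void> = [];
const defer = (work: () => void): void => { deferred.push(work); };
const runDeferred = (): void => {
  let work: (() => void) | undefined;
  while ((work = deferred.shift()) !== undefined) work();
};

const trace: string[] = [];

// The producer records when it starts, then emits synchronously.
const producer: ToySource = observer => {
  trace.push('producer started');
  observer(1);
  observer(2);
};

// subscribeOn-like: defer the act of subscribing (running the producer).
const toySubscribeOn = (src: ToySource): ToySource =>
  observer => defer(() => src(observer));

// observeOn-like: subscribe immediately, but defer each notification.
const toyObserveOn = (src: ToySource): ToySource =>
  observer => src(value => defer(() => observer(value)));

toySubscribeOn(producer)(v => { trace.push(`subscribeOn got ${v}`); });
trace.push('after subscribeOn subscribe');
runDeferred();

toyObserveOn(producer)(v => { trace.push(`observeOn got ${v}`); });
trace.push('after observeOn subscribe');
runDeferred();

console.log(trace);
// [ 'after subscribeOn subscribe', 'producer started',
//   'subscribeOn got 1', 'subscribeOn got 2',
//   'producer started', 'after observeOn subscribe',
//   'observeOn got 1', 'observeOn got 2' ]
```

With the subscribeOn-like wrapper the producer does not start until the queue is drained; with the observeOn-like wrapper the producer starts immediately and only the deliveries are deferred.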
Operators with Scheduler Parameters
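Many operators accept an optional scheduler parameter. In RxJS 7 the scheduler argument of creation functions such as of, from, and range is deprecated; scheduled(input, scheduler) is the recommended replacement.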
import { from, asyncScheduler } from 'rxjs';
from([1, 2, 3], asyncScheduler)
.subscribe(x => console.log(x));
// Delivers values asynchronously
Practical Examples
Smooth Animation
import { animationFrameScheduler, interval } from 'rxjs';
import { map, takeWhile } from 'rxjs/operators';
const ball = document.getElementById('ball');
const duration = 2000; // 2 seconds
const startTime = Date.now();
interval(0, animationFrameScheduler)
.pipe(
map(() => (Date.now() - startTime) / duration),
takeWhile(progress => progress < 1),
map(progress => progress * 500) // Move 500px
)
.subscribe({
next: position => {
ball.style.left = position + 'px';
},
complete: () => {
ball.style.left = '500px'; // Final position
}
});
Batching Updates
import { Subject, asapScheduler } from 'rxjs';
import { observeOn } from 'rxjs/operators';
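const updates$ = new Subject<string>();
// Placeholder for application-specific rendering (hypothetical helper)
const updateUI = (): void => {};
// Deliver updates on the microtask queue, after the current synchronous code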
updates$
.pipe(observeOn(asapScheduler))
.subscribe(update => {
console.log('Processing:', update);
updateUI();
});
// Multiple synchronous emissions
updates$.next('update 1');
updates$.next('update 2');
updates$.next('update 3');
console.log('Queued updates');
// Output:
// Queued updates
// Processing: update 1
// Processing: update 2
// Processing: update 3
Preventing Stack Overflow
import { range, queueScheduler } from 'rxjs';
// Without scheduler - might cause stack overflow for large ranges
// range(1, 100000).subscribe(x => console.log(x));
// With queueScheduler - safe for any range
range(1, 100000, queueScheduler)
.subscribe(x => {
// Process each number
if (x % 10000 === 0) {
console.log('Processed:', x);
}
});
Time-Based Coordination
import { asyncScheduler } from 'rxjs';
const task1 = asyncScheduler.schedule(
() => console.log('Task 1'),
1000
);
const task2 = asyncScheduler.schedule(
() => console.log('Task 2'),
500
);
const task3 = asyncScheduler.schedule(
() => console.log('Task 3'),
1500
);
// Output:
// Task 2 (at 500ms)
// Task 1 (at 1000ms)
// Task 3 (at 1500ms)
Virtual Time Testing
import { TestScheduler } from 'rxjs/testing';
import { delay } from 'rxjs/operators';
const testScheduler = new TestScheduler((actual, expected) => {
expect(actual).toEqual(expected);
});
testScheduler.run(({ cold, expectObservable }) => {
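// Inside run(), one frame is 1 ms of virtual time, so delay(30) shifts each value by 30 frames.
// The source is left open (no '|') so the assertion covers emission timing only.
const source = cold('a-b-c');
const expected = '30ms a-b-c';
const result = source.pipe(delay(30));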
expectObservable(result).toBe(expected);
});
// Virtual time allows instant testing of time-based operations
Best Practices
1. Use Default Scheduler When Possible
RxJS chooses sensible defaults. Only specify schedulers when you need specific timing behavior.
import { of } from 'rxjs';
// Default is fine for most cases
of(1, 2, 3).subscribe(x => console.log(x));
// Only specify when needed
import { asyncScheduler } from 'rxjs';
of(1, 2, 3, asyncScheduler).subscribe(x => console.log(x));
2. Use animationFrameScheduler for Animations
// ✗ BAD: Using setInterval
setInterval(() => {
updateAnimation();
}, 16); // Approximate 60fps
// ✓ GOOD: Using animationFrameScheduler
import { interval, animationFrameScheduler } from 'rxjs';
interval(0, animationFrameScheduler)
.subscribe(() => updateAnimation());
3. Use queueScheduler for Recursion
import { queueScheduler } from 'rxjs';
function processArray(items: number[], index = 0): void {
if (index >= items.length) return;
console.log('Processing:', items[index]);
// Prevent stack overflow
queueScheduler.schedule(() => {
processArray(items, index + 1);
});
}
processArray(Array.from({ length: 10000 }, (_, i) => i));
4. Test with TestScheduler
import { TestScheduler } from 'rxjs/testing';
import { debounceTime } from 'rxjs/operators';
const testScheduler = new TestScheduler((actual, expected) => {
expect(actual).toEqual(expected);
});
testScheduler.run(({ cold, expectObservable }) => {
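// Inside run(), one frame is 1 ms, so debounceTime(3) waits for 3 frames of silence.
// 'c' arrives at frame 4 and is emitted at frame 7; the source completes at frame 9.
const input = 'a-b-c----|';
const expected = '-------c-|';
const result = cold(input).pipe(
debounceTime(3, testScheduler)
);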
expectObservable(result).toBe(expected);
});
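Inside testScheduler.run(), each character of a marble string occupies one 1 ms frame of virtual time, and a token such as `30ms` advances time by that many frames. The helper below is a simplified, hypothetical reader for that syntax (`readMarbles` and `MarbleEvent` are not part of RxJS). It handles only `-`, single-character values, `|`, and `Nms` tokens; groups such as `(c|)` and other time units are deliberately left out.

```typescript
// Simplified marble reader for illustration; not the RxJS parser.
type MarbleEvent = { frame: number; kind: 'next' | 'complete'; value?: string };

function readMarbles(marbles: string): MarbleEvent[] {
  const result: MarbleEvent[] = [];
  let frame = 0;
  // Whitespace only separates tokens; it does not consume virtual time.
  for (const token of marbles.trim().split(/\s+/)) {
    const progression = /^(\d+)ms$/.exec(token);
    if (progression) {
      frame += Number(progression[1]); // '30ms' advances 30 frames
      continue;
    }
    for (const ch of token) {
      if (ch === '|') {
        result.push({ frame, kind: 'complete' });
      } else if (ch !== '-') {
        result.push({ frame, kind: 'next', value: ch });
      }
      frame += 1; // every character occupies one frame
    }
  }
  return result;
}

// 'a' at frame 0, 'b' at frame 2, 'c' at frame 4, completion at frame 9
console.log(readMarbles('a-b-c----|'));
// 'a' at frame 30, 'b' at frame 32, 'c' at frame 34
console.log(readMarbles('30ms a-b-c'));
```

Reading both the input and the expected string this way makes it easier to check that the frame offsets match the durations passed to time-based operators.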
Scheduler Selection Guide
| Need | Scheduler |
|---|---|
| Synchronous execution | null (default) |
| Time-based operations | asyncScheduler |
| Animations | animationFrameScheduler |
| Microtask queue | asapScheduler |
| Recursive operations | queueScheduler |
| Testing time | TestScheduler |
Common Pitfalls
1. Over-using Schedulers
// ✗ Unnecessary
import { of, asyncScheduler } from 'rxjs';
of(1, 2, 3, asyncScheduler).subscribe(x => console.log(x));
// ✓ Default is fine
of(1, 2, 3).subscribe(x => console.log(x));
2. Wrong Scheduler for Task
// ✗ BAD: asyncScheduler for animation
import { interval, asyncScheduler } from 'rxjs';
interval(16, asyncScheduler) // Choppy animation
.subscribe(() => animate());
// ✓ GOOD: animationFrameScheduler
import { interval, animationFrameScheduler } from 'rxjs';
interval(0, animationFrameScheduler) // Smooth animation
.subscribe(() => animate());
3. Forgetting to Unsubscribe
import { asyncScheduler } from 'rxjs';
// ✗ BAD: No cleanup
asyncScheduler.schedule(function repeat() {
console.log('tick');
this.schedule(undefined, 1000);
});
// ✓ GOOD: Save subscription for cleanup
const sub = asyncScheduler.schedule(function repeat() {
console.log('tick');
this.schedule(undefined, 1000);
});
setTimeout(() => sub.unsubscribe(), 5000);
When to Use Schedulers
Use Schedulers when you need:
- Specific timing control
- Animation frame synchronization
- Preventing stack overflow
- Testing time-based code
- Coordinating async operations
Don’t use Schedulers when:
- Default behavior is sufficient
- You’re not dealing with timing
- Simple synchronous operations
Related Topics
- Observable - Schedulers control Observable execution timing
- Operators - Many operators accept scheduler parameters
- Subscription - Schedulers return Subscriptions
- Testing - TestScheduler for virtual time testing