Skip to main content
Effect Stream provides a more type-safe and composable alternative to RxJS Observables, with integrated error handling and resource management.

Key Differences

FeatureRxJSEffect Stream
Error handlingIn-band (error channel)Type-safe (error type parameter)
CompletionObservable completesStream ends naturally
Resourcesfinalize operatorEffect.scoped integration
ContextNo built-in DIBuilt-in dependency injection
BackpressureLimited supportFirst-class support

Basic Conversions

Creating Streams

import { of, from, interval } from "rxjs"

const simple$ = of(1, 2, 3)
const fromArray$ = from([1, 2, 3])
const timer$ = interval(1000)

Operators - Map and Filter

import { map, filter } from "rxjs/operators"

const result$ = source$.pipe(
  map(x => x * 2),
  filter(x => x > 10)
)

FlatMap / SwitchMap

import { switchMap, mergeMap } from "rxjs/operators"

const switched$ = source$.pipe(
  switchMap(id => fetchUser(id))
)

const merged$ = source$.pipe(
  mergeMap(id => fetchUser(id))
)
Effect Stream’s mapEffect gives you fine-grained concurrency control through options, rather than different operators.

CombineLatest

import { combineLatest } from "rxjs"

const combined$ = combineLatest([
  user$,
  settings$,
  notifications$
]).pipe(
  map(([user, settings, notifications]) => ({
    user,
    settings,
    notifications
  }))
)

Error Handling

import { catchError, retry } from "rxjs/operators"
import { of } from "rxjs"

const safe$ = source$.pipe(
  catchError(error => {
    console.error(error)
    return of(defaultValue)
  }),
  retry(3)
)
Unlike RxJS where errors terminate the stream, Effect Stream errors are typed and can be handled compositionally.

Debounce and Throttle

import { debounceTime, throttleTime } from "rxjs/operators"

const debounced$ = input$.pipe(
  debounceTime(300)
)

const throttled$ = input$.pipe(
  throttleTime(1000)
)

Take and Skip

import { take, skip, takeUntil, takeWhile } from "rxjs/operators"

const limited$ = source$.pipe(
  skip(5),
  take(10),
  takeWhile(x => x < 100)
)

Advanced Patterns

Subjects

import { Subject, BehaviorSubject } from "rxjs"

const subject = new Subject<number>()
subject.next(1)
subject.next(2)

const behavior = new BehaviorSubject<number>(0)
behavior.next(1)

Scan (Reduce over time)

import { scan } from "rxjs/operators"

const running$ = clicks$.pipe(
  scan((acc, click) => acc + 1, 0)
)

Merge and Concat

import { merge, concat } from "rxjs"

const merged$ = merge(stream1$, stream2$, stream3$)
const concatenated$ = concat(stream1$, stream2$, stream3$)

ShareReplay

import { shareReplay } from "rxjs/operators"

const shared$ = expensive$.pipe(
  shareReplay(1)
)
Effect’s broadcast returns a scoped resource. You must use Effect.scoped to manage the lifecycle properly.

Resource Management

import { using, defer } from "rxjs"
import { finalize } from "rxjs/operators"

const stream$ = defer(() => {
  const connection = createConnection()
  return from(connection.data$).pipe(
    finalize(() => connection.close())
  )
})

Running Streams

const subscription = stream$.subscribe({
  next: value => console.log(value),
  error: error => console.error(error),
  complete: () => console.log("Complete")
})

subscription.unsubscribe()

Common Stream Operations

Collect to Array

import { toArray } from "rxjs/operators"

const array$ = source$.pipe(toArray())

GroupBy

import { groupBy, mergeMap, toArray } from "rxjs/operators"

const grouped$ = source$.pipe(
  groupBy(item => item.category),
  mergeMap(group => group.pipe(toArray()))
)

Distinct

import { distinct, distinctUntilChanged } from "rxjs/operators"

const unique$ = source$.pipe(
  distinct()
)

const changed$ = source$.pipe(
  distinctUntilChanged()
)

Migration Strategy

  1. Start with simple streams: Convert basic Observables first
  2. Use Stream.fromAsyncIterable: Bridge from existing async iterables
  3. Leverage Effect integration: Streams work seamlessly with Effect
  4. Type your errors: Define error types for better type safety
  5. Consider backpressure: Effect Stream handles backpressure automatically
Effect Stream integrates with Effect’s ecosystem. You can use services, layers, and scoped resources directly in streams.

Testing

import { TestScheduler } from "rxjs/testing"

const scheduler = new TestScheduler((actual, expected) => {
  expect(actual).toEqual(expected)
})

scheduler.run(({ cold, expectObservable }) => {
  const source$ = cold('a-b-c|')
  expectObservable(source$).toBe('a-b-c|')
})

Next Steps

Build docs developers (and LLMs) love