Effect Stream provides a more type-safe and composable alternative to RxJS Observables, with integrated error handling and resource management.
Key Differences
Feature RxJS Effect Stream Error handling In-band (error channel) Type-safe (error type parameter) Completion Observable completes Stream ends naturally Resources finalize operatorEffect.scoped integrationContext No built-in DI Built-in dependency injection Backpressure Limited support First-class support
Basic Conversions
Creating Streams
Before (RxJS)
After (Effect Stream)
import { of , from , interval } from "rxjs"
const simple$ = of ( 1 , 2 , 3 )
const fromArray$ = from ([ 1 , 2 , 3 ])
const timer$ = interval ( 1000 )
Operators - Map and Filter
Before (RxJS)
After (Effect Stream)
import { map , filter } from "rxjs/operators"
const result$ = source$ . pipe (
map ( x => x * 2 ),
filter ( x => x > 10 )
)
FlatMap / SwitchMap
Before (RxJS)
After (Effect Stream)
import { switchMap , mergeMap } from "rxjs/operators"
const switched$ = source$ . pipe (
switchMap ( id => fetchUser ( id ))
)
const merged$ = source$ . pipe (
mergeMap ( id => fetchUser ( id ))
)
Effect Stream’s mapEffect gives you fine-grained concurrency control through options, rather than different operators.
CombineLatest
Before (RxJS)
After (Effect Stream)
import { combineLatest } from "rxjs"
const combined$ = combineLatest ([
user$ ,
settings$ ,
notifications$
]). pipe (
map (([ user , settings , notifications ]) => ({
user ,
settings ,
notifications
}))
)
Error Handling
Before (RxJS)
After (Effect Stream)
import { catchError , retry } from "rxjs/operators"
import { of } from "rxjs"
const safe$ = source$ . pipe (
catchError ( error => {
console . error ( error )
return of ( defaultValue )
}),
retry ( 3 )
)
Unlike RxJS where errors terminate the stream, Effect Stream errors are typed and can be handled compositionally.
Debounce and Throttle
Before (RxJS)
After (Effect Stream)
import { debounceTime , throttleTime } from "rxjs/operators"
const debounced$ = input$ . pipe (
debounceTime ( 300 )
)
const throttled$ = input$ . pipe (
throttleTime ( 1000 )
)
Take and Skip
Before (RxJS)
After (Effect Stream)
import { take , skip , takeUntil , takeWhile } from "rxjs/operators"
const limited$ = source$ . pipe (
skip ( 5 ),
take ( 10 ),
takeWhile ( x => x < 100 )
)
Advanced Patterns
Subjects
Before (RxJS)
After (Effect Stream)
import { Subject , BehaviorSubject } from "rxjs"
const subject = new Subject < number >()
subject . next ( 1 )
subject . next ( 2 )
const behavior = new BehaviorSubject < number >( 0 )
behavior . next ( 1 )
Scan (Reduce over time)
Before (RxJS)
After (Effect Stream)
import { scan } from "rxjs/operators"
const running$ = clicks$ . pipe (
scan (( acc , click ) => acc + 1 , 0 )
)
Merge and Concat
Before (RxJS)
After (Effect Stream)
import { merge , concat } from "rxjs"
const merged$ = merge ( stream1$ , stream2$ , stream3$ )
const concatenated$ = concat ( stream1$ , stream2$ , stream3$ )
ShareReplay
Before (RxJS)
After (Effect Stream)
import { shareReplay } from "rxjs/operators"
const shared$ = expensive$ . pipe (
shareReplay ( 1 )
)
Effect’s broadcast returns a scoped resource. You must use Effect.scoped to manage the lifecycle properly.
Resource Management
Before (RxJS)
After (Effect Stream)
import { using , defer } from "rxjs"
import { finalize } from "rxjs/operators"
const stream$ = defer (() => {
const connection = createConnection ()
return from ( connection . data$ ). pipe (
finalize (() => connection . close ())
)
})
Running Streams
Before (RxJS)
After (Effect Stream)
const subscription = stream$ . subscribe ({
next : value => console . log ( value ),
error : error => console . error ( error ),
complete : () => console . log ( "Complete" )
})
subscription . unsubscribe ()
Common Stream Operations
Collect to Array
Before (RxJS)
After (Effect Stream)
import { toArray } from "rxjs/operators"
const array$ = source$ . pipe ( toArray ())
GroupBy
Before (RxJS)
After (Effect Stream)
import { groupBy , mergeMap , toArray } from "rxjs/operators"
const grouped$ = source$ . pipe (
groupBy ( item => item . category ),
mergeMap ( group => group . pipe ( toArray ()))
)
Distinct
Before (RxJS)
After (Effect Stream)
import { distinct , distinctUntilChanged } from "rxjs/operators"
const unique$ = source$ . pipe (
distinct ()
)
const changed$ = source$ . pipe (
distinctUntilChanged ()
)
Migration Strategy
Start with simple streams : Convert basic Observables first
Use Stream.fromAsyncIterable : Bridge from existing async iterables
Leverage Effect integration : Streams work seamlessly with Effect
Type your errors : Define error types for better type safety
Consider backpressure : Effect Stream handles backpressure automatically
Effect Stream integrates with Effect’s ecosystem. You can use services, layers, and scoped resources directly in streams.
Testing
Before (RxJS)
After (Effect Stream)
import { TestScheduler } from "rxjs/testing"
const scheduler = new TestScheduler (( actual , expected ) => {
expect ( actual ). toEqual ( expected )
})
scheduler . run (({ cold , expectObservable }) => {
const source$ = cold ( 'a-b-c|' )
expectObservable ( source$ ). toBe ( 'a-b-c|' )
})
Next Steps