fromEventPattern
Creates an Observable from an arbitrary API for registering event handlers.
Import
import { fromEventPattern } from 'rxjs' ;
Type Signature
function fromEventPattern < T >(
addHandler : ( handler : NodeEventHandler ) => any ,
removeHandler ?: ( handler : NodeEventHandler , signal ?: any ) => void ,
resultSelector ?: ( ... args : any []) => T
) : Observable < T >;
Parameters
addHandler — A function that takes a handler and attaches it to the event source. If the API provides a token for removal, return that token from this function.
removeHandler (optional) — A function that takes the handler (and, as its second argument, whatever addHandler returned) and removes the handler from the event source.
resultSelector (optional) — A function that transforms the event arguments before emission.
Returns
An Observable that emits events from the custom event API. The handler is registered on subscription and removed on unsubscription.
Description
fromEventPattern is a more flexible version of fromEvent that works with any API for registering and unregistering event handlers. It’s useful when:
The event API doesn’t match standard patterns
You need to work with custom event systems
The API returns a token for unregistering
You want full control over handler registration
Key characteristics:
Maximum flexibility for custom event APIs
Handles APIs that return cancellation tokens
Can transform event arguments
Never completes on its own (end it by unsubscribing, or with an operator such as take or takeUntil)
Examples
Basic Custom Event API
import { fromEventPattern } from 'rxjs' ;
/** Registers `handler` as a click listener on `document`. */
function addClickHandler(handler: (event: Event) => void): void {
  document.addEventListener('click', handler);
}

/** Detaches a click listener from `document`. */
function removeClickHandler(handler: (event: Event) => void): void {
  document.removeEventListener('click', handler);
}

// The first function runs on subscribe, the second on unsubscribe.
const clicks$ = fromEventPattern(addClickHandler, removeClickHandler);

clicks$.subscribe((event) => {
  console.log('Click detected:', event);
});
API with Cancellation Token
import { fromEventPattern } from 'rxjs' ;
// Some APIs return a token/subscription object
/**
 * Stand-in for an event API whose `on` call hands back a token that must be
 * given to `off` later. This stub does not store the handler anywhere; it
 * only demonstrates the token round-trip.
 */
const customAPI = {
  /** Builds a registration token: a random `id` plus the handler itself. */
  on(eventName: string, handler: (...args: any[]) => void) {
    const id = Math.random();
    return { id, handler };
  },

  /** Receives the token produced by `on` and logs its `id`. */
  off(token: { id: number }) {
    console.log('Unsubscribing:', token.id);
  },
};
// Whatever the add function returns is handed to the remove function as its
// second argument, so the token from `on` can be passed straight to `off`.
const events$ = fromEventPattern(
  (handler) => {
    return customAPI.on('message', handler);
  },
  (_handler, registration) => {
    customAPI.off(registration);
  }
);

events$.subscribe((data) => {
  console.log('Event:', data);
});
import { fromEventPattern } from 'rxjs' ;
// API that calls handler with multiple arguments
/**
 * Minimal emitter whose handlers are called with two arguments
 * (`type`, `message`).
 *
 * Handlers are kept in a Set so that every subscriber is notified. A single
 * handler slot would be overwritten by each later `subscribe` call, leaving
 * all but the most recent subscriber silent.
 */
const customEmitter = {
  handlers: new Set<(type: string, message: string) => void>(),

  /** Registers `handler` to be called on every `emit`. */
  subscribe(handler: (type: string, message: string) => void): void {
    this.handlers.add(handler);
  },

  /** Calls every registered handler with `(type, message)`; a no-op when none are registered. */
  emit(type: string, message: string): void {
    for (const handler of this.handlers) {
      handler(type, message);
    }
  },
};
// No result selector: the handler's arguments are emitted together as an array.
const events1$ = fromEventPattern((handler) => {
  customEmitter.subscribe(handler);
});
events1$.subscribe((args) => console.log('Args:', args)); // ['info', 'Hello']

// Result selector supplied: the arguments are combined into a custom object.
const events2$ = fromEventPattern(
  (handler) => {
    customEmitter.subscribe(handler);
  },
  undefined, // no remove function in this example
  (type, message) => {
    return { type, message };
  }
);
events2$.subscribe((event) => console.log('Event:', event)); // { type: 'info', message: 'Hello' }

customEmitter.emit('info', 'Hello');
Common Use Cases
SignalR / Socket.IO
import { fromEventPattern } from 'rxjs' ;
import * as signalR from '@microsoft/signalr' ;
// Hub connection shared by every stream created through createMessage$.
const connection = new signalR.HubConnectionBuilder()
  .withUrl('/chatHub')
  .build();

/**
 * Wraps a hub method in an Observable.
 *
 * The handler is registered with `connection.on` on subscribe and removed
 * with `connection.off` on unsubscribe. When the hub calls the handler with
 * several arguments they are emitted together as one array, so `TArgs`
 * describes that argument tuple.
 *
 * @typeParam TArgs - Tuple of the arguments passed to the handler.
 * @param methodName - Name of the hub method to listen to.
 * @returns An Observable of the handler's argument tuple.
 */
function createMessage$<TArgs extends unknown[] = unknown[]>(methodName: string) {
  return fromEventPattern<TArgs>(
    (handler) => connection.on(methodName, handler),
    (handler) => connection.off(methodName, handler)
  );
}

// Typing the tuple lets the subscriber destructure without casting.
const messages$ = createMessage$<[user: string, message: string]>('ReceiveMessage');
messages$.subscribe(([user, message]) => {
  console.log(` ${user} : ${message} `);
});

// start() returns a promise; handle failure instead of leaving it floating.
connection.start().catch((error: unknown) => {
  console.error('SignalR connection failed:', error);
});
Electron IPC
import { fromEventPattern } from 'rxjs' ;
import { ipcRenderer } from 'electron' ;
/**
 * Creates an Observable of the payloads sent on an IPC channel.
 *
 * @typeParam T - Type of the emitted payload.
 * @param channel - IPC channel name to listen on.
 * @returns An Observable that listens on subscribe and stops on unsubscribe.
 */
function fromIPC<T>(channel: string) {
  return fromEventPattern<T>(
    (handler) => {
      // The listener receives the IPC event object first; drop it so only the
      // payload arguments reach the Observable.
      const listener = (_event: unknown, ...args: unknown[]) => handler(...args);
      ipcRenderer.on(channel, listener);
      // Return the wrapper as the removal token: it, not `handler`, is the
      // function that was actually registered.
      return listener;
    },
    (_handler, listener) => {
      // Removing `handler` here would be a no-op and leak the wrapper.
      ipcRenderer.removeListener(channel, listener);
    }
  );
}

const updates$ = fromIPC<string>('update-available');
updates$.subscribe((version) => {
  console.log('Update available:', version);
});
Firebase Realtime Database
import { fromEventPattern } from 'rxjs' ;
import { ref , onValue , off } from 'firebase/database' ;
import { map } from 'rxjs/operators' ;
/**
 * Streams the value stored at `path`.
 *
 * A reference is created and a value listener attached on subscribe; the same
 * reference is returned as the token so the listener can be detached from it
 * on unsubscribe.
 *
 * NOTE(review): `database` is not defined in this snippet; it is assumed to be
 * an initialized database instance declared elsewhere.
 *
 * @param path - Database path to watch.
 * @returns An Observable of `snapshot.val()` for each emitted snapshot.
 */
function watchFirebaseValue(path: string) {
  const snapshots$ = fromEventPattern(
    (handler) => {
      const target = ref(database, path);
      onValue(target, handler);
      return target;
    },
    (handler, target) => {
      off(target, 'value', handler);
    }
  );

  return snapshots$.pipe(map((snapshot) => snapshot.val()));
}

const user$ = watchFirebaseValue('/users/123');
user$.subscribe((userData) => {
  console.log('User data updated:', userData);
});
Geolocation Watching
import { fromEventPattern } from 'rxjs' ;
/**
 * Streams results from `navigator.geolocation.watchPosition`.
 *
 * Both the success and the error callback feed the same handler, so the
 * Observable emits either a position or a position error as a regular value.
 * Subscribers must tell the two apart before reading position fields.
 *
 * @returns An Observable of positions and position errors. The watch id
 *   returned by the browser is used to clear the watch on unsubscribe.
 */
function watchPosition() {
  return fromEventPattern<GeolocationPosition | GeolocationPositionError>(
    (handler) =>
      navigator.geolocation.watchPosition(
        (position) => handler(position),
        (error) => handler(error)
      ),
    (_handler, watchId: number) => {
      navigator.geolocation.clearWatch(watchId);
    }
  );
}

const position$ = watchPosition();
const subscription = position$.subscribe((result) => {
  // Errors arrive as values too; only positions carry `coords`.
  if ('coords' in result) {
    console.log('Position:', result.coords.latitude, result.coords.longitude);
  } else {
    console.error('Geolocation error:', result.message);
  }
});

// Stop watching when done
setTimeout(() => subscription.unsubscribe(), 10000);
Custom Event Bus
import { fromEventPattern } from 'rxjs' ;
import { filter , map } from 'rxjs/operators' ;
/**
 * In-memory event bus that keeps a set of handlers per event name.
 * `on` returns a token which `off` later uses to remove the handler.
 */
class EventBus {
  private handlers = new Map<string, Set<Function>>();

  /** Adds `handler` for `event` and returns the token needed to remove it. */
  on(event: string, handler: Function) {
    let listeners = this.handlers.get(event);
    if (listeners === undefined) {
      listeners = new Set();
      this.handlers.set(event, listeners);
    }
    listeners.add(handler);
    return { event, handler };
  }

  /** Removes the handler named by `token`; does nothing for unknown events. */
  off(token: { event: string; handler: Function }) {
    const listeners = this.handlers.get(token.event);
    if (listeners !== undefined) {
      listeners.delete(token.handler);
    }
  }

  /** Calls every handler registered for `event` with `data`. */
  emit(event: string, data: any) {
    const listeners = this.handlers.get(event);
    if (listeners === undefined) {
      return;
    }
    listeners.forEach((listener) => {
      listener(data);
    });
  }
}
const bus = new EventBus();

/**
 * Exposes one bus event as an Observable of `T`.
 * The token returned by `bus.on` is passed back to `bus.off` on unsubscribe.
 */
function createEvent$<T>(eventName: string) {
  return fromEventPattern<T>(
    (handler) => {
      return bus.on(eventName, handler);
    },
    (_handler, token) => {
      bus.off(token);
    }
  );
}

const userLogin$ = createEvent$<{ id: string; name: string }>('user:login');
userLogin$.subscribe((user) => {
  console.log('User logged in:', user.name);
});

bus.emit('user:login', { id: '123', name: 'John' });
Mutation Observer
import { fromEventPattern } from 'rxjs' ;
/**
 * Streams the mutation records reported for `element`.
 *
 * A MutationObserver is created and started on subscribe; it is returned as
 * the token so it can be disconnected on unsubscribe.
 *
 * @param element - Node to observe.
 * @param config - Options passed to `observer.observe`.
 * @returns An Observable emitting each batch of mutation records.
 */
function observeDOM(element: Element, config: MutationObserverInit) {
  // The type parameter lets subscribers use the records without casting.
  return fromEventPattern<MutationRecord[]>(
    (handler) => {
      const observer = new MutationObserver((mutations) => handler(mutations));
      observer.observe(element, config);
      return observer;
    },
    (_handler, observer: MutationObserver) => observer.disconnect()
  );
}

const element = document.querySelector('#dynamic-content');
// querySelector returns null when nothing matches; fail with a clear message
// instead of passing null into observe().
if (element === null) {
  throw new Error('Element "#dynamic-content" not found');
}

const mutations$ = observeDOM(element, {
  childList: true,
  subtree: true,
  attributes: true,
});
mutations$.subscribe((mutations) => {
  console.log('DOM changed:', mutations.length, 'mutations');
});
Advanced Examples
Resize Observer
import { fromEventPattern } from 'rxjs' ;
import { debounceTime } from 'rxjs/operators' ;
/**
 * Streams resize entries for `element`, debounced by 100 ms.
 *
 * A ResizeObserver is created and started on subscribe; it is returned as the
 * token so it can be disconnected on unsubscribe.
 *
 * @param element - Element whose size is watched.
 * @returns An Observable of resize entry batches.
 */
function watchElementSize(element: Element) {
  const resize$ = fromEventPattern<ResizeObserverEntry[]>(
    (handler) => {
      const observer = new ResizeObserver((entries) => handler(entries));
      observer.observe(element);
      return observer;
    },
    (_handler, observer: ResizeObserver) => observer.disconnect()
  );

  return resize$.pipe(debounceTime(100));
}

const container = document.querySelector('#container');
// querySelector returns null when nothing matches; fail with a clear message
// instead of passing null into observe().
if (container === null) {
  throw new Error('Element "#container" not found');
}

watchElementSize(container).subscribe((entries) => {
  const { width, height } = entries[0].contentRect;
  console.log(`Container size: ${width} x ${height} `);
});
Intersection Observer
import { fromEventPattern } from 'rxjs' ;
import { filter } from 'rxjs/operators' ;
/**
 * Streams intersection entries for `element`.
 *
 * An IntersectionObserver is created and started on subscribe; it is returned
 * as the token so it can be disconnected on unsubscribe.
 *
 * @param element - Element to observe.
 * @param options - Optional settings passed to the IntersectionObserver constructor.
 * @returns An Observable of intersection entry batches.
 */
function watchIntersection(element: Element, options?: IntersectionObserverInit) {
  return fromEventPattern<IntersectionObserverEntry[]>(
    (handler) => {
      const observer = new IntersectionObserver((entries) => handler(entries), options);
      observer.observe(element);
      return observer;
    },
    (_handler, observer: IntersectionObserver) => observer.disconnect()
  );
}

const lazyImage = document.querySelector('#lazy-image');
// querySelector returns null when nothing matches; fail with a clear message
// instead of passing null into observe().
if (lazyImage === null) {
  throw new Error('Element "#lazy-image" not found');
}

watchIntersection(lazyImage, { threshold: 0.5 })
  .pipe(filter((entries) => entries[0].isIntersecting))
  .subscribe(() => {
    console.log('Image is visible, loading...');
    loadImage(lazyImage);
  });
Comparison with fromEvent
fromEvent (Standard API)
fromEventPattern (Custom API)
import { fromEvent } from 'rxjs' ;
// Works with standard event targets: pass the target and the event name
// directly, with no add/remove functions to write.
const clicks$ = fromEvent ( document , 'click' );
Important Notes
The addHandler function is called when you subscribe, and removeHandler is called when you unsubscribe. This ensures proper cleanup.
If your API returns a token/subscription object from the add function, return it from addHandler - it will be passed as the second argument to removeHandler.
Always provide a removeHandler to prevent memory leaks. Without it, event handlers will remain attached even after unsubscription.
See Also