Skip to main content

fromEventPattern

Creates an Observable from an arbitrary API for registering event handlers.

Import

import { fromEventPattern } from 'rxjs';

Type Signature

function fromEventPattern<T>(
  addHandler: (handler: NodeEventHandler) => any,
  removeHandler?: (handler: NodeEventHandler, signal?: any) => void,
  resultSelector?: (...args: any[]) => T
): Observable<T>;

Parameters

addHandler
Function
required
A function that takes a handler and attaches it to the event source. Should return a token if the API provides one for removal.
removeHandler
Function
A function that takes a handler (and optional token from addHandler) and removes it from the event source.
resultSelector
Function
Optional function to transform the event arguments before emission.

Returns

Observable
Observable<T>
An Observable that emits events from the custom event API. The handler is registered on subscription and removed on unsubscription.

Description

fromEventPattern is a more flexible version of fromEvent that works with any API for registering and unregistering event handlers. It’s useful when:
  • The event API doesn’t match standard patterns
  • You need to work with custom event systems
  • The API returns a token for unregistering
  • You want full control over handler registration
Key characteristics:
  • Maximum flexibility for custom event APIs
  • Handles APIs that return cancellation tokens
  • Can transform event arguments
  • Never completes (unless manually completed)

Examples

Basic Custom Event API

import { fromEventPattern } from 'rxjs';

function addClickHandler(handler) {
  document.addEventListener('click', handler);
}

function removeClickHandler(handler) {
  document.removeEventListener('click', handler);
}

const clicks$ = fromEventPattern(
  addClickHandler,
  removeClickHandler
);

clicks$.subscribe(event => {
  console.log('Click detected:', event);
});

API with Cancellation Token

import { fromEventPattern } from 'rxjs';

// Some APIs return a token/subscription object
const customAPI = {
  on(eventName, handler) {
    // Returns a token
    return { id: Math.random(), handler };
  },
  off(token) {
    // Uses token to unsubscribe
    console.log('Unsubscribing:', token.id);
  }
};

const events$ = fromEventPattern(
  handler => customAPI.on('message', handler),
  (handler, token) => customAPI.off(token)
);

events$.subscribe(data => console.log('Event:', data));

Transform Multiple Arguments

import { fromEventPattern } from 'rxjs';

// API that calls handler with multiple arguments
const customEmitter = {
  subscribe(handler) {
    this.handler = handler;
  },
  emit(type, message) {
    if (this.handler) {
      this.handler(type, message);
    }
  }
};

// Without result selector - emits array
const events1$ = fromEventPattern(
  handler => customEmitter.subscribe(handler)
);

events1$.subscribe(args => {
  console.log('Args:', args); // ['info', 'Hello']
});

// With result selector - emits custom object
const events2$ = fromEventPattern(
  handler => customEmitter.subscribe(handler),
  undefined,
  (type, message) => ({ type, message })
);

events2$.subscribe(event => {
  console.log('Event:', event); // { type: 'info', message: 'Hello' }
});

customEmitter.emit('info', 'Hello');

Common Use Cases

SignalR / Socket.IO

import { fromEventPattern } from 'rxjs';
import * as signalR from '@microsoft/signalr';

const connection = new signalR.HubConnectionBuilder()
  .withUrl('/chatHub')
  .build();

function createMessage$(methodName: string) {
  return fromEventPattern(
    handler => connection.on(methodName, handler),
    handler => connection.off(methodName, handler)
  );
}

const messages$ = createMessage$('ReceiveMessage');

messages$.subscribe(([user, message]) => {
  console.log(`${user}: ${message}`);
});

connection.start();

Electron IPC

import { fromEventPattern } from 'rxjs';
import { ipcRenderer } from 'electron';

function fromIPC<T>(channel: string) {
  return fromEventPattern<T>(
    handler => {
      ipcRenderer.on(channel, (event, ...args) => handler(...args));
    },
    handler => {
      ipcRenderer.removeListener(channel, handler);
    }
  );
}

const updates$ = fromIPC<string>('update-available');

updates$.subscribe(version => {
  console.log('Update available:', version);
});

Firebase Realtime Database

import { fromEventPattern } from 'rxjs';
import { ref, onValue, off } from 'firebase/database';
import { map } from 'rxjs/operators';

function watchFirebaseValue(path: string) {
  return fromEventPattern(
    handler => {
      const dbRef = ref(database, path);
      onValue(dbRef, handler);
      return dbRef;
    },
    (handler, dbRef) => {
      off(dbRef, 'value', handler);
    }
  ).pipe(
    map(snapshot => snapshot.val())
  );
}

const user$ = watchFirebaseValue('/users/123');

user$.subscribe(userData => {
  console.log('User data updated:', userData);
});

Geolocation Watching

import { fromEventPattern } from 'rxjs';

function watchPosition() {
  return fromEventPattern(
    handler => {
      const watchId = navigator.geolocation.watchPosition(
        position => handler(position),
        error => handler(error)
      );
      return watchId;
    },
    (handler, watchId) => {
      navigator.geolocation.clearWatch(watchId);
    }
  );
}

const position$ = watchPosition();

const subscription = position$.subscribe(position => {
  console.log('Position:', position.coords.latitude, position.coords.longitude);
});

// Stop watching when done
setTimeout(() => subscription.unsubscribe(), 10000);

Custom Event Bus

import { fromEventPattern } from 'rxjs';
import { filter, map } from 'rxjs/operators';

class EventBus {
  private handlers = new Map<string, Set<Function>>();
  
  on(event: string, handler: Function) {
    if (!this.handlers.has(event)) {
      this.handlers.set(event, new Set());
    }
    this.handlers.get(event)!.add(handler);
    return { event, handler };
  }
  
  off(token: { event: string; handler: Function }) {
    this.handlers.get(token.event)?.delete(token.handler);
  }
  
  emit(event: string, data: any) {
    this.handlers.get(event)?.forEach(handler => handler(data));
  }
}

const bus = new EventBus();

function createEvent$<T>(eventName: string) {
  return fromEventPattern<T>(
    handler => bus.on(eventName, handler),
    (handler, token) => bus.off(token)
  );
}

const userLogin$ = createEvent$<{ id: string; name: string }>('user:login');

userLogin$.subscribe(user => {
  console.log('User logged in:', user.name);
});

bus.emit('user:login', { id: '123', name: 'John' });

Mutation Observer

import { fromEventPattern } from 'rxjs';

function observeDOM(element: Element, config: MutationObserverInit) {
  return fromEventPattern(
    handler => {
      const observer = new MutationObserver(mutations => {
        handler(mutations);
      });
      observer.observe(element, config);
      return observer;
    },
    (handler, observer) => {
      observer.disconnect();
    }
  );
}

const element = document.querySelector('#dynamic-content');

const mutations$ = observeDOM(element, {
  childList: true,
  subtree: true,
  attributes: true
});

mutations$.subscribe(mutations => {
  console.log('DOM changed:', mutations.length, 'mutations');
});

Advanced Examples

Resize Observer

import { fromEventPattern } from 'rxjs';
import { debounceTime } from 'rxjs/operators';

function watchElementSize(element: Element) {
  return fromEventPattern<ResizeObserverEntry[]>(
    handler => {
      const observer = new ResizeObserver(entries => handler(entries));
      observer.observe(element);
      return observer;
    },
    (handler, observer) => observer.disconnect()
  ).pipe(
    debounceTime(100)
  );
}

const container = document.querySelector('#container');

watchElementSize(container).subscribe(entries => {
  const { width, height } = entries[0].contentRect;
  console.log(`Container size: ${width}x${height}`);
});

Intersection Observer

import { fromEventPattern } from 'rxjs';
import { filter } from 'rxjs/operators';

function watchIntersection(element: Element, options?: IntersectionObserverInit) {
  return fromEventPattern<IntersectionObserverEntry[]>(
    handler => {
      const observer = new IntersectionObserver(
        entries => handler(entries),
        options
      );
      observer.observe(element);
      return observer;
    },
    (handler, observer) => observer.disconnect()
  );
}

const lazyImage = document.querySelector('#lazy-image');

watchIntersection(lazyImage, { threshold: 0.5 }).pipe(
  filter(entries => entries[0].isIntersecting)
).subscribe(() => {
  console.log('Image is visible, loading...');
  loadImage(lazyImage);
});

Comparison with fromEvent

import { fromEvent } from 'rxjs';

// Works with standard event targets
const clicks$ = fromEvent(document, 'click');

Important Notes

The addHandler function is called when you subscribe, and removeHandler is called when you unsubscribe. This ensures proper cleanup.
If your API returns a token/subscription object from the add function, return it from addHandler - it will be passed as the second argument to removeHandler.
Always provide a removeHandler to prevent memory leaks. Without it, event handlers will remain attached even after unsubscription.

See Also