Skip to main content
atomWithObservable creates an atom that subscribes to an RxJS-compatible observable.

Import

import { atomWithObservable } from 'jotai/utils'

Signature

// With SubjectLike and initial value (synchronous)
function atomWithObservable<Data>(
  getObservable: (get: Getter) => SubjectLike<Data>,
  options: { initialValue: Data | (() => Data); unstable_timeout?: number },
): WritableAtom<Data, [Data], void>

// With SubjectLike (may return Promise)
function atomWithObservable<Data>(
  getObservable: (get: Getter) => SubjectLike<Data>,
  options?: { initialValue?: Data | (() => Data); unstable_timeout?: number },
): WritableAtom<Data | Promise<Data>, [Data], void>

// With ObservableLike and initial value (read-only, synchronous)
function atomWithObservable<Data>(
  getObservable: (get: Getter) => ObservableLike<Data>,
  options: { initialValue: Data | (() => Data); unstable_timeout?: number },
): Atom<Data>

// With ObservableLike (read-only, may return Promise)
function atomWithObservable<Data>(
  getObservable: (get: Getter) => ObservableLike<Data>,
  options?: { initialValue?: Data | (() => Data); unstable_timeout?: number },
): Atom<Data | Promise<Data>>

Parameters

getObservable
(get: Getter) => ObservableLike<Data> | SubjectLike<Data>
required
A function that returns an observable or subject. Use get to read other atoms and derive the observable from their values.
options
object
Optional configuration object
options.initialValue
Data | (() => Data)
Initial value (or a function that returns it) to use before the observable emits. If not provided, the atom's value is a Promise until the first emission.
options.unstable_timeout
number
Timeout in milliseconds after which the subscription is automatically dropped while the atom is not mounted. This is an unstable API and may change.

Return Value

Returns:
  • A writable atom if a SubjectLike is provided (written values are passed to the subject's next())
  • A read-only atom if an ObservableLike is provided
  • The atom's value is synchronous if initialValue is provided; otherwise it may be a Promise until the observable first emits

Observable Types

type ObservableLike<T> = {
  subscribe(observer: Partial<Observer<T>>): Subscription
}

type SubjectLike<T> = ObservableLike<T> & Observer<T>

type Observer<T> = {
  next: (value: T) => void
  error: (error: any) => void
  complete: () => void
}

type Subscription = {
  unsubscribe: () => void
}

Usage Example

import { useAtom } from 'jotai'
import { atomWithObservable } from 'jotai/utils'
import { interval } from 'rxjs'
import { map } from 'rxjs/operators'

const clockAtom = atomWithObservable(() =>
  interval(1000).pipe(map(() => new Date().toLocaleTimeString()))
, { initialValue: new Date().toLocaleTimeString() })

function Clock() {
  const [time] = useAtom(clockAtom)
  return <div>Current time: {time}</div>
}

With Subject (Writable)

import { useAtom } from 'jotai'
import { atomWithObservable } from 'jotai/utils'
import { BehaviorSubject } from 'rxjs'

const subject = new BehaviorSubject('initial')
const subjectAtom = atomWithObservable(() => subject)

function Component() {
  const [value, setValue] = useAtom(subjectAtom)

  return (
    <div>
      <p>Value: {value}</p>
      <button onClick={() => setValue('updated')}>Update</button>
    </div>
  )
}

Derived Observable

import { atom, useAtom } from 'jotai'
import { atomWithObservable } from 'jotai/utils'
import { switchMap } from 'rxjs/operators'
import { ajax } from 'rxjs/ajax'

const userIdAtom = atom(1)

const userAtom = atomWithObservable((get) => {
  const userId = get(userIdAtom)
  return ajax.getJSON(`/api/users/${userId}`)
})

function UserProfile() {
  const [userId, setUserId] = useAtom(userIdAtom)
  const [user] = useAtom(userAtom)

  return (
    <div>
      <select value={userId} onChange={(e) => setUserId(Number(e.target.value))}>
        <option value={1}>User 1</option>
        <option value={2}>User 2</option>
      </select>
      {user && <p>{user.name}</p>}
    </div>
  )
}

WebSocket Example

import { useAtom } from 'jotai'
import { atomWithObservable } from 'jotai/utils'
import { webSocket } from 'rxjs/webSocket'

const wsAtom = atomWithObservable(() =>
  webSocket('ws://localhost:8080'),
  { initialValue: { type: 'connecting' } }
)

function WebSocketComponent() {
  const [message, sendMessage] = useAtom(wsAtom)

  return (
    <div>
      <p>Message: {JSON.stringify(message)}</p>
      <button onClick={() => sendMessage({ type: 'ping' })}>
        Send Ping
      </button>
    </div>
  )
}

Error Handling

import { useAtom } from 'jotai'
import { atomWithObservable } from 'jotai/utils'
import { of } from 'rxjs'
import { ajax } from 'rxjs/ajax'
import { catchError } from 'rxjs/operators'

const dataAtom = atomWithObservable(() =>
  ajax.getJSON('/api/data').pipe(
    catchError((error) => of({ error: error.message }))
  ),
  { initialValue: { loading: true } }
)

function DataComponent() {
  const [data] = useAtom(dataAtom)

  if ('loading' in data) return <div>Loading...</div>
  if ('error' in data) return <div>Error: {data.error}</div>
  return <div>Data: {JSON.stringify(data)}</div>
}

Notes

  • The observable is subscribed to when the atom is mounted.
  • If unstable_timeout is set, the subscription is dropped after the timeout while the atom is not mounted.
  • Supports the Symbol.observable property for TC39 Observable interop.
  • Errors from the observable are thrown when reading the atom.
  • Writing to a non-subject observable will throw an error.
  • getObservable is re-run, and the resulting observable re-subscribed, whenever an atom it read through get changes.

Build docs developers (and LLMs) love