atomWithObservable creates an atom that subscribes to an RxJS-compatible observable.
Import
import { atomWithObservable } from 'jotai/utils'
Signature
// Overload 1 — getObservable returns a SubjectLike and `initialValue` is required:
// the result is a writable atom whose value type is `Data` (no Promise in the type).
function atomWithObservable<Data>(
getObservable: (get: Getter) => SubjectLike<Data>,
options: { initialValue: Data | (() => Data); unstable_timeout?: number },
): WritableAtom<Data, [Data], void>
// Overload 2 — getObservable returns a SubjectLike and `options` / `initialValue` may be omitted:
// the result is a writable atom whose value type is `Data | Promise<Data>`.
function atomWithObservable<Data>(
getObservable: (get: Getter) => SubjectLike<Data>,
options?: { initialValue?: Data | (() => Data); unstable_timeout?: number },
): WritableAtom<Data | Promise<Data>, [Data], void>
// Overload 3 — getObservable returns an ObservableLike and `initialValue` is required:
// the result is a read-only atom whose value type is `Data`.
function atomWithObservable<Data>(
getObservable: (get: Getter) => ObservableLike<Data>,
options: { initialValue: Data | (() => Data); unstable_timeout?: number },
): Atom<Data>
// Overload 4 — getObservable returns an ObservableLike and `options` / `initialValue` may be omitted:
// the result is a read-only atom whose value type is `Data | Promise<Data>`.
function atomWithObservable<Data>(
getObservable: (get: Getter) => ObservableLike<Data>,
options?: { initialValue?: Data | (() => Data); unstable_timeout?: number },
): Atom<Data | Promise<Data>>
Parameters
getObservable
(get: Getter) => ObservableLike<Data> | SubjectLike<Data>
required
A function that returns an observable or subject. Use `get` to derive the observable from other atoms.
options
{ initialValue?: Data | (() => Data); unstable_timeout?: number }
Optional configuration object.
options.initialValue
Data | (() => Data)
Initial value to use before the observable emits. If not provided, the atom will be async until the first emission.
options.unstable_timeout
number
Timeout in milliseconds to automatically unsubscribe when the atom is not mounted. Unstable API that may change.
Return Value
Returns:
- A writable atom if a `SubjectLike` is provided (values can be written via `next()`).
- A read-only atom if an `ObservableLike` is provided.
- The atom value is synchronous if `initialValue` is provided; otherwise it may be a Promise until the observable first emits.
Observable Types
// Minimal source shape: anything exposing `subscribe`, which accepts an observer
// whose callbacks are all optional (`Partial<Observer<T>>`) and returns a Subscription.
type ObservableLike<T> = {
subscribe(observer: Partial<Observer<T>>): Subscription
}
// A source that is also an observer: it has `subscribe` plus `next`, `error` and `complete`.
type SubjectLike<T> = ObservableLike<T> & Observer<T>
// The set of callbacks passed to `subscribe` (and exposed directly by a SubjectLike).
type Observer<T> = {
// Receives a value of type T.
next: (value: T) => void
// Receives an error; the parameter is typed `any`.
error: (error: any) => void
// Takes no arguments.
complete: () => void
}
// Handle returned by `subscribe`; exposes a single `unsubscribe` method.
type Subscription = {
unsubscribe: () => void
}
Usage Example
import { useAtom } from 'jotai'
import { atomWithObservable } from 'jotai/utils'
import { interval } from 'rxjs'
import { map } from 'rxjs/operators'
/** Formats the current wall-clock time as a locale time string. */
const currentTimeString = () => new Date().toLocaleTimeString()

/**
 * Atom fed by a one-second `interval`; every tick is mapped to a freshly
 * formatted time string. `initialValue` is computed once, when this module is
 * evaluated, and is the value used before the observable emits.
 */
const clockAtom = atomWithObservable(
  () => interval(1000).pipe(map(currentTimeString)),
  { initialValue: currentTimeString() },
)

/** Renders the latest value of `clockAtom`. */
function Clock() {
  const [time] = useAtom(clockAtom)
  return <div>Current time: {time}</div>
}
With Subject (Writable)
import { useAtom } from 'jotai'
import { atomWithObservable } from 'jotai/utils'
import { BehaviorSubject } from 'rxjs'

// A subject exposes both `subscribe` and `next`, so the resulting atom is writable.
const subject = new BehaviorSubject('initial')

// No `initialValue` is passed, so the atom's value type is `string | Promise<string>`.
const subjectAtom = atomWithObservable(() => subject)

/** Displays the subject's current value and writes a new one on click. */
function Component() {
  // `setValue` writes to the atom; for a subject-backed atom values are written via `next()`.
  const [value, setValue] = useAtom(subjectAtom)
  return (
    <div>
      <p>Value: {value}</p>
      <button onClick={() => setValue('updated')}>Update</button>
    </div>
  )
}
Derived Observable
import { atom, useAtom } from 'jotai'
import { atomWithObservable } from 'jotai/utils'
import { ajax } from 'rxjs/ajax'

// Response shape assumed by this example (only `name` is rendered below);
// adjust it to match the real `/api/users/:id` payload.
type User = { name: string }

const userIdAtom = atom(1)

// Reading `userIdAtom` through `get` makes it a dependency: the observable is
// recreated whenever the selected user id changes.
const userAtom = atomWithObservable((get) => {
  const userId = get(userIdAtom)
  // Typing the response keeps `user.name` type-safe instead of `unknown`.
  return ajax.getJSON<User>(`/api/users/${userId}`)
})

/** Lets the user pick an id and shows the name fetched for that id. */
function UserProfile() {
  const [userId, setUserId] = useAtom(userIdAtom)
  const [user] = useAtom(userAtom)
  return (
    <div>
      {/* <select> values are strings, so convert back to a number before storing. */}
      <select value={userId} onChange={(e) => setUserId(Number(e.target.value))}>
        <option value={1}>User 1</option>
        <option value={2}>User 2</option>
      </select>
      {user && <p>{user.name}</p>}
    </div>
  )
}
WebSocket Example
import { useAtom } from 'jotai'
import { atomWithObservable } from 'jotai/utils'
import { webSocket } from 'rxjs/webSocket'

// `initialValue` is the value used before the socket delivers its first message.
const wsAtom = atomWithObservable(
  () => webSocket('ws://localhost:8080'),
  { initialValue: { type: 'connecting' } },
)

/** Shows the most recent message and sends a ping on click. */
function WebSocketComponent() {
  // Writing to the atom passes the value to the underlying subject.
  const [message, sendMessage] = useAtom(wsAtom)
  return (
    <div>
      <p>Message: {JSON.stringify(message)}</p>
      <button onClick={() => sendMessage({ type: 'ping' })}>
        Send Ping
      </button>
    </div>
  )
}
Error Handling
import { useAtom } from 'jotai'
import { atomWithObservable } from 'jotai/utils'
import { of } from 'rxjs'
import { ajax } from 'rxjs/ajax'
import { catchError } from 'rxjs/operators'

// `catchError` turns a failed request into a regular `{ error }` value, so the
// failure is delivered as data instead of being thrown when the atom is read.
const dataAtom = atomWithObservable(
  () =>
    ajax.getJSON('/api/data').pipe(
      catchError((error) => of({ error: error.message })),
    ),
  // Placeholder value used until the request emits.
  { initialValue: { loading: true } },
)

/** Renders a loading, error, or data view depending on which key is present. */
function DataComponent() {
  const [data] = useAtom(dataAtom)
  if ('loading' in data) return <div>Loading...</div>
  if ('error' in data) return <div>Error: {data.error}</div>
  return <div>Data: {JSON.stringify(data)}</div>
}
Notes
- The observable is subscribed to when the atom is mounted.
- If `unstable_timeout` is set, the atom unsubscribes after the timeout once it is unmounted.
- Supports the `Symbol.observable` property for TC39 Observable interop.
- Errors from the observable are thrown when reading the atom.
- Writing to a non-subject observable will throw an error.
- The observable is recreated when dependencies (read via `get`) change.