atomWithObservable creates an atom that subscribes to an Observable (RxJS or any observable that follows the Observable spec).
Signature
function atomWithObservable<Data>(
getObservable: (get: Getter) => ObservableLike<Data>,
options?: {
initialValue?: Data | (() => Data)
unstable_timeout?: number
}
): Atom<Data | Promise<Data>>
// For writable observables (Subjects)
function atomWithObservable<Data>(
getObservable: (get: Getter) => SubjectLike<Data>,
options?: {
initialValue?: Data | (() => Data)
unstable_timeout?: number
}
): WritableAtom<Data | Promise<Data>, [Data], void>
getObservable
(get: Getter) => ObservableLike<Data>
required
Function that returns an Observable. Has access to get to read other atoms
Initial value to use before the first emission. Without this, the atom value will be a Promise
Timeout in milliseconds to unsubscribe when the atom is not mounted
Dependencies
This utility requires an Observable library like RxJS:
Usage
Basic observable atom
import { atomWithObservable } from 'jotai/utils'
import { useAtom } from 'jotai'
import { interval } from 'rxjs'
import { map } from 'rxjs/operators'
const clockAtom = atomWithObservable(() =>
interval(1000).pipe(map(() => new Date()))
)
function Clock() {
const [time] = useAtom(clockAtom)
// time is a Promise initially, then becomes Date
return <div>{time instanceof Promise ? 'Loading...' : time.toLocaleTimeString()}</div>
}
With initial value
import { atomWithObservable } from 'jotai/utils'
import { useAtom } from 'jotai'
import { interval } from 'rxjs'
const counterAtom = atomWithObservable(
() => interval(1000),
{ initialValue: 0 }
)
function Counter() {
const [count] = useAtom(counterAtom)
// count is always a number, never a Promise
return <div>Count: {count}</div>
}
Writable atom with Subject
import { atomWithObservable } from 'jotai/utils'
import { useAtom } from 'jotai'
import { Subject } from 'rxjs'
import { scan } from 'rxjs/operators'
const subject = new Subject<number>()
const sumAtom = atomWithObservable(
() => subject.pipe(scan((acc, curr) => acc + curr, 0)),
{ initialValue: 0 }
)
function Sum() {
const [sum, addToSum] = useAtom(sumAtom)
return (
<div>
<div>Sum: {sum}</div>
<button onClick={() => addToSum(1)}>+1</button>
<button onClick={() => addToSum(5)}>+5</button>
<button onClick={() => addToSum(10)}>+10</button>
</div>
)
}
Reading other atoms
import { atomWithObservable } from 'jotai/utils'
import { atom, useAtom } from 'jotai'
import { interval } from 'rxjs'
import { map } from 'rxjs/operators'
const intervalAtom = atom(1000)
const counterAtom = atomWithObservable(
(get) => {
const ms = get(intervalAtom)
return interval(ms)
},
{ initialValue: 0 }
)
function Counter() {
const [intervalMs, setIntervalMs] = useAtom(intervalAtom)
const [count] = useAtom(counterAtom)
return (
<div>
<div>Count: {count}</div>
<div>Interval: {intervalMs}ms</div>
<button onClick={() => setIntervalMs(500)}>Faster</button>
<button onClick={() => setIntervalMs(2000)}>Slower</button>
</div>
)
}
WebSocket with observable
import { atomWithObservable } from 'jotai/utils'
import { useAtom } from 'jotai'
import { Observable } from 'rxjs'
interface Message {
type: string
data: unknown
}
const wsAtom = atomWithObservable<Message>(() =>
new Observable((subscriber) => {
const ws = new WebSocket('wss://example.com/socket')
ws.onmessage = (event) => {
subscriber.next(JSON.parse(event.data))
}
ws.onerror = (error) => {
subscriber.error(error)
}
ws.onclose = () => {
subscriber.complete()
}
return () => {
ws.close()
}
}),
{ initialValue: { type: 'connecting', data: null } }
)
function WebSocketComponent() {
const [message] = useAtom(wsAtom)
return (
<div>
<div>Type: {message.type}</div>
<div>Data: {JSON.stringify(message.data)}</div>
</div>
)
}
BehaviorSubject for reactive state
import { atomWithObservable } from 'jotai/utils'
import { useAtom } from 'jotai'
import { BehaviorSubject } from 'rxjs'
const subject = new BehaviorSubject({ name: 'John', age: 30 })
const userAtom = atomWithObservable(
() => subject,
{ initialValue: subject.value }
)
function UserProfile() {
const [user, setUser] = useAtom(userAtom)
return (
<div>
<div>Name: {user.name}</div>
<div>Age: {user.age}</div>
<button onClick={() => setUser({ ...user, age: user.age + 1 })}>
Increase age
</button>
</div>
)
}
Features
- Observable integration: Works with RxJS and any Observable implementation
- Automatic subscription: Subscribes when mounted, unsubscribes when unmounted
- Writable with Subjects: Atoms created from Subjects are writable
- Reactive: Re-subscribes when dependencies change
- Type-safe: Full TypeScript support
Notes
- Without
initialValue, the atom value will be a Promise until the first emission
- With
initialValue, the atom always has a non-Promise value
- The atom automatically manages subscription lifecycle
- For Subjects (observables with
next method), the atom becomes writable
- The
unstable_timeout option delays unsubscription when unmounted (useful for SSR)
- The observable is recreated when dependencies (atoms read via
get) change
Observable types
The utility supports any observable that implements:
interface ObservableLike<T> {
subscribe(observer: {
next: (value: T) => void
error: (error: unknown) => void
complete: () => void
}): { unsubscribe: () => void }
}
For writable atoms, it also needs:
interface SubjectLike<T> extends ObservableLike<T> {
next: (value: T) => void
}