A SubscriptionRef<A> is a SynchronizedRef that can be subscribed to in order to receive the current value as well as all changes to that value. It combines mutable state management with reactive streaming.

Overview

SubscriptionRef is ideal for reactive state that multiple consumers need to observe. It provides:
  • Mutable state with atomic updates
  • Change notification via streams
  • Current value access for new subscribers
  • Type-safe updates with Effect integration

Creating a SubscriptionRef

import { Effect, SubscriptionRef } from "effect"

// Create with initial value
const program = Effect.gen(function*() {
  const ref = yield* SubscriptionRef.make(0)
  return ref
})

Reading Values

Current Value

import { Effect, SubscriptionRef } from "effect"

const program = Effect.gen(function*() {
  const ref = yield* SubscriptionRef.make(42)
  
  // Get current value
  const value = yield* SubscriptionRef.get(ref)
  console.log(value) // 42
})

Subscribe to Changes

import { Console, Effect, Fiber, Stream, SubscriptionRef } from "effect"

const program = Effect.gen(function*() {
  const ref = yield* SubscriptionRef.make(0)
  
  // Subscribe to all changes
  const changes = ref.changes
  
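  // Process changes in a background fiber
  const fiber = yield* Stream.runForEach(
    changes,
    (value) => Console.log(`New value: ${value}`)
  ).pipe(Effect.fork)

  // Give the forked subscriber time to start. A subscriber that starts
  // after an update only sees the value current at that point onward.
  yield* Effect.sleep("10 millis")

  // Update values
  yield* SubscriptionRef.set(ref, 1)
  yield* SubscriptionRef.set(ref, 2)
  yield* SubscriptionRef.set(ref, 3)
  
  yield* Effect.sleep("100 millis")
  yield* Fiber.interrupt(fiber)
})
// Output (when the subscriber is running before the first update):
// New value: 0 (initial value)
// New value: 1
// New value: 2
// New value: 3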
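
A subscriber that joins late still starts from the value current at the moment it subscribes, then receives later changes. The sketch below illustrates this under a timing assumption: the short Effect.sleep is only there to give the forked subscriber a chance to start before the next update, and the name `seen` is illustrative.

```typescript
import { Chunk, Effect, Fiber, Stream, SubscriptionRef } from "effect"

const program = Effect.gen(function*() {
  const ref = yield* SubscriptionRef.make(0)

  // Two updates happen before anyone subscribes
  yield* SubscriptionRef.set(ref, 1)
  yield* SubscriptionRef.set(ref, 2)

  // Late subscriber: collect the first two values it observes
  const fiber = yield* ref.changes.pipe(
    Stream.take(2),
    Stream.runCollect,
    Effect.fork
  )

  // Assumption: a short pause is enough for the subscriber to start
  yield* Effect.sleep("10 millis")
  yield* SubscriptionRef.set(ref, 3)

  // The stream ends after two values, so the fiber completes on its own
  const seen = yield* Fiber.join(fiber)
  return Chunk.toReadonlyArray(seen)
})
```

Run with Effect.runPromise(program), this should yield the values 2 and 3: the earlier values 0 and 1 are not replayed to the late subscriber.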

Updating Values

Direct Updates

import { Effect, SubscriptionRef } from "effect"

const program = Effect.gen(function*() {
  const ref = yield* SubscriptionRef.make(0)
  
  // Set new value
  yield* SubscriptionRef.set(ref, 42)
  
  // Update with function
  yield* SubscriptionRef.update(ref, n => n + 1)
  
  // Update and get new value
  const newValue = yield* SubscriptionRef.updateAndGet(
    ref,
    n => n * 2
  )
})

Get-and-Update

import { Effect, SubscriptionRef } from "effect"

const program = Effect.gen(function*() {
  const ref = yield* SubscriptionRef.make(10)
  
  // Get old value and update
  const oldValue = yield* SubscriptionRef.getAndUpdate(
    ref,
    n => n + 5
  )
  console.log(oldValue) // 10
  
  const current = yield* SubscriptionRef.get(ref)
  console.log(current) // 15
})

Effectful Updates

import { Console, Effect, SubscriptionRef } from "effect"

const program = Effect.gen(function*() {
  const ref = yield* SubscriptionRef.make(0)
  
  // Update with effect
  yield* SubscriptionRef.updateEffect(
    ref,
    (n) => Effect.succeed(n + 1)
  )
  
  // Update and get with effect
  const newValue = yield* SubscriptionRef.updateAndGetEffect(
    ref,
    (n) => Effect.gen(function*() {
      yield* Console.log(`Updating from ${n}`)
      return n * 2
    })
  )
})

Conditional Updates

Update Some

import { Effect, Option, SubscriptionRef } from "effect"

const program = Effect.gen(function*() {
  const ref = yield* SubscriptionRef.make(5)
  
  // Update only if condition holds
  yield* SubscriptionRef.updateSome(
    ref,
    (n) => n > 10 ? Option.some(n + 1) : Option.none()
  )
  
  // With effects
  yield* SubscriptionRef.updateSomeEffect(
    ref,
    (n) => n > 10
      ? Option.some(Effect.succeed(n + 1))
      : Option.none()
  )
})

Get and Update Some

import { Effect, Option, SubscriptionRef } from "effect"

const program = Effect.gen(function*() {
  const ref = yield* SubscriptionRef.make(10)
  
  // Conditionally update and get old value
  const oldValue = yield* SubscriptionRef.getAndUpdateSome(
    ref,
    (n) => n > 5 ? Option.some(n * 2) : Option.none()
  )
})

Modify Operations

Transform and Return

import { Effect, SubscriptionRef } from "effect"

const program = Effect.gen(function*() {
  const ref = yield* SubscriptionRef.make(10)
  
  // Modify and return a derived value
  const result = yield* SubscriptionRef.modify(
    ref,
    (n) => [n.toString(), n + 1] // [return value, new state]
  )
  console.log(result) // "10"
  
  const current = yield* SubscriptionRef.get(ref)
  console.log(current) // 11
})

Modify with Effects

import { Effect, SubscriptionRef } from "effect"

const program = Effect.gen(function*() {
  const ref = yield* SubscriptionRef.make(5)
  
  const doubled = yield* SubscriptionRef.modifyEffect(
    ref,
    (n) => Effect.succeed([n * 2, n + 1] as const) // [return value, new state]
  )
  console.log(doubled) // 10
  
  const current = yield* SubscriptionRef.get(ref)
  console.log(current) // 6
})

Modify Some

import { Effect, Option, SubscriptionRef } from "effect"

const program = Effect.gen(function*() {
  const ref = yield* SubscriptionRef.make(10)
  
  // Modify only if condition holds
  const result = yield* SubscriptionRef.modifySome(
    ref,
    "default",
    (n) => n > 5
      ? Option.some([n.toString(), n * 2] as const) // [return value, new state]
      : Option.none()
  )
})

Multiple Subscribers

import { Console, Effect, Fiber, Stream, SubscriptionRef } from "effect"

const program = Effect.gen(function*() {
  const ref = yield* SubscriptionRef.make(0)
  
  // First subscriber - logs all changes
  const subscriber1 = yield* Stream.runForEach(
    ref.changes,
    (value) => Console.log(`Subscriber 1: ${value}`)
  ).pipe(Effect.fork)
  
  // Second subscriber - only even values
  const subscriber2 = yield* ref.changes.pipe(
    Stream.filter(n => n % 2 === 0),
    Stream.runForEach(
      (value) => Console.log(`Subscriber 2 (evens): ${value}`)
    ),
    Effect.fork
  )
  
  // Third subscriber - transformed values
  const subscriber3 = yield* ref.changes.pipe(
    Stream.map(n => n * 10),
    Stream.runForEach(
      (value) => Console.log(`Subscriber 3 (×10): ${value}`)
    ),
    Effect.fork
  )
  
  // Emit some values
  for (let i = 1; i <= 5; i++) {
    yield* SubscriptionRef.set(ref, i)
    yield* Effect.sleep("100 millis")
  }
  
  // Cleanup
  yield* Fiber.interrupt(subscriber1)
  yield* Fiber.interrupt(subscriber2)
  yield* Fiber.interrupt(subscriber3)
})

Practical Examples

Counter with Live Updates

import { Console, Effect, Fiber, Stream, SubscriptionRef } from "effect"

const counter = Effect.gen(function*() {
  const count = yield* SubscriptionRef.make(0)
  
  // Display updates
  const display = yield* Stream.runForEach(
    count.changes,
    (n) => Console.log(`Count: ${n}`)
  ).pipe(Effect.fork)
  
  // Increment counter
  for (let i = 0; i < 5; i++) {
    yield* Effect.sleep("500 millis")
    yield* SubscriptionRef.update(count, n => n + 1)
  }
  
  yield* Fiber.interrupt(display)
})

State Machine

import { Console, Effect, Stream, SubscriptionRef } from "effect"

type State = "idle" | "loading" | "success" | "error"

const stateMachine = Effect.gen(function*() {
  const state = yield* SubscriptionRef.make<State>("idle")
  
  // Log state transitions
  yield* Stream.runForEach(
    state.changes,
    (s) => Console.log(`State: ${s}`)
  ).pipe(Effect.fork)
  
  // Simulate state transitions
  yield* SubscriptionRef.set(state, "loading")
  yield* Effect.sleep("1 second")
  
  const success = Math.random() > 0.5
  yield* SubscriptionRef.set(
    state,
    success ? "success" : "error"
  )
})

Configuration Updates

import { Console, Effect, Stream, SubscriptionRef } from "effect"

interface Config {
  apiUrl: string
  timeout: number
  retries: number
}

const configManager = Effect.gen(function*() {
  const config = yield* SubscriptionRef.make<Config>({
    apiUrl: "https://api.example.com",
    timeout: 5000,
    retries: 3
  })
  
  // Services react to config changes
  yield* Stream.runForEach(
    config.changes,
    (cfg) => Console.log(`Config updated: ${JSON.stringify(cfg)}`)
  ).pipe(Effect.fork)
  
  // Update specific field
  yield* SubscriptionRef.update(
    config,
    (cfg) => ({ ...cfg, timeout: 10000 })
  )
  
  // Update with validation
  yield* SubscriptionRef.updateEffect(
    config,
    (cfg) => Effect.gen(function*() {
      const newTimeout = cfg.timeout * 2
      if (newTimeout > 30000) {
        return yield* Effect.fail("Timeout too large")
      }
      return { ...cfg, timeout: newTimeout }
    })
  )
})

Live Dashboard

import { Console, Effect, Fiber, Stream, SubscriptionRef } from "effect"

interface Metrics {
  requests: number
  errors: number
  avgResponseTime: number
}

const dashboard = Effect.gen(function*() {
  const metrics = yield* SubscriptionRef.make<Metrics>({
    requests: 0,
    errors: 0,
    avgResponseTime: 0
  })
  
  // Display metrics updates
  const display = yield* metrics.changes.pipe(
    Stream.tap((m) =>
      Console.log(
        `Requests: ${m.requests}, ` +
        `Errors: ${m.errors}, ` +
        `Avg Response: ${m.avgResponseTime}ms`
      )
    ),
    Stream.runDrain,
    Effect.fork
  )
  
  // Alert on high error rate
  const alerting = yield* metrics.changes.pipe(
    Stream.filter((m) =>
      m.requests > 0 && m.errors / m.requests > 0.1
    ),
    Stream.runForEach(() =>
      Console.log("⚠️ High error rate detected!")
    ),
    Effect.fork
  )
  
  // Simulate metric updates
  for (let i = 0; i < 10; i++) {
    yield* Effect.sleep("500 millis")
    yield* SubscriptionRef.update(
      metrics,
      (m) => ({
        requests: m.requests + 1,
        errors: m.errors + (Math.random() > 0.8 ? 1 : 0),
        avgResponseTime: Math.random() * 200
      })
    )
  }
  
  yield* Fiber.interrupt(display)
  yield* Fiber.interrupt(alerting)
})

Comparison with Ref

Feature                 Ref       SubscriptionRef
Atomic updates          Yes       Yes
Get current value       Yes       Yes
Subscribe to changes    No        Yes
Stream of values        No        Yes
Multiple observers      Manual    Built-in
Overhead                Lower     Higher (pubsub)
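
The difference can be sketched side by side, assuming the standard Ref module from "effect". The names `plain`, `observed`, and `firstSeen` are illustrative.

```typescript
import { Chunk, Effect, Ref, Stream, SubscriptionRef } from "effect"

const program = Effect.gen(function*() {
  const plain = yield* Ref.make(0)
  const observed = yield* SubscriptionRef.make(0)

  // Both support atomic updates with the same shape of API
  yield* Ref.update(plain, (n) => n + 1)
  yield* SubscriptionRef.update(observed, (n) => n + 1)

  // A Ref can only answer "what is the value right now?"
  const plainValue = yield* Ref.get(plain)

  // A SubscriptionRef additionally exposes a stream of values;
  // a new subscriber starts from the current value
  const firstSeen = yield* observed.changes.pipe(
    Stream.take(1),
    Stream.runCollect
  )

  return { plainValue, firstSeen: Chunk.toReadonlyArray(firstSeen) }
})
```

To observe a plain Ref you would have to poll it or wire up your own notification mechanism, which is the "Manual" entry in the table.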

Performance Considerations

  1. Use for observed state - If nothing subscribes to changes, use a regular Ref
  2. Backpressure aware - The changes stream respects backpressure
  3. Multiple subscribers - Each subscriber gets an independent stream
  4. Memory - Subscribers hold references; interrupt their fibers when done
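
One way to make sure subscribers are cleaned up (point 4) is to tie them to a scope instead of interrupting each fiber by hand. This is a minimal sketch, assuming Effect.forkScoped and Effect.scoped suit your program structure. The sleeps only give the subscriber time to run, and `seen` is an illustrative name.

```typescript
import { Effect, Stream, SubscriptionRef } from "effect"

const program = Effect.gen(function*() {
  const ref = yield* SubscriptionRef.make(0)
  const seen: Array<number> = []

  yield* Effect.scoped(
    Effect.gen(function*() {
      // The subscriber fiber lives only as long as the enclosing scope
      yield* ref.changes.pipe(
        Stream.runForEach((n) => Effect.sync(() => { seen.push(n) })),
        Effect.forkScoped
      )

      // Assumption: short pauses are enough for the subscriber to run
      yield* Effect.sleep("10 millis")
      yield* SubscriptionRef.set(ref, 1)
      yield* Effect.sleep("10 millis")
    })
  )

  // The scope is closed here, so the subscriber has been interrupted
  // and this update is no longer observed
  yield* SubscriptionRef.set(ref, 2)
  yield* Effect.sleep("10 millis")

  return seen
})
```

With this structure there is no fiber handle to forget: closing the scope interrupts the subscriber, and updates made afterwards are not recorded in `seen`.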

See Also

  • Ref - Simple mutable references
  • SynchronizedRef - Refs with effectful updates
  • Stream - Reactive streams
  • PubSub - Underlying publish-subscribe mechanism
