Skip to main content
A PubSub is an asynchronous message hub where publishers can publish messages and subscribers can subscribe to receive those messages. PubSub supports various backpressure strategies, message replay, and concurrent access from multiple producers and consumers.

Overview

PubSub provides a powerful publish-subscribe pattern:
  • Multiple subscribers: Messages are broadcast to all active subscribers
  • Backpressure strategies: Control behavior when capacity is reached
  • Replay buffer: Late subscribers can receive past messages
  • Scoped subscriptions: Automatic cleanup with Effect scopes

Basic Usage

import { Effect, PubSub } from "effect"

// Basic publish/subscribe round trip on a bounded PubSub.
const program = Effect.gen(function*() {
  const pubsub = yield* PubSub.bounded<string>(10)

  yield* Effect.scoped(Effect.gen(function*() {
    // Subscriber: subscribe BEFORE publishing. No replay buffer is configured,
    // so a subscription created after the publishes would not see them and
    // the takes below would suspend.
    const subscription = yield* PubSub.subscribe(pubsub)

    // Publisher
    yield* PubSub.publish(pubsub, "Hello")
    yield* PubSub.publish(pubsub, "World")

    const message1 = yield* PubSub.take(subscription)
    const message2 = yield* PubSub.take(subscription)
    console.log(message1, message2) // "Hello", "World"
  }))
})

Types

PubSub

interface PubSub<in out A>
An asynchronous message hub supporting multiple publishers and subscribers.

Subscription

interface Subscription<out A>
A consumer’s connection to a PubSub, through which it takes messages.
import { Effect, PubSub } from "effect"

// Shows the explicit Subscription type of a scoped subscription.
const program = Effect.gen(function*() {
  const pubsub = yield* PubSub.bounded<string>(10)

  yield* Effect.scoped(Effect.gen(function*() {
    const subscription: PubSub.Subscription<string> = 
      yield* PubSub.subscribe(pubsub)

    // Publish something after subscribing; otherwise the take below
    // would suspend forever because no message ever arrives.
    yield* PubSub.publish(pubsub, "Hello")

    const message = yield* PubSub.take(subscription)
    console.log(message) // "Hello"
  }))
})

Creating PubSubs

bounded

const bounded: <A>(capacity: number | {
  readonly capacity: number
  readonly replay?: number | undefined
}) => Effect<PubSub<A>>
Creates a bounded PubSub with the backpressure strategy. When full, publishers are suspended until space becomes available.
import { Effect, PubSub } from "effect"

// Two ways of constructing a bounded PubSub: by capacity alone, or by options.
const program = Effect.gen(function*() {
  // Capacity given as a plain number
  const plain = yield* PubSub.bounded<string>(100)

  // Options form: same capacity, plus a replay buffer so that late
  // subscribers are handed the last 10 messages
  const replayOptions = { capacity: 100, replay: 10 }
  const replaying = yield* PubSub.bounded<string>(replayOptions)
})

dropping

const dropping: <A>(capacity: number | {
  readonly capacity: number
  readonly replay?: number | undefined
}) => Effect<PubSub<A>>
Creates a bounded PubSub that drops new messages when full.
import { Effect, PubSub } from "effect"

// Demonstrates the dropping strategy: once the PubSub is full, new messages
// are rejected and publish yields false.
const program = Effect.gen(function*() {
  const pubsub = yield* PubSub.dropping<string>(3)

  yield* Effect.scoped(Effect.gen(function*() {
    // Keep an active subscriber that never takes, so published messages
    // stay buffered and the PubSub actually reaches capacity.
    yield* PubSub.subscribe(pubsub)

    yield* PubSub.publish(pubsub, "msg1") // succeeds
    yield* PubSub.publish(pubsub, "msg2") // succeeds
    yield* PubSub.publish(pubsub, "msg3") // succeeds

    // Capacity (3) is exhausted: this message is dropped
    const published = yield* PubSub.publish(pubsub, "msg4")
    console.log("Message dropped:", !published) // true
  }))
})

sliding

const sliding: <A>(capacity: number | {
  readonly capacity: number
  readonly replay?: number | undefined
}) => Effect<PubSub<A>>
Creates a bounded PubSub that evicts old messages when full.
import { Effect, PubSub } from "effect"

// Demonstrates the sliding strategy: when full, the oldest message is evicted
// to make room for the newest one.
const program = Effect.gen(function*() {
  const pubsub = yield* PubSub.sliding<string>(3)

  yield* Effect.scoped(Effect.gen(function*() {
    // Subscribe first: without a replay buffer, a subscription created
    // after the publishes would not observe any of these messages.
    const subscription = yield* PubSub.subscribe(pubsub)

    yield* PubSub.publish(pubsub, "msg1")
    yield* PubSub.publish(pubsub, "msg2")
    yield* PubSub.publish(pubsub, "msg3")
    yield* PubSub.publish(pubsub, "msg4") // "msg1" is evicted

    const messages = yield* PubSub.takeAll(subscription)
    console.log(messages) // ["msg2", "msg3", "msg4"]
  }))
})

unbounded

const unbounded: <A>(options?: {
  readonly replay?: number | undefined
}) => Effect<PubSub<A>>
Creates an unbounded PubSub.
import { Effect, PubSub } from "effect"

// Publishes 1000 messages to an unbounded PubSub, which has no capacity limit.
const program = Effect.gen(function*() {
  const hub = yield* PubSub.unbounded<string>()

  // No capacity limit, so every publish goes through
  let index = 0
  while (index < 1000) {
    yield* PubSub.publish(hub, `message-${index}`)
    index += 1
  }
})

Publishing

publish

const publish: {
  <A>(value: A): (self: PubSub<A>) => Effect<boolean>
  <A>(self: PubSub<A>, value: A): Effect<boolean>
}
Publishes a message to all subscribers.
import { Effect, PubSub } from "effect"

// Publishes a single message and reports whether it was accepted.
const program = Effect.gen(function*() {
  const hub = yield* PubSub.bounded<string>(10)

  // Data-last overload: build the publish step, then apply it to the hub
  const publishHello = PubSub.publish("Hello World")
  const accepted = yield* publishHello(hub)
  console.log("Message published:", accepted) // true
})

publishAll

const publishAll: {
  <A>(elements: Iterable<A>): (self: PubSub<A>) => Effect<boolean>
  <A>(self: PubSub<A>, elements: Iterable<A>): Effect<boolean>
}
Publishes multiple messages at once.
import { Effect, PubSub } from "effect"

// Publishes a batch of messages in one call.
const program = Effect.gen(function*() {
  const hub = yield* PubSub.bounded<string>(10)

  const batch = ["Hello", "World", "from", "Effect"]
  // Data-last overload: elements first, then the target hub
  const accepted = yield* PubSub.publishAll(batch)(hub)
  console.log("All messages published:", accepted)
})

publishUnsafe

const publishUnsafe: {
  <A>(value: A): (self: PubSub<A>) => boolean
  <A>(self: PubSub<A>, value: A): boolean
}
Synchronously publishes a message without Effect wrapping.
import { PubSub } from "effect"

// Ambient declaration: this snippet assumes a PubSub<string> created elsewhere.
declare const pubsub: PubSub.PubSub<string>

// Synchronous publish: the result is a plain boolean, not an Effect.
const published = PubSub.publishUnsafe(pubsub, "Hello")
if (published) {
  console.log("Message published successfully")
}

Subscribing

subscribe

const subscribe: <A>(
  self: PubSub<A>
) => Effect<Subscription<A>, never, Scope>
Creates a subscription to receive messages. Subscriptions are automatically cleaned up when the scope exits.
import { Effect, PubSub } from "effect"

// Creates a scoped subscription and receives two messages through it.
const program = Effect.gen(function*() {
  const pubsub = yield* PubSub.bounded<string>(10)

  yield* Effect.scoped(Effect.gen(function*() {
    // The subscription lives until this scope closes.
    const subscription = yield* PubSub.subscribe(pubsub)

    // Publish after subscribing: no replay buffer is configured, so earlier
    // messages would not be delivered to this subscription.
    yield* PubSub.publish(pubsub, "Hello")
    yield* PubSub.publish(pubsub, "World")

    const msg1 = yield* PubSub.take(subscription)
    const msg2 = yield* PubSub.take(subscription)
    console.log(msg1, msg2) // "Hello", "World"
  }))
})

Multiple Subscribers

All subscribers receive the same messages:
import { Effect, PubSub } from "effect"

// Shows that one published message is delivered to every active subscriber.
const program = Effect.gen(function*() {
  const pubsub = yield* PubSub.bounded<string>(10)

  yield* Effect.scoped(Effect.gen(function*() {
    // Both subscriptions must exist before the publish: messages are
    // broadcast to the subscribers that are active at that moment.
    const sub1 = yield* PubSub.subscribe(pubsub)
    const sub2 = yield* PubSub.subscribe(pubsub)

    yield* PubSub.publish(pubsub, "Broadcast")

    const [msg1, msg2] = yield* Effect.all([
      PubSub.take(sub1),
      PubSub.take(sub2)
    ])
    console.log("Both received:", msg1, msg2) // "Broadcast", "Broadcast"
  }))
})

Taking from Subscriptions

take

const take: <A>(self: Subscription<A>) => Effect<A>
Takes a single message from the subscription.
// Fiber is required for Fiber.join below.
import { Effect, Fiber, PubSub } from "effect"

// Forks a take that suspends until a message is published, then joins it.
const program = Effect.gen(function*() {
  const pubsub = yield* PubSub.bounded<string>(10)

  yield* Effect.scoped(Effect.gen(function*() {
    const subscription = yield* PubSub.subscribe(pubsub)

    // Start taking (will suspend until message arrives)
    const takeFiber = yield* Effect.forkChild(
      PubSub.take(subscription)
    )

    yield* PubSub.publish(pubsub, "Hello")

    // Wait for the forked take to finish and read its result
    const message = yield* Fiber.join(takeFiber)
    console.log("Received:", message) // "Hello"
  }))
})

takeAll

const takeAll: <A>(self: Subscription<A>) => Effect<NonEmptyArray<A>>
Takes all available messages.
import { Effect, PubSub } from "effect"

// Drains every message currently available on a subscription.
const program = Effect.gen(function*() {
  const pubsub = yield* PubSub.bounded<string>(10)

  yield* Effect.scoped(Effect.gen(function*() {
    // Subscribe before publishing so the batch below reaches this subscription
    // (no replay buffer is configured).
    const subscription = yield* PubSub.subscribe(pubsub)

    yield* PubSub.publishAll(pubsub, ["msg1", "msg2", "msg3"])

    const allMessages = yield* PubSub.takeAll(subscription)
    console.log("All messages:", allMessages) // ["msg1", "msg2", "msg3"]
  }))
})

takeUpTo

const takeUpTo: {
  (max: number): <A>(self: Subscription<A>) => Effect<Array<A>>
  <A>(self: Subscription<A>, max: number): Effect<Array<A>>
}
Takes up to the specified number of messages without suspending.
import { Effect, PubSub } from "effect"

// Takes at most three of the five messages available on a subscription.
const program = Effect.gen(function*() {
  const pubsub = yield* PubSub.bounded<string>(10)

  yield* Effect.scoped(Effect.gen(function*() {
    // Subscribe before publishing so the messages are available to take
    // (no replay buffer is configured).
    const subscription = yield* PubSub.subscribe(pubsub)

    yield* PubSub.publishAll(pubsub, ["msg1", "msg2", "msg3", "msg4", "msg5"])

    const upTo3 = yield* PubSub.takeUpTo(subscription, 3)
    console.log("Up to 3:", upTo3) // ["msg1", "msg2", "msg3"]
  }))
})

takeBetween

const takeBetween: {
  (min: number, max: number): <A>(self: Subscription<A>) => Effect<Array<A>>
  <A>(self: Subscription<A>, min: number, max: number): Effect<Array<A>>
}
Takes between min and max messages.
// Fiber is required for Fiber.join below.
import { Effect, Fiber, PubSub } from "effect"

// Forks a takeBetween(2, 5), publishes three messages, then joins the fiber.
const program = Effect.gen(function*() {
  const pubsub = yield* PubSub.bounded<string>(10)

  yield* Effect.scoped(Effect.gen(function*() {
    const subscription = yield* PubSub.subscribe(pubsub)

    // Forked so the publish below can run while the take is pending
    const takeFiber = yield* Effect.forkChild(
      PubSub.takeBetween(subscription, 2, 5)
    )

    yield* PubSub.publishAll(pubsub, ["msg1", "msg2", "msg3"])

    const messages = yield* Fiber.join(takeFiber)
    console.log("Between 2-5:", messages) // ["msg1", "msg2", "msg3"]
  }))
})

Lifecycle

shutdown

const shutdown: <A>(self: PubSub<A>) => Effect<void>
Shuts down the PubSub, interrupting fibers that are suspended while publishing or taking.
import { Effect, PubSub } from "effect"

// Shows that publishing to a PubSub after shutdown does not succeed.
const program = Effect.gen(function*() {
  const pubsub = yield* PubSub.bounded<string>(1)

  yield* PubSub.publish(pubsub, "msg1")

  yield* PubSub.shutdown(pubsub)

  // Capture the outcome as an Exit. Interruption is not a typed failure, so
  // Effect.either would not catch it and the log line would never run.
  // NOTE(review): assumes publish-after-shutdown surfaces as an interrupt - confirm.
  const exit = yield* Effect.exit(PubSub.publish(pubsub, "msg2"))
  console.log("Publisher interrupted:", exit._tag === "Failure")
})

isShutdown

const isShutdown: <A>(self: PubSub<A>) => Effect<boolean>
Checks whether the PubSub has been shut down.
import { Effect, PubSub } from "effect"

// Reads the shutdown flag before and after shutting the PubSub down.
const program = Effect.gen(function*() {
  const hub = yield* PubSub.bounded<string>(10)

  const before = yield* PubSub.isShutdown(hub)
  console.log(before) // false

  yield* PubSub.shutdown(hub)

  const after = yield* PubSub.isShutdown(hub)
  console.log(after) // true
})

awaitShutdown

const awaitShutdown: <A>(self: PubSub<A>) => Effect<void>
Waits until the PubSub is shut down.
import { Effect, Fiber, PubSub } from "effect"

// Forks a fiber that waits on awaitShutdown, then shuts the PubSub down.
const program = Effect.gen(function*() {
  const hub = yield* PubSub.bounded<string>(10)

  // Background task: resumes only once the hub has been shut down
  const waitForShutdown = Effect.gen(function*() {
    yield* PubSub.awaitShutdown(hub)
    console.log("PubSub has been shutdown!")
  })
  const waiterFiber = yield* Effect.forkChild(waitForShutdown)

  // Sleep before shutting down, then wait for the waiter fiber to finish
  yield* Effect.sleep("100 millis")
  yield* PubSub.shutdown(hub)
  yield* Fiber.join(waiterFiber)
})

Utilities

size

const size: <A>(self: PubSub<A>) => Effect<number>
Returns the number of elements in the PubSub.
import { Effect, PubSub } from "effect"

// Reports how many messages the PubSub is currently holding.
const program = Effect.gen(function*() {
  const pubsub = yield* PubSub.bounded<string>(10)

  yield* Effect.scoped(Effect.gen(function*() {
    // An active subscriber that has not taken yet keeps the published
    // messages buffered, so they count towards the size.
    yield* PubSub.subscribe(pubsub)

    yield* PubSub.publish(pubsub, "msg1")
    yield* PubSub.publish(pubsub, "msg2")

    const currentSize = yield* PubSub.size(pubsub)
    console.log("After publishing:", currentSize) // 2
  }))
})

capacity

const capacity: <A>(self: PubSub<A>) => number
Returns the maximum capacity of the PubSub.
import { Effect, PubSub } from "effect"

// Reads back the capacity a bounded PubSub was created with.
const program = Effect.gen(function*() {
  const hub = yield* PubSub.bounded<string>(100)

  // capacity is a plain synchronous accessor, so no yield* is needed
  console.log("PubSub capacity:", PubSub.capacity(hub)) // 100
})

isEmpty

const isEmpty: <A>(self: PubSub<A>) => Effect<boolean>
Checks if the PubSub contains zero elements.
import { Effect, PubSub } from "effect"

// Checks emptiness before and after a message is published.
const program = Effect.gen(function*() {
  const pubsub = yield* PubSub.bounded<string>(10)

  yield* Effect.scoped(Effect.gen(function*() {
    // An active subscriber that has not taken yet keeps the published
    // message buffered, so the PubSub becomes non-empty.
    yield* PubSub.subscribe(pubsub)

    console.log(yield* PubSub.isEmpty(pubsub)) // true

    yield* PubSub.publish(pubsub, "Hello")

    console.log(yield* PubSub.isEmpty(pubsub)) // false
  }))
})

isFull

const isFull: <A>(self: PubSub<A>) => Effect<boolean>
Checks if the PubSub is at capacity.
import { Effect, PubSub } from "effect"

// Checks fullness before and after filling a capacity-2 PubSub.
const program = Effect.gen(function*() {
  const pubsub = yield* PubSub.bounded<string>(2)

  yield* Effect.scoped(Effect.gen(function*() {
    // An active subscriber that has not taken yet keeps the published
    // messages buffered, so the PubSub can reach its capacity.
    yield* PubSub.subscribe(pubsub)

    console.log(yield* PubSub.isFull(pubsub)) // false

    yield* PubSub.publish(pubsub, "msg1")
    yield* PubSub.publish(pubsub, "msg2")

    console.log(yield* PubSub.isFull(pubsub)) // true
  }))
})

remaining

const remaining: <A>(self: Subscription<A>) => Effect<number>
Returns the number of messages available in a subscription.
import { Effect, PubSub } from "effect"

// Counts the messages waiting on a subscription.
const program = Effect.gen(function*() {
  const pubsub = yield* PubSub.bounded<string>(10)

  yield* Effect.scoped(Effect.gen(function*() {
    // Subscribe before publishing so the batch below is queued for this
    // subscription (no replay buffer is configured).
    const subscription = yield* PubSub.subscribe(pubsub)

    yield* PubSub.publishAll(pubsub, ["msg1", "msg2", "msg3"])

    const count = yield* PubSub.remaining(subscription)
    console.log("Messages available:", count) // 3
  }))
})

Build docs developers (and LLMs) love