The PubSub module provides utilities for building publish-subscribe systems where publishers can send messages to many subscribers concurrently.

Overview

A PubSub<A> is an asynchronous message hub with the following properties:
  • Publishers publish messages of type A
  • Subscribers subscribe to receive messages
  • Every subscriber receives each message published while it is subscribed
  • It is either unbounded or applies a backpressure, dropping, or sliding strategy when full
  • Concurrent access from multiple fibers is safe
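
To make the delivery semantics concrete, here is a toy model in plain TypeScript. It is a sketch for illustration only and not how Effect implements PubSub; MiniHub and its methods are hypothetical names. It shows that each subscriber has its own view of the messages, and that a subscriber only sees messages published after it subscribed.

```typescript
// Toy model of fan-out delivery; NOT Effect's implementation.
// MiniHub, subscribe and publish are hypothetical names for this sketch.
class MiniHub<A> {
  private readonly inboxes: Array<Array<A>> = []

  // Each subscriber gets its own inbox, so one subscriber reading a
  // message does not remove it for the others.
  subscribe(): Array<A> {
    const inbox: Array<A> = []
    this.inboxes.push(inbox)
    return inbox
  }

  // Deliver the message to every inbox that exists at publish time.
  publish(message: A): void {
    for (const inbox of this.inboxes) inbox.push(message)
  }
}

const hub = new MiniHub<string>()
const early = hub.subscribe()
hub.publish("first")
const late = hub.subscribe() // subscribed after "first" was published
hub.publish("second")

console.log(early) // holds "first" and "second"
console.log(late) // holds only "second"
```

The real PubSub adds what this model leaves out: a capacity strategy, suspension of publishers and subscribers, and safe concurrent access.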

Creating PubSub

Bounded PubSub

Creates a PubSub with a fixed capacity. When it is full, publishers wait (backpressure) until space frees up:
import { Effect, PubSub } from "effect"

const program = Effect.gen(function*() {
  // Create a bounded PubSub with capacity 10
  const pubsub = yield* PubSub.bounded<string>(10)
  
  // Publish messages
  yield* PubSub.publish(pubsub, "Hello")
  yield* PubSub.publish(pubsub, "World")
})

Unbounded PubSub

Creates a PubSub without capacity limits:
import { Effect, PubSub } from "effect"

const program = Effect.gen(function*() {
  // Unbounded - never blocks publishers
  const pubsub = yield* PubSub.unbounded<number>()
  
  yield* PubSub.publish(pubsub, 42)
})

Dropping PubSub

Creates a PubSub with a fixed capacity that drops new messages when full:
import { Effect, PubSub } from "effect"

const program = Effect.gen(function*() {
  // Drops messages when capacity is reached
  const pubsub = yield* PubSub.dropping<string>(100)
  
  // This returns false if dropped
  const published = yield* PubSub.publish(pubsub, "message")
})

Sliding PubSub

Creates a PubSub with a fixed capacity that drops the oldest messages when full:
import { Effect, PubSub } from "effect"

const program = Effect.gen(function*() {
  // Removes oldest messages when full
  const pubsub = yield* PubSub.sliding<string>(10)
  
  yield* PubSub.publish(pubsub, "message")
})
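
The bounded, dropping, and sliding variants differ only in what happens when a message arrives at a full buffer. The following plain TypeScript sketch models that one decision. It is an illustration under simplifying assumptions (a single buffer, capacity of at least 1) and not Effect's implementation; Strategy, OfferResult, and offer are hypothetical names.

```typescript
// Toy model of the full-buffer policies; NOT Effect's implementation.
// Strategy, OfferResult and offer are hypothetical names for this sketch.
type Strategy = "backpressure" | "dropping" | "sliding"

interface OfferResult<A> {
  readonly buffer: ReadonlyArray<A>
  readonly accepted: boolean // did the new message enter the buffer?
  readonly mustWait: boolean // would a real publisher have to wait here?
}

// Assumes capacity >= 1.
function offer<A>(
  buffer: ReadonlyArray<A>,
  capacity: number,
  message: A,
  strategy: Strategy
): OfferResult<A> {
  // Free space: every strategy behaves the same way.
  if (buffer.length < capacity) {
    return { buffer: [...buffer, message], accepted: true, mustWait: false }
  }
  // Full buffer, sliding: evict the oldest message to make room.
  if (strategy === "sliding") {
    return { buffer: [...buffer.slice(1), message], accepted: true, mustWait: false }
  }
  // Full buffer, dropping or backpressure: the buffer is left untouched.
  // Dropping discards the new message; backpressure makes the publisher wait.
  return { buffer, accepted: false, mustWait: strategy === "backpressure" }
}

const full = ["a", "b"]
const dropped = offer(full, 2, "c", "dropping") // buffer stays a, b
const slid = offer(full, 2, "c", "sliding") // buffer becomes b, c
const waiting = offer(full, 2, "c", "backpressure") // publisher must wait
```

An unbounded PubSub never reaches the full-buffer branch, which is why it never blocks publishers.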

Publishing Messages

Publish Single Message

import { Effect, PubSub } from "effect"

const program = Effect.gen(function*() {
  const pubsub = yield* PubSub.bounded<string>(10)
  
  // Publish returns when message is accepted
  yield* PubSub.publish(pubsub, "Hello")
  
  yield* Effect.log("Message published")
})

Publish Multiple Messages

import { Effect, PubSub } from "effect"

const program = Effect.gen(function*() {
  const pubsub = yield* PubSub.bounded<string>(10)
  
  // Publish all messages
  yield* PubSub.publishAll(pubsub, [
    "Message 1",
    "Message 2",
    "Message 3"
  ])
})

Subscribing to Messages

Basic Subscription

import { Effect, PubSub } from "effect"

const program = Effect.gen(function*() {
  const pubsub = yield* PubSub.bounded<string>(10)
  
  // Subscribe within a scope for automatic cleanup
  yield* Effect.scoped(Effect.gen(function*() {
    const subscription = yield* PubSub.subscribe(pubsub)
    
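    // Publish after subscribing: a subscriber only receives messages
    // published while it is subscribed
    yield* PubSub.publish(pubsub, "Hello")
    yield* PubSub.publish(pubsub, "World")

    // Take messages from the subscription
    const message1 = yield* PubSub.take(subscription)
    const message2 = yield* PubSub.take(subscription)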
    
    console.log(message1, message2)
  }))
})

Multiple Subscribers

import { Effect, PubSub } from "effect"

const program = Effect.gen(function*() {
  const pubsub = yield* PubSub.bounded<number>(10)
  
  // Multiple subscribers receive the same messages
  yield* Effect.scoped(Effect.gen(function*() {
    const sub1 = yield* PubSub.subscribe(pubsub)
    const sub2 = yield* PubSub.subscribe(pubsub)
    
    yield* PubSub.publish(pubsub, 42)
    
    const value1 = yield* PubSub.take(sub1)
    const value2 = yield* PubSub.take(sub2)
    
    console.log(value1, value2) // Both are 42
  }))
})

Taking Messages

Take Single Message

import { Effect, PubSub } from "effect"

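// The PubSub is passed in; subscribing requires a Scope in the caller
const subscriber = (pubsub: PubSub.PubSub<string>) =>
  Effect.gen(function*() {
    const subscription = yield* PubSub.subscribe(pubsub)

    // Suspends until a message is available
    const message = yield* PubSub.take(subscription)

    return message
  })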

Take Multiple Messages

import { Effect, PubSub } from "effect"

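// The PubSub is passed in; subscribing requires a Scope in the caller
const subscriber = (pubsub: PubSub.PubSub<string>) =>
  Effect.gen(function*() {
    const subscription = yield* PubSub.subscribe(pubsub)

    // Take up to 5 messages without waiting for more to arrive
    const messages = yield* PubSub.takeUpTo(subscription, 5)

    console.log(messages) // Array with 0-5 messages
  })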

Take All Available

import { Effect, PubSub } from "effect"

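// The PubSub is passed in; subscribing requires a Scope in the caller
const subscriber = (pubsub: PubSub.PubSub<string>) =>
  Effect.gen(function*() {
    const subscription = yield* PubSub.subscribe(pubsub)

    // Take all currently available messages
    const messages = yield* PubSub.takeAll(subscription)

    return messages
  })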

Integration with Streams

import { Console, Effect, PubSub, Stream } from "effect"

const program = Effect.gen(function*() {
  const pubsub = yield* PubSub.bounded<number>(10)

  // Create a Stream that subscribes to the PubSub when it runs
  const stream = Stream.fromPubSub(pubsub)

  // Process the first 5 messages as a stream; this completes only
  // once another fiber has published 5 messages to the PubSub
  yield* Stream.take(stream, 5).pipe(
    Stream.runForEach((n) => Console.log(`Received: ${n}`))
  )
})

Broadcasting Domain Events

import { Effect, Layer, PubSub, Schema, Scope, ServiceMap } from "effect"

// Define a domain event; TaggedClass adds the `_tag` field the handler checks
class UserCreated extends Schema.TaggedClass<UserCreated>()("UserCreated", {
  userId: Schema.Number,
  email: Schema.String
}) {}

type DomainEvent = UserCreated

// Event bus service
class EventBus extends ServiceMap.Service<EventBus, {
  publish(event: DomainEvent): Effect.Effect<void>
  subscribe(): Effect.Effect<PubSub.Subscription<DomainEvent>, never, Scope.Scope>
}>()("EventBus") {
  static readonly layer = Layer.effect(
    EventBus,
    Effect.gen(function*() {
      const pubsub = yield* PubSub.unbounded<DomainEvent>()
      
      return EventBus.of({
        publish: (event) => PubSub.publish(pubsub, event),
        subscribe: () => PubSub.subscribe(pubsub)
      })
    })
  )
}

// Publisher
const createUser = Effect.gen(function*() {
  const eventBus = yield* EventBus
  
  yield* eventBus.publish(
    new UserCreated({ userId: 123, email: "user@example.com" })
  )
})

// Subscriber
const userEventHandler = Effect.gen(function*() {
  const eventBus = yield* EventBus
  
  yield* Effect.scoped(Effect.gen(function*() {
    const subscription = yield* eventBus.subscribe()
    
    yield* Effect.forever(
      Effect.gen(function*() {
        const event = yield* PubSub.take(subscription)
        
        if (event._tag === "UserCreated") {
          yield* Effect.log(`User created: ${event.email}`)
        }
      })
    )
  }))
})
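
The handler above checks event._tag, which becomes more useful once DomainEvent is a union of several event types. The plain TypeScript sketch below shows one way the dispatch could look with a second event. UserDeleted and describeEvent are hypothetical names added for illustration, and the interfaces stand in for the schema classes so the snippet runs on its own.

```typescript
// Hypothetical event shapes for illustration; only UserCreated exists in
// the example above, and plain interfaces stand in for the schema classes.
interface UserCreated {
  readonly _tag: "UserCreated"
  readonly userId: number
  readonly email: string
}

interface UserDeleted {
  readonly _tag: "UserDeleted"
  readonly userId: number
}

type DomainEvent = UserCreated | UserDeleted

// Switching on `_tag` narrows the event type in each branch, and the
// compiler reports a missing case when a new event type joins the union.
function describeEvent(event: DomainEvent): string {
  switch (event._tag) {
    case "UserCreated":
      return `User created: ${event.email}`
    case "UserDeleted":
      return `User deleted: ${event.userId}`
  }
}

const created: DomainEvent = { _tag: "UserCreated", userId: 123, email: "user@example.com" }
const deleted: DomainEvent = { _tag: "UserDeleted", userId: 123 }

console.log(describeEvent(created))
console.log(describeEvent(deleted))
```

Inside a subscriber loop, the same switch would replace the single if check on the event tag.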

Shutdown and Cleanup

import { Effect, PubSub } from "effect"

const program = Effect.gen(function*() {
  const pubsub = yield* PubSub.bounded<string>(10)
  
  // Shutdown the PubSub
  yield* PubSub.shutdown(pubsub)
  
  // Further operations will fail
  const result = yield* Effect.exit(
    PubSub.publish(pubsub, "message")
  )
  
  console.log(result._tag) // "Failure"
})

Capacity and Size

import { Effect, PubSub } from "effect"

const program = Effect.gen(function*() {
  const pubsub = yield* PubSub.bounded<string>(10)
  
  // Get current size
  const size = yield* PubSub.size(pubsub)
  console.log(size)
  
  // Get capacity (a plain number, not an Effect, so no yield* is needed)
  const capacity = PubSub.capacity(pubsub)
  console.log(capacity) // 10
  
  // Check if full
  const full = yield* PubSub.isFull(pubsub)
  console.log(full)
})

Best Practices

  1. Use scoped subscriptions: Always subscribe within Effect.scoped for automatic cleanup
  2. Choose appropriate capacity: Size PubSub based on producer/consumer rates
  3. Select the right strategy: Use bounded for backpressure, dropping/sliding for lossy scenarios
  4. Handle shutdown gracefully: Shutdown PubSub when no longer needed
  5. Use with Streams: Convert to Stream for powerful composition
  6. Avoid blocking subscribers: Keep per-message work short or hand it off to another fiber; with a bounded PubSub, a subscriber that falls behind eventually makes publishers wait
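
For the capacity guideline, a rough estimate can be worked out from the rates involved. The sketch below is a back-of-the-envelope calculation, not an Effect API: estimateBacklog and nextPowerOfTwo are hypothetical helpers, and the model assumes a single burst published at a constant rate while the slowest subscriber drains at a constant rate. Rounding up to a power of two follows the performance note in the PubSub.bounded documentation.

```typescript
// Hypothetical sizing helpers; not part of Effect.
// Model: a burst of `burstSize` messages is published at `publishPerSec`
// while the slowest subscriber drains at `consumePerSec`.
// The burst lasts burstSize / publishPerSec seconds, during which
// consumePerSec * burstSize / publishPerSec messages are consumed, so the
// backlog peaks at burstSize * (1 - consumePerSec / publishPerSec).
function estimateBacklog(
  burstSize: number,
  publishPerSec: number,
  consumePerSec: number
): number {
  // A subscriber that keeps up needs almost no buffering.
  if (consumePerSec >= publishPerSec) return 1
  return Math.max(1, Math.ceil(burstSize * (1 - consumePerSec / publishPerSec)))
}

// Round up to the next power of two.
function nextPowerOfTwo(n: number): number {
  let power = 1
  while (power < n) power *= 2
  return power
}

// 1000 messages at 500/s against a 400/s subscriber: peak backlog of 200.
const backlog = estimateBacklog(1000, 500, 400)
const suggestedCapacity = nextPowerOfTwo(backlog)

console.log(backlog, suggestedCapacity)
```

Treat the result as a starting point and adjust it from measurements, since real publishers and subscribers rarely run at constant rates.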

Common Patterns

Fan-out Pattern

import { Effect, PubSub } from "effect"

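// Hypothetical handler standing in for real per-message work
const processValue = (value: number) => Effect.log(`Processing ${value}`)

// One producer, multiple consumers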
const fanOut = Effect.gen(function*() {
  const pubsub = yield* PubSub.bounded<number>(10)
  
  // Producer
  const producer = Effect.gen(function*() {
    for (let i = 0; i < 100; i++) {
      yield* PubSub.publish(pubsub, i)
    }
  })
  
  // Multiple consumers
  const consumer = Effect.scoped(Effect.gen(function*() {
    const subscription = yield* PubSub.subscribe(pubsub)
    
    yield* Effect.forever(
      Effect.gen(function*() {
        const value = yield* PubSub.take(subscription)
        yield* processValue(value)
      })
    )
  }))
  
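  // Run producer with multiple consumers. The consumers loop forever, so
  // this effect runs until interrupted; a consumer does not receive
  // messages published before it has subscribed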
  yield* Effect.all(
    [producer, consumer, consumer, consumer],
    { concurrency: "unbounded" }
  )
})

Next Steps

  • Learn about Stream for processing message sequences
  • Explore Queue for point-to-point messaging
  • Understand Effect for composing concurrent operations
