The Queue module provides asynchronous, concurrent queues for message passing between producers and consumers with built-in backpressure support.

Overview

A Queue<A, E> is an asynchronous queue of items of type A that can be failed with an error of type E. It:
  • Can be offered to (enqueue) and taken from (dequeue)
  • Supports concurrent producers and consumers
  • Provides backpressure strategies (suspend, drop, slide)
  • Can signal completion or failure
  • Handles concurrent access safely
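
To make the three backpressure strategies concrete, here is a toy model in plain TypeScript with no dependency on effect. It is only a sketch of the semantics listed above, not the library's implementation; the names `Strategy`, `OfferResult`, and `offerWith` are made up for illustration.

```typescript
// Toy model of what happens when an item is offered to a buffer
// with a fixed capacity. All names here are hypothetical.
type Strategy = "suspend" | "drop" | "slide"
type OfferResult = "accepted" | "dropped" | "must-wait"

function offerWith<A>(
  buffer: A[],
  capacity: number,
  item: A,
  strategy: Strategy
): OfferResult {
  if (buffer.length < capacity) {
    buffer.push(item)
    return "accepted"
  }
  if (strategy === "suspend") {
    // A real queue would park the producer until space frees up
    return "must-wait"
  }
  if (strategy === "drop") {
    // The new item is discarded; the buffer is unchanged
    return "dropped"
  }
  // "slide": evict the oldest item to make room for the new one
  buffer.shift()
  buffer.push(item)
  return "accepted"
}

const fullBuffer = [1, 2, 3]
const dropResult = offerWith(fullBuffer, 3, 4, "drop") // buffer stays [1, 2, 3]

const slidingBuffer = [1, 2, 3]
const slideResult = offerWith(slidingBuffer, 3, 4, "slide") // buffer becomes [2, 3, 4]
```

In this model the only difference between the strategies is what happens once the buffer is full, which is also the main thing to consider when choosing between the queue constructors described below.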

Creating Queues

Bounded Queue

Creates a queue with a fixed capacity; offers wait while the queue is full:
import { Effect, Queue } from "effect"

const program = Effect.gen(function*() {
  // Create a bounded queue with capacity 10
  const queue = yield* Queue.bounded<string>(10)
  
  // Offer items to the queue
  yield* Queue.offer(queue, "hello")
  yield* Queue.offer(queue, "world")
  
  // Take items from the queue
  const item1 = yield* Queue.take(queue)
  const item2 = yield* Queue.take(queue)
  
  console.log([item1, item2]) // ["hello", "world"]
})

Unbounded Queue

Creates a queue without capacity limits:
import { Effect, Queue } from "effect"

const program = Effect.gen(function*() {
  // Unbounded queue - never blocks on offer
  const queue = yield* Queue.unbounded<number>()
  
  yield* Queue.offer(queue, 42)
})

Dropping Queue

Drops new items when full:
import { Effect, Queue } from "effect"

const program = Effect.gen(function*() {
  // Drops items when capacity is reached
  const queue = yield* Queue.dropping<string>(5)
  
  // Returns false if item was dropped
  const accepted = yield* Queue.offer(queue, "message")
})

Sliding Queue

Removes oldest items when full:
import { Effect, Queue } from "effect"

const program = Effect.gen(function*() {
  // Removes oldest items to make room for new ones
  const queue = yield* Queue.sliding<number>(10)
  
  yield* Queue.offer(queue, 42)
})

Offering Items

Offer Single Item

import { Effect, Queue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* Queue.bounded<string>(10)
  
  // Offer suspends the calling fiber while a bounded queue is full
  yield* Queue.offer(queue, "message")
})

Offer Multiple Items

import { Effect, Queue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* Queue.bounded<number>(10)
  
  // Offer all items
  yield* Queue.offerAll(queue, [1, 2, 3, 4, 5])
})

Taking Items

Take Single Item

import { Effect, Queue } from "effect"

const consumer = Effect.gen(function*() {
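  const queue = yield* Queue.bounded<string>(10)
  yield* Queue.offer(queue, "message")
  
  // Suspends until an item is available; without the offer above,
  // this take would wait forever on the empty queue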
  const item = yield* Queue.take(queue)
  
  return item
})
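
The "waits until an item is available" behaviour of a take can be pictured with a small Promise-based model. This is a sketch in plain TypeScript under simplifying assumptions (no capacity limit, no completion, no errors), not how the Queue module is implemented; `TinyQueue` and its members are hypothetical names.

```typescript
// Minimal Promise-based model of a suspending take.
// Hypothetical names; not part of the effect library.
class TinyQueue<A> {
  private items: A[] = []
  private takers: Array<(item: A) => void> = []

  // Number of buffered items
  get size(): number {
    return this.items.length
  }

  // Number of consumers currently waiting for an item
  get waiting(): number {
    return this.takers.length
  }

  offer(item: A): void {
    const taker = this.takers.shift()
    if (taker !== undefined) {
      // Hand the item straight to the longest-waiting consumer
      taker(item)
    } else {
      this.items.push(item)
    }
  }

  take(): Promise<A> {
    if (this.items.length > 0) {
      return Promise.resolve(this.items.shift() as A)
    }
    // Nothing buffered: park the caller until the next offer
    return new Promise<A>((resolve) => {
      this.takers.push(resolve)
    })
  }
}

const tiny = new TinyQueue<string>()
const pending = tiny.take() // queue is empty, so this take is parked
const parked = tiny.waiting // 1
tiny.offer("message") // resolves the parked take
pending.then((item) => console.log(item)) // logs "message"
```

The parked consumer costs nothing while it waits, which is why a take on an empty queue is safe to use in a long-running consumer loop.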

Take Multiple Items

import { Effect, Queue } from "effect"

const consumer = Effect.gen(function*() {
  const queue = yield* Queue.bounded<number>(10)
  
  // Take up to N items (non-blocking)
  const items = yield* Queue.takeUpTo(queue, 5)
  
  console.log(items) // Array with 0-5 items
})

Take All Available

import { Effect, Queue } from "effect"

const consumer = Effect.gen(function*() {
  const queue = yield* Queue.bounded<string>(10)
  
  // Take all currently available items
  const items = yield* Queue.takeAll(queue)
  
  return items
})

Producer-Consumer Pattern

import { Effect, Queue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* Queue.bounded<number>(10)
  
  // Producer: generates items
  const producer = Effect.gen(function*() {
    for (let i = 0; i < 100; i++) {
      yield* Queue.offer(queue, i)
      yield* Effect.sleep("10 millis")
    }
    // Signal completion
    yield* Queue.done(queue)
  })
  
  // Consumer: processes items
  const consumer = Effect.gen(function*() {
    while (true) {
      const item = yield* Queue.take(queue)
      yield* Effect.log(`Processing: ${item}`)
    }
  })
  
  // Run producer and consumer concurrently
  yield* Effect.all(
    [producer, consumer],
    { concurrency: "unbounded" }
  )
})

Multiple Consumers

import { Effect, Queue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* Queue.bounded<number>(100)
  
  // Producer
  const producer = Effect.gen(function*() {
    for (let i = 0; i < 1000; i++) {
      yield* Queue.offer(queue, i)
    }
  })
  
  // Consumer worker
  const worker = (id: number) =>
    Effect.gen(function*() {
      yield* Effect.forever(
        Effect.gen(function*() {
          const item = yield* Queue.take(queue)
          yield* Effect.log(`Worker ${id} processing: ${item}`)
          yield* Effect.sleep("100 millis")
        })
      )
    })
  
  // Run with multiple workers. The workers loop forever, so this
  // effect only ends when it is interrupted or one of them fails
  yield* Effect.all(
    [
      producer,
      worker(1),
      worker(2),
      worker(3)
    ],
    { concurrency: "unbounded" }
  )
})
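
A rough estimate of how long the example above takes to drain: each worker spends about 100 ms per item (the sleep), so three workers handle roughly 30 items per second. The helper below is a hypothetical back-of-the-envelope calculation in plain TypeScript; it ignores scheduling overhead and the time spent logging.

```typescript
// Hypothetical helper: estimated time for `workers` identical consumers
// to process `items` items when each item takes `perItemMs` milliseconds.
function estimateDrainMs(items: number, workers: number, perItemMs: number): number {
  // The busiest worker handles ceil(items / workers) items back to back
  return Math.ceil(items / workers) * perItemMs
}

const threeWorkers = estimateDrainMs(1000, 3, 100) // 33400 ms, about 33 s
const oneWorker = estimateDrainMs(1000, 1, 100) // 100000 ms, 100 s
```

Under these assumptions, adding workers shortens the drain time almost linearly until the producer becomes the limiting side.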

Queue State and Inspection

Check Size and Capacity

import { Effect, Queue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* Queue.bounded<number>(10)
  
  yield* Queue.offerAll(queue, [1, 2, 3])
  
  // Get current size
  const size = yield* Queue.size(queue)
  console.log(size) // 3
  
  // Get capacity
  const capacity = yield* Queue.capacity(queue)
  console.log(capacity) // 10
  
  // Check if empty
  const empty = yield* Queue.isEmpty(queue)
  console.log(empty) // false
  
  // Check if full
  const full = yield* Queue.isFull(queue)
  console.log(full) // false
})

Completion and Errors

Signal Completion

import { Effect, Queue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* Queue.bounded<string>(10)
  
  // Signal that queue is done
  yield* Queue.done(queue)
  
  // Subsequent takes will fail
  const result = yield* Effect.exit(Queue.take(queue))
  console.log(result._tag) // "Failure"
})

Signal Failure

import { Cause, Effect, Queue } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* Queue.bounded<string, string>(10)
  
  // Signal failure
  yield* Queue.fail(queue, "Processing error")
  
  // Subsequent operations will fail with this error
  const result = yield* Effect.exit(Queue.take(queue))
  
  if (result._tag === "Failure") {
    console.log(Cause.failures(result.cause)) // ["Processing error"]
  }
})

Queue as a Service

import { Effect, Layer, Queue, ServiceMap } from "effect"

class TaskQueue extends ServiceMap.Service<TaskQueue, {
  submit(task: string): Effect.Effect<void>
  process(): Effect.Effect<string>
}>()("TaskQueue") {
  static readonly layer = Layer.effect(
    TaskQueue,
    Effect.gen(function*() {
      const queue = yield* Queue.bounded<string>(100)
      
      return TaskQueue.of({
        submit: (task) => Queue.offer(queue, task),
        process: () => Queue.take(queue)
      })
    })
  )
}

// Usage
const program = Effect.gen(function*() {
  const taskQueue = yield* TaskQueue
  
  // Submit tasks
  yield* taskQueue.submit("task-1")
  yield* taskQueue.submit("task-2")
  
  // Process tasks
  const task = yield* taskQueue.process()
  yield* Effect.log(`Processing: ${task}`)
}).pipe(
  Effect.provide(TaskQueue.layer)
)

Integration with Streams

import { Console, Effect, Queue, Stream } from "effect"

const program = Effect.gen(function*() {
  const queue = yield* Queue.bounded<number>(10)
  
  // Convert queue to stream
  const stream = Stream.fromQueue(queue)
  
  // Process as stream
  yield* Stream.take(stream, 5).pipe(
    Stream.runForEach((n) => Console.log(`Received: ${n}`))
  )
})

Advanced Patterns

Priority Queue Simulation

import { Effect, Queue } from "effect"

class PriorityTask {
  constructor(
    readonly priority: number,
    readonly task: string
  ) {}
}

const program = Effect.gen(function*() {
  const highPriorityQueue = yield* Queue.bounded<string>(10)
  const lowPriorityQueue = yield* Queue.bounded<string>(10)
  
  const submit = (task: PriorityTask) =>
    task.priority > 5
      ? Queue.offer(highPriorityQueue, task.task)
      : Queue.offer(lowPriorityQueue, task.task)
  
  const process = Effect.gen(function*() {
    // Try high priority first
    const highPriorityItems = yield* Queue.takeUpTo(highPriorityQueue, 1)
    
    if (highPriorityItems.length > 0) {
      return highPriorityItems[0]
    }
    
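    // Fall back to low priority. This take suspends until a low-priority
    // item arrives, so a high-priority item offered meanwhile has to wait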
    return yield* Queue.take(lowPriorityQueue)
  })
})

Rate Limiting

import { Effect, Queue, Schedule } from "effect"

// Placeholder for application-specific work
const processItem = (item: string) => Effect.log(`Processing: ${item}`)

const rateLimitedQueue = Effect.gen(function*() {
  const queue = yield* Queue.bounded<string>(100)
  
  // Consumer with rate limiting: Schedule.spaced repeats indefinitely,
  // pausing 100 ms after each run, so no outer Effect.forever is needed
  const consumer = Effect.gen(function*() {
    const item = yield* Queue.take(queue)
    yield* processItem(item)
  }).pipe(
    Effect.repeat(Schedule.spaced("100 millis"))
  )
  
  return { queue, consumer }
})
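
As a rough guide to the throughput such a consumer can reach: the 100 ms pause comes after each run, so the time per item is roughly the processing time plus the spacing. The helper below is a hypothetical estimate in plain TypeScript, assuming the queue always has an item ready and ignoring scheduling overhead.

```typescript
// Hypothetical estimate of items processed per second when each run
// is followed by a fixed pause.
function maxItemsPerSecond(spacingMs: number, processingMs: number): number {
  return 1000 / (spacingMs + processingMs)
}

const instantWork = maxItemsPerSecond(100, 0) // 10 items per second
const slowWork = maxItemsPerSecond(100, 25) // 8 items per second
```

If a hard upper bound on the rate matters more than a gap between runs, the spacing should be chosen with the expected processing time in mind.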

Best Practices

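  1. Choose the right queue type: Use bounded when producers should slow down to match consumers, dropping or sliding when losing items is acceptable, and unbounded for fire-and-forget offers that must never wait
  2. Signal completion: Use Queue.done() to shut down consumers gracefully
  3. Handle errors properly: Use Queue.fail() to propagate errors to consumers
  4. Avoid stalling consumers: Run slow per-item work asynchronously so the consumer keeps draining the queue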
  5. Use appropriate capacity: Size queues based on producer/consumer rates
  6. Monitor queue depth: Track queue size to detect bottlenecks
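
One way to reason about practice 5 is to size the queue for the largest burst it should absorb. The helper below is a hypothetical sketch in plain TypeScript, assuming steady producer and consumer rates during the burst; real workloads are rarely this regular, so treat the result as a starting point to validate by monitoring queue depth.

```typescript
// Hypothetical sizing helper: items that pile up while the producer
// outpaces the consumer for `burstSeconds` seconds.
function estimateCapacity(
  producerPerSecond: number,
  consumerPerSecond: number,
  burstSeconds: number
): number {
  const surplus = Math.max(0, producerPerSecond - consumerPerSecond)
  return Math.ceil(surplus * burstSeconds)
}

const burstCapacity = estimateCapacity(500, 300, 2) // 400
const noBacklog = estimateCapacity(200, 300, 10) // 0
```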

Next Steps

  • Learn about PubSub for broadcasting messages
  • Explore Stream for processing sequences
  • Understand Effect for concurrent operations
