Skip to main content
A Queue is a lightweight in-memory queue with bounded or unbounded capacity, supporting multiple concurrent producers and consumers.

Type Signature

interface Queue<in out A> extends Enqueue<A>, Dequeue<A> {
  capacity(): number
  size: Effect<number>
  readonly isFull: Effect<boolean>
  readonly isEmpty: Effect<boolean>
  readonly shutdown: Effect<void>
}

interface Enqueue<in A> {
  offer(value: A): Effect<boolean>
  offerAll(iterable: Iterable<A>): Effect<boolean>
}

interface Dequeue<out A> extends Effect<A> {
  readonly take: Effect<A>
  readonly takeAll: Effect<Chunk<A>>
  takeUpTo(max: number): Effect<Chunk<A>>
}

Creating Queues

import { Effect, Queue } from "effect"

const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number>(100)
  // Capacity: 100
})

Offering Values

offer

Adds a single value to the queue.
import { Effect, Queue } from "effect"

const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number>(10)
  
  const success = yield* Queue.offer(queue, 42)
  console.log(success)  // true
})

offerAll

Adds multiple values to the queue.
import { Effect, Queue } from "effect"

const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number>(10)
  
  yield* Queue.offerAll(queue, [1, 2, 3, 4, 5])
  
  const size = yield* Queue.size(queue)
  console.log(size)  // 5
})

Taking Values

import { Effect, Queue } from "effect"

const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number>(10)
  yield* Queue.offer(queue, 42)
  
  const value = yield* Queue.take(queue)
  console.log(value)  // 42
})

Producer-Consumer Pattern

import { Effect, Queue, Fiber } from "effect"

const producer = (queue: Queue.Queue<number>) =>
  Effect.gen(function* () {
    for (let i = 1; i <= 10; i++) {
      yield* Queue.offer(queue, i)
      yield* Effect.sleep("100 millis")
    }
  })

const consumer = (queue: Queue.Queue<number>) =>
  Effect.gen(function* () {
    while (true) {
      const value = yield* Queue.take(queue)
      yield* Effect.log(`Consumed: ${value}`)
    }
  })

const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number>(5)
  
  const prod = yield* Effect.fork(producer(queue))
  const cons = yield* Effect.fork(consumer(queue))
  
  yield* Fiber.join(prod)
  yield* Queue.shutdown(queue)
})

Effect.runPromise(program)

Queue Inspection

size

import { Effect, Queue } from "effect"

const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number>(10)
  yield* Queue.offerAll(queue, [1, 2, 3])
  
  const size = yield* Queue.size(queue)
  console.log(size)  // 3
})

isEmpty / isFull

import { Effect, Queue } from "effect"

const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number>(2)
  
  const empty = yield* Queue.isEmpty(queue)
  console.log(empty)  // true
  
  yield* Queue.offerAll(queue, [1, 2])
  
  const full = yield* Queue.isFull(queue)
  console.log(full)  // true
})

Backpressure

Bounded queues provide automatic backpressure.
import { Effect, Queue } from "effect"

const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number>(2)
  
  yield* Queue.offer(queue, 1)
  yield* Queue.offer(queue, 2)
  
  // This will suspend until space is available
  const fiber = yield* Effect.fork(Queue.offer(queue, 3))
  
  yield* Effect.sleep("1 second")
  yield* Queue.take(queue)  // Makes space
  
  yield* Fiber.join(fiber)
})

Shutdown

Shuts down a queue and interrupts waiting fibers.
import { Effect, Queue, Fiber } from "effect"

const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number>(10)
  
  const fiber = yield* Effect.fork(
    Queue.take(queue)  // Will wait forever
  )
  
  yield* Effect.sleep("1 second")
  yield* Queue.shutdown(queue)
  
  const result = yield* Fiber.await(fiber)
  console.log(result)  // Interrupted
})

Queue Strategies

Sliding Queue

Drops oldest elements when full.
import { Effect, Queue } from "effect"

const program = Effect.gen(function* () {
  const queue = yield* Queue.sliding<number>(3)
  
  yield* Queue.offerAll(queue, [1, 2, 3, 4, 5])
  
  const values = yield* Queue.takeAll(queue)
  console.log(values)
  // { _id: 'Chunk', values: [ 3, 4, 5 ] }
})

Dropping Queue

Drops newest elements when full.
import { Effect, Queue } from "effect"

const program = Effect.gen(function* () {
  const queue = yield* Queue.dropping<number>(3)
  
  yield* Queue.offerAll(queue, [1, 2, 3, 4, 5])
  
  const values = yield* Queue.takeAll(queue)
  console.log(values)
  // { _id: 'Chunk', values: [ 1, 2, 3 ] }
})
Queues provide safe concurrent access and automatic synchronization between producers and consumers across multiple fibers.

Use Cases

  • Producer-consumer patterns
  • Work queues
  • Rate limiting
  • Buffering
  • Event processing pipelines

Key Operations

OperationDescription
boundedCreates bounded queue
unboundedCreates unbounded queue
slidingCreates sliding queue
droppingCreates dropping queue
offerAdds single value
offerAllAdds multiple values
takeTakes single value
takeAllTakes all values
takeUpToTakes up to N values
shutdownCloses the queue

Build docs developers (and LLMs) love