Skip to main content
A Queue is a lightweight, asynchronous queue for Effect programs. Queues support both bounded and unbounded operation, with different backpressure strategies.

Types

Queue

interface Queue<in out A> extends Enqueue<A>, Dequeue<A>
A Queue<A> can both enqueue and dequeue values of type A.

Enqueue

interface Enqueue<in A> {
  offer(value: A): Effect.Effect<boolean>
  unsafeOffer(value: A): boolean
  offerAll(iterable: Iterable<A>): Effect.Effect<boolean>
}
The write-only end of a queue.

Dequeue

interface Dequeue<out A> extends Effect.Effect<A> {
  readonly take: Effect.Effect<A>
  readonly takeAll: Effect.Effect<Chunk.Chunk<A>>
  takeUpTo(max: number): Effect.Effect<Chunk.Chunk<A>>
  takeBetween(min: number, max: number): Effect.Effect<Chunk.Chunk<A>>
}
The read-only end of a queue.

Creating Queues

bounded

Creates a bounded queue with backpressure. When the queue is full, offer will suspend until space is available.
import { Effect, Queue } from "effect"

const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number>(3)
  
  yield* Queue.offer(queue, 1)
  yield* Queue.offer(queue, 2)
  yield* Queue.offer(queue, 3)
  
  // This will suspend until space is available
  yield* Queue.offer(queue, 4)
})

unbounded

Creates an unbounded queue that never reaches capacity.
import { Effect, Queue } from "effect"

const program = Effect.gen(function* () {
  const queue = yield* Queue.unbounded<string>()
  
  yield* Queue.offer(queue, "hello")
  yield* Queue.offer(queue, "world")
  
  const value = yield* Queue.take(queue)
  console.log(value) // "hello"
})

dropping

Creates a bounded queue with dropping strategy. When full, new elements are dropped and old elements remain.
import { Effect, Queue } from "effect"

const program = Effect.gen(function* () {
  const queue = yield* Queue.dropping<number>(2)
  
  yield* Queue.offer(queue, 1)
  yield* Queue.offer(queue, 2)
  const success = yield* Queue.offer(queue, 3) // Returns false, 3 is dropped
  
  console.log(success) // false
  const all = yield* Queue.takeAll(queue)
  console.log(all) // Chunk(1, 2)
})

sliding

Creates a bounded queue with sliding strategy. When full, new elements are added and old elements are dropped.
import { Effect, Queue } from "effect"

const program = Effect.gen(function* () {
  const queue = yield* Queue.sliding<number>(2)
  
  yield* Queue.offer(queue, 1)
  yield* Queue.offer(queue, 2)
  yield* Queue.offer(queue, 3) // Removes 1, adds 3
  
  const all = yield* Queue.takeAll(queue)
  console.log(all) // Chunk(2, 3)
})

Offering Values

offer

Places a value in the queue.
import { Effect, Queue } from "effect"

const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<string>(10)
  const success = yield* Queue.offer(queue, "message")
  console.log(success) // true
})

offerAll

Places multiple values in the queue.
import { Effect, Queue } from "effect"

const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number>(10)
  yield* Queue.offerAll(queue, [1, 2, 3, 4, 5])
  
  const size = yield* Queue.size(queue)
  console.log(size) // 5
})

unsafeOffer

Unsafely offers a value without the fiber runtime.
const unsafeOffer: {
  <A>(value: A): (self: Enqueue<A>) => boolean
  <A>(self: Enqueue<A>, value: A): boolean
}

Taking Values

take

Takes the oldest value from the queue. Suspends if the queue is empty.
import { Effect, Queue } from "effect"

const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number>(10)
  
  yield* Queue.offer(queue, 42)
  const value = yield* Queue.take(queue)
  console.log(value) // 42
})

takeAll

Takes all values from the queue. Returns an empty chunk if the queue is empty.
import { Effect, Queue } from "effect"

const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number>(10)
  
  yield* Queue.offerAll(queue, [1, 2, 3, 4, 5])
  const values = yield* Queue.takeAll(queue)
  console.log(values) // Chunk(1, 2, 3, 4, 5)
})

takeUpTo

Takes up to a maximum number of values from the queue.
import { Effect, Queue } from "effect"

const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number>(10)
  
  yield* Queue.offerAll(queue, [1, 2, 3, 4, 5])
  const values = yield* Queue.takeUpTo(queue, 3)
  console.log(values) // Chunk(1, 2, 3)
})

takeBetween

Takes between min and max values from the queue. Suspends until at least min values are available.
import { Effect, Queue } from "effect"

const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number>(10)
  
  yield* Queue.offerAll(queue, [1, 2, 3, 4, 5])
  const values = yield* Queue.takeBetween(queue, 2, 4)
  console.log(values) // Chunk(1, 2, 3, 4)
})

poll

Returns the first value as Some<A>, or None if the queue is empty.
import { Effect, Queue, Option } from "effect"

const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number>(10)
  
  const empty = yield* Queue.poll(queue)
  console.log(Option.isNone(empty)) // true
  
  yield* Queue.offer(queue, 42)
  const value = yield* Queue.poll(queue)
  console.log(value) // Option.some(42)
})

Queue Information

size

Gets the current size of the queue.
import { Effect, Queue } from "effect"

const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number>(10)
  yield* Queue.offerAll(queue, [1, 2, 3])
  const size = yield* Queue.size(queue)
  console.log(size) // 3
})

capacity

Gets the capacity of the queue.
import { Queue } from "effect"

const capacity: <A>(self: Queue<A>) => number

isEmpty

Checks if the queue is empty.
import { Effect, Queue } from "effect"

const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number>(10)
  const empty = yield* Queue.isEmpty(queue)
  console.log(empty) // true
})

isFull

Checks if the queue is full.
import { Effect, Queue } from "effect"

const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number>(2)
  yield* Queue.offerAll(queue, [1, 2])
  const full = yield* Queue.isFull(queue)
  console.log(full) // true
})

Shutdown

shutdown

Interrupts any fibers suspended on offer or take.
import { Effect, Queue } from "effect"

const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number>(10)
  yield* Queue.shutdown(queue)
  
  const isShutdown = yield* Queue.isShutdown(queue)
  console.log(isShutdown) // true
})

awaitShutdown

Waits until the queue is shutdown.
const awaitShutdown: <A>(self: Queue<A>) => Effect.Effect<void>

Producer-Consumer Example

import { Effect, Queue, Fiber } from "effect"

const producer = (queue: Queue.Queue<number>, id: number) =>
  Effect.gen(function* () {
    for (let i = 0; i < 5; i++) {
      const value = id * 10 + i
      yield* Queue.offer(queue, value)
      console.log(`Producer ${id}: sent ${value}`)
      yield* Effect.sleep("100 millis")
    }
  })

const consumer = (queue: Queue.Queue<number>, id: number) =>
  Effect.gen(function* () {
    while (true) {
      const value = yield* Queue.take(queue)
      console.log(`Consumer ${id}: received ${value}`)
      yield* Effect.sleep("200 millis")
    }
  })

const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number>(10)

  // Start producers
  const p1 = yield* Effect.fork(producer(queue, 1))
  const p2 = yield* Effect.fork(producer(queue, 2))

  // Start consumers
  const c1 = yield* Effect.fork(consumer(queue, 1))
  const c2 = yield* Effect.fork(consumer(queue, 2))

  // Wait for producers to finish
  yield* Fiber.join(p1)
  yield* Fiber.join(p2)

  yield* Effect.sleep("2 seconds")

  // Shutdown queue and consumers
  yield* Queue.shutdown(queue)
  yield* Fiber.interrupt(c1)
  yield* Fiber.interrupt(c2)
})

Task Queue Example

import { Effect, Queue } from "effect"

interface Task {
  id: string
  execute: Effect.Effect<void>
}

const makeTaskQueue = (workers: number) =>
  Effect.gen(function* () {
    const queue = yield* Queue.bounded<Task>(100)

    const worker = (id: number) =>
      Effect.forever(
        Effect.gen(function* () {
          const task = yield* Queue.take(queue)
          console.log(`Worker ${id}: executing task ${task.id}`)
          yield* task.execute
          console.log(`Worker ${id}: completed task ${task.id}`)
        })
      )

    // Start workers
    for (let i = 0; i < workers; i++) {
      yield* Effect.fork(worker(i))
    }

    return {
      submit: (task: Task) => Queue.offer(queue, task),
      shutdown: Queue.shutdown(queue)
    }
  })

const program = Effect.gen(function* () {
  const taskQueue = yield* makeTaskQueue(3)

  // Submit tasks
  for (let i = 0; i < 10; i++) {
    yield* taskQueue.submit({
      id: `task-${i}`,
      execute: Effect.gen(function* () {
        yield* Effect.sleep("500 millis")
      })
    })
  }

  yield* Effect.sleep("10 seconds")
  yield* taskQueue.shutdown
})

Build docs developers (and LLMs) love