Skip to main content
A Queue is an asynchronous queue that can be offered to and taken from. It supports signaling that it is done or failed, and provides various backpressure strategies.

Overview

Queues in Effect provide a powerful way to coordinate between producers and consumers:
  • Asynchronous: Non-blocking operations with automatic suspension
  • Bounded or unbounded: Control memory usage with capacity limits
  • Multiple strategies: Backpressure, dropping, or sliding behaviors
  • Type-safe: Full type safety for values and errors

Basic Usage

import { Cause, Effect, Queue } from "effect"

const program = Effect.gen(function*() {
  // A bounded queue of strings with capacity 10
  const messages = yield* Queue.bounded<string>(10)

  // Producer side: enqueue a single item, then a batch
  yield* Queue.offer(messages, "hello")
  yield* Queue.offerAll(messages, ["world", "!"])

  // Consumer side: dequeue the three items one at a time
  const first = yield* Queue.take(messages)
  const second = yield* Queue.take(messages)
  const third = yield* Queue.take(messages)

  console.log([first, second, third]) // ["hello", "world", "!"]
})

Types

Queue

interface Queue<in out A, in out E = never>
A full queue interface supporting both offer and take operations.

Enqueue

interface Enqueue<in A, in E = never>
Write-only interface - can offer elements but not take them.
import { Effect, Queue } from "effect"

// Function that only needs write access
/**
 * Producer that only needs write access to the queue.
 *
 * `Queue.offer` and `Queue.offerAll` accept an `Enqueue`, so the write-only
 * handle is passed through directly without widening it to a full `Queue`.
 */
const producer = (enqueue: Queue.Enqueue<string>) =>
  Effect.gen(function*() {
    yield* Queue.offer(enqueue, "hello")
    yield* Queue.offerAll(enqueue, ["world", "!"])
  })

Dequeue

interface Dequeue<out A, out E = never>
Read-only interface - can take elements but not offer them.
import { Effect, Queue } from "effect"

const program = Effect.gen(function*() {
  const source = yield* Queue.bounded<string>(10)

  // View the full queue through its read-only (take-only) interface
  const readOnly: Queue.Dequeue<string> = source

  // Seed the queue through the full, writable handle
  yield* Queue.offerAll(source, ["a", "b", "c"])

  // Consume through the read-only view
  const head = yield* Queue.take(readOnly)
  console.log(head) // "a"
})

Creating Queues

bounded

const bounded: <A, E = never>(capacity: number) => Effect<Queue<A, E>>
Creates a bounded queue with backpressure strategy. When full, producers are suspended until space becomes available.
import { Effect, Queue } from "effect"

const program = Effect.gen(function*() {
  const inbox = yield* Queue.bounded<string>(5)

  // Two offers stay below the capacity of 5
  for (const label of ["first", "second"]) {
    yield* Queue.offer(inbox, label)
  }

  const count = yield* Queue.size(inbox)
  console.log(count) // 2
})

sliding

const sliding: <A, E = never>(capacity: number) => Effect<Queue<A, E>>
Creates a bounded queue with sliding strategy. When full, oldest elements are dropped to make room for new ones.
import { Effect, Queue } from "effect"

const program = Effect.gen(function*() {
  const recent = yield* Queue.sliding<number>(3)

  // Offer 1, 2 and 3 to reach the capacity of 3
  for (const value of [1, 2, 3]) {
    yield* Queue.offer(recent, value)
  }

  // A fourth offer evicts the oldest element (1)
  yield* Queue.offer(recent, 4)

  const contents = yield* Queue.takeAll(recent)
  console.log(contents) // [2, 3, 4]
})

dropping

const dropping: <A, E = never>(capacity: number) => Effect<Queue<A, E>>
Creates a bounded queue with dropping strategy. When full, new elements are dropped and offer returns false.
import { Effect, Queue } from "effect"

const program = Effect.gen(function*() {
  const limited = yield* Queue.dropping<number>(2)

  // Both offers are accepted while there is still room
  const acceptedFirst = yield* Queue.offer(limited, 1)
  const acceptedSecond = yield* Queue.offer(limited, 2)
  console.log(acceptedFirst, acceptedSecond) // true, true

  // The queue is now full, so this element is discarded
  const acceptedThird = yield* Queue.offer(limited, 3)
  console.log(acceptedThird) // false

  const kept = yield* Queue.takeAll(limited)
  console.log(kept) // [1, 2]
})

unbounded

const unbounded: <A, E = never>() => Effect<Queue<A, E>>
Creates an unbounded queue that can grow to any size without blocking producers.
import { Effect, Queue } from "effect"

const program = Effect.gen(function*() {
  const mailbox = yield* Queue.unbounded<string>()

  // With no capacity limit, these offers are never blocked
  yield* Queue.offer(mailbox, "message1")
  yield* Queue.offer(mailbox, "message2")
  yield* Queue.offerAll(mailbox, ["message3", "message4"])

  const total = yield* Queue.size(mailbox)
  console.log(total) // 4
})

Offering to Queues

offer

const offer: <A, E>(
  self: Enqueue<A, E>,
  message: Types.NoInfer<A>
) => Effect<boolean>
Adds a message to the queue. Returns false if the queue is done, or if the message was dropped because a dropping queue is full.
import { Effect, Queue } from "effect"

const program = Effect.gen(function*() {
  const numbers = yield* Queue.bounded<number>(3)

  // Each offer reports whether the message was accepted
  const acceptedFirst = yield* Queue.offer(numbers, 1)
  const acceptedSecond = yield* Queue.offer(numbers, 2)
  console.log(acceptedFirst, acceptedSecond) // true, true

  const count = yield* Queue.size(numbers)
  console.log(count) // 2
})

offerAll

const offerAll: <A, E>(
  self: Enqueue<A, E>,
  messages: Iterable<A>
) => Effect<Array<A>>
Adds multiple messages to the queue. Returns remaining messages that couldn’t be added.
import { Effect, Queue } from "effect"

const program = Effect.gen(function*() {
  // Use a dropping queue: a backpressure (`bounded`) queue would suspend the
  // producer once full instead of handing back the messages that did not fit.
  const queue = yield* Queue.dropping<number>(3)

  // Try to add more messages than capacity
  const remaining = yield* Queue.offerAll(queue, [1, 2, 3, 4, 5])
  console.log(remaining) // [4, 5]
})

offerUnsafe

const offerUnsafe: <A, E>(
  self: Enqueue<A, E>,
  message: Types.NoInfer<A>
) => boolean
Synchronously adds a message without Effect wrapping.
import { Effect, Queue } from "effect"

const program = Effect.gen(function*() {
  const numbers = yield* Queue.bounded<number>(3)

  // Plain synchronous call: the result is a boolean, so no yield* is needed
  const accepted = Queue.offerUnsafe(numbers, 1)
  console.log(accepted) // true
})

Taking from Queues

take

const take: <A, E>(self: Dequeue<A, E>) => Effect<A, E>
Takes a single message from the queue. Suspends if no messages are available.
import { Effect, Queue } from "effect"

const program = Effect.gen(function*() {
  const labels = yield* Queue.bounded<string>(3)

  // Enqueue two messages
  for (const label of ["first", "second"]) {
    yield* Queue.offer(labels, label)
  }

  // Each take removes one message, in the order they were offered
  const head = yield* Queue.take(labels)
  const next = yield* Queue.take(labels)
  console.log(head, next) // "first", "second"
})

takeAll

const takeAll: <A, E>(self: Dequeue<A, E>) => Effect<NonEmptyArray<A>, E>
Takes all available messages, waiting for at least one.
import { Effect, Queue } from "effect"

const program = Effect.gen(function*() {
  const numbers = yield* Queue.bounded<number>(5)

  // Fill the queue to its capacity of 5
  yield* Queue.offerAll(numbers, [1, 2, 3, 4, 5])

  // Drain everything currently in the queue in one call
  const drained = yield* Queue.takeAll(numbers)
  console.log(drained) // [1, 2, 3, 4, 5]
})

takeN

const takeN: <A, E>(
  self: Dequeue<A, E>,
  n: number
) => Effect<Array<A>, E>
Takes a specified number of messages.
import { Effect, Queue } from "effect"

const program = Effect.gen(function*() {
  const numbers = yield* Queue.bounded<number>(10)

  // Seven messages are available
  yield* Queue.offerAll(numbers, [1, 2, 3, 4, 5, 6, 7])

  // Take exactly three of them from the front
  const leading = yield* Queue.takeN(numbers, 3)
  console.log(leading) // [1, 2, 3]
})

takeBetween

const takeBetween: <A, E>(
  self: Dequeue<A, E>,
  min: number,
  max: number
) => Effect<Array<A>, E>
Takes between min and max messages.
import { Effect, Queue } from "effect"

const program = Effect.gen(function*() {
  const numbers = yield* Queue.bounded<number>(10)

  // Eight messages are available
  yield* Queue.offerAll(numbers, [1, 2, 3, 4, 5, 6, 7, 8])

  // Request at least 2 and at most 5 messages
  const chunk = yield* Queue.takeBetween(numbers, 2, 5)
  console.log(chunk) // [1, 2, 3, 4, 5]
})

poll

const poll: <A, E>(self: Dequeue<A, E>) => Effect<Option<A>>
Tries to take an item without blocking.
import { Effect, Option, Queue } from "effect"

const program = Effect.gen(function*() {
  const numbers = yield* Queue.bounded<number>(10)

  // Polling an empty queue yields None
  const beforeOffer = yield* Queue.poll(numbers)
  console.log(Option.isNone(beforeOffer)) // true

  yield* Queue.offer(numbers, 42)

  // Polling again yields the offered value
  const afterOffer = yield* Queue.poll(numbers)
  console.log(Option.getOrNull(afterOffer)) // 42
})

peek

const peek: <A, E>(self: Dequeue<A, E>) => Effect<A, E>
Views the next item without removing it.
import { Effect, Queue } from "effect"

const program = Effect.gen(function*() {
  const numbers = yield* Queue.bounded<number>(10)
  yield* Queue.offer(numbers, 42)

  // Look at the front element without consuming it
  const front = yield* Queue.peek(numbers)
  console.log(front) // 42

  // The element has not been removed
  const count = yield* Queue.size(numbers)
  console.log(count) // 1
})

Queue Lifecycle

end

const end: <A, E>(self: Enqueue<A, E | Done>) => Effect<boolean>
Signals that the queue is complete. No more messages will be accepted.
import { Effect, Queue } from "effect"

const program = Effect.gen(function*() {
  // NOTE(review): `end` is declared as taking `Enqueue<A, E | Done>`, but this
  // queue uses the default error type (`never`). Confirm this type-checks, or
  // whether the queue should be created with `Done` in its error channel.
  const queue = yield* Queue.bounded<number>(10)

  yield* Queue.offer(queue, 1)
  yield* Queue.offer(queue, 2)

  // Signal that the queue is complete
  const ended = yield* Queue.end(queue)
  console.log(ended) // true

  // Offers made after `end` are not accepted
  const offerResult = yield* Queue.offer(queue, 3)
  console.log(offerResult) // false
})

fail

const fail: <A, E>(self: Queue<A, E>, error: E) => Effect<boolean>
Fails the queue with an error.
import { Effect, Queue } from "effect"

const program = Effect.gen(function*() {
  // The second type argument is the error type the queue can fail with
  const numbers = yield* Queue.bounded<number, string>(10)

  yield* Queue.offer(numbers, 1)

  // Fail the queue with a string error
  const didFail = yield* Queue.fail(numbers, "Something went wrong")
  console.log(didFail) // true
})

shutdown

const shutdown: <A, E>(self: Enqueue<A, E>) => Effect<boolean>
Shuts down the queue, canceling pending operations.
import { Effect, Queue } from "effect"

const program = Effect.gen(function*() {
  const numbers = yield* Queue.bounded<number>(2)

  // Fill the queue to its capacity of 2
  for (const value of [1, 2]) {
    yield* Queue.offer(numbers, value)
  }

  // Shut the queue down and report the result
  const didShutdown = yield* Queue.shutdown(numbers)
  console.log(didShutdown) // true
})

Utilities

size

const size: <A, E>(self: Dequeue<A, E>) => Effect<number>
Returns the current size of the queue.
import { Effect, Queue } from "effect"

const program = Effect.gen(function*() {
  const numbers = yield* Queue.bounded<number>(10)

  // Enqueue five messages
  yield* Queue.offerAll(numbers, [1, 2, 3, 4, 5])

  // Ask how many messages the queue currently holds
  const count = yield* Queue.size(numbers)
  console.log(count) // 5
})

isFull

const isFull: <A, E>(self: Dequeue<A, E>) => Effect<boolean>
Checks if the queue is full.
import { Effect, Queue } from "effect"

const program = Effect.gen(function*() {
  const numbers = yield* Queue.bounded<number>(3)

  // A freshly created queue has free capacity
  const fullBefore = yield* Queue.isFull(numbers)
  console.log(fullBefore) // false

  // Fill the queue to its capacity of 3
  yield* Queue.offerAll(numbers, [1, 2, 3])

  const fullAfter = yield* Queue.isFull(numbers)
  console.log(fullAfter) // true
})

clear

const clear: <A, E>(self: Dequeue<A, E>) => Effect<Array<A>>
Removes all messages from the queue.
import { Effect, Queue } from "effect"

const program = Effect.gen(function*() {
  const numbers = yield* Queue.bounded<number>(10)

  yield* Queue.offerAll(numbers, [1, 2, 3, 4, 5])

  // clear hands back the messages it removed
  const removed = yield* Queue.clear(numbers)
  console.log(removed) // [1, 2, 3, 4, 5]

  // The queue is empty afterwards
  const count = yield* Queue.size(numbers)
  console.log(count) // 0
})

Build docs developers (and LLMs) love