A Queue is a lightweight in-memory queue with bounded or unbounded capacity, supporting multiple concurrent producers and consumers.
Type Signature
/**
 * A queue that can be both written to (`Enqueue<A>`) and read from
 * (`Dequeue<A>`), plus members for inspecting and shutting it down.
 */
interface Queue<in out A> extends Enqueue<A>, Dequeue<A> {
  /** The capacity the queue was created with, e.g. 100 for `Queue.bounded<number>(100)`. */
  capacity(): number
  /** Effect yielding the number of elements currently in the queue. */
  readonly size: Effect<number>
  /** Effect yielding `true` when the queue holds as many elements as its capacity. */
  readonly isFull: Effect<boolean>
  /** Effect yielding `true` when the queue holds no elements. */
  readonly isEmpty: Effect<boolean>
  /** Effect that shuts the queue down and interrupts fibers waiting on it. */
  readonly shutdown: Effect<void>
}
/** Write side of a queue: accepts values of type `A`. */
interface Enqueue<in A> {
/** Adds a single value to the queue. The `offer` example on this page logs `true` for an accepted value. */
offer(value: A): Effect<boolean>
/** Adds multiple values to the queue. */
offerAll(iterable: Iterable<A>): Effect<boolean>
}
/**
 * Read side of a queue: produces values of type `A`.
 * NOTE(review): a Dequeue is itself an `Effect<A>`; presumably running it is
 * equivalent to `take` — confirm against the library documentation.
 */
interface Dequeue<out A> extends Effect<A> {
/** Takes a single value from the queue. */
readonly take: Effect<A>
/** Takes all values currently in the queue, as a `Chunk`. */
readonly takeAll: Effect<Chunk<A>>
/** Takes up to `max` values from the queue, as a `Chunk`. */
takeUpTo(max: number): Effect<Chunk<A>>
}
Creating Queues
bounded
unbounded
sliding
dropping
import { Effect, Queue } from "effect"
// Creates a bounded queue of numbers.
const program = Effect.gen(function* () {
  // Capacity: 100
  const boundedQueue = yield* Queue.bounded<number>(100)
})
import { Effect, Queue } from "effect"
// Creates an unbounded queue of numbers.
const program = Effect.gen(function* () {
  // Unlimited capacity
  const unboundedQueue = yield* Queue.unbounded<number>()
})
import { Effect, Queue } from "effect"
// Creates a sliding queue of numbers with room for 10 elements.
const program = Effect.gen(function* () {
  // Drops oldest when full
  const slidingQueue = yield* Queue.sliding<number>(10)
})
import { Effect, Queue } from "effect"
// Creates a dropping queue of numbers with room for 10 elements.
const program = Effect.gen(function* () {
  // Drops newest when full
  const droppingQueue = yield* Queue.dropping<number>(10)
})
Offering Values
offer
Adds a single value to the queue.
import { Effect, Queue } from "effect"
// Offers one value to a bounded queue and logs whether it was accepted.
const program = Effect.gen(function* () {
  const numbers = yield* Queue.bounded<number>(10)
  const accepted = yield* Queue.offer(numbers, 42)
  console.log(accepted) // true
})
offerAll
Adds multiple values to the queue.
import { Effect, Queue } from "effect"
// Offers five values at once, then logs how many the queue holds.
const program = Effect.gen(function* () {
  const numbers = yield* Queue.bounded<number>(10)
  yield* Queue.offerAll(numbers, [1, 2, 3, 4, 5])
  const count = yield* Queue.size(numbers)
  console.log(count) // 5
})
Taking Values
import { Effect, Queue } from "effect"
// take: removes a single value from the queue.
const program = Effect.gen(function* () {
  const numbers = yield* Queue.bounded<number>(10)
  yield* Queue.offer(numbers, 42)
  const taken = yield* Queue.take(numbers)
  console.log(taken) // 42
})
import { Effect, Queue } from "effect"
// takeAll: removes every value currently in the queue, as a Chunk.
const program = Effect.gen(function* () {
  const numbers = yield* Queue.bounded<number>(10)
  yield* Queue.offerAll(numbers, [1, 2, 3, 4, 5])
  const everything = yield* Queue.takeAll(numbers)
  console.log(everything)
  // { _id: 'Chunk', values: [ 1, 2, 3, 4, 5 ] }
})
import { Effect, Queue } from "effect"
// takeUpTo: removes at most the requested number of values, as a Chunk.
const program = Effect.gen(function* () {
  const numbers = yield* Queue.bounded<number>(10)
  yield* Queue.offerAll(numbers, [1, 2, 3, 4, 5])
  const firstThree = yield* Queue.takeUpTo(numbers, 3)
  console.log(firstThree)
  // { _id: 'Chunk', values: [ 1, 2, 3 ] }
})
Producer-Consumer Pattern
import { Effect, Queue, Fiber } from "effect"
// Offers the numbers 1 through 10 to the queue, sleeping 100 ms after each offer.
const producer = (queue: Queue.Queue<number>) =>
  Effect.gen(function* () {
    let next = 1
    while (next <= 10) {
      yield* Queue.offer(queue, next)
      yield* Effect.sleep("100 millis")
      next++
    }
  })
// Takes values from the queue in an endless loop and logs each one.
const consumer = (queue: Queue.Queue<number>) =>
  Effect.gen(function* () {
    for (;;) {
      const item = yield* Queue.take(queue)
      yield* Effect.log(`Consumed: ${item}`)
    }
  })
// Runs the producer and the consumer in separate fibers over one shared queue.
const program = Effect.gen(function* () {
  const sharedQueue = yield* Queue.bounded<number>(5)
  const producerFiber = yield* Effect.fork(producer(sharedQueue))
  const consumerFiber = yield* Effect.fork(consumer(sharedQueue))
  // Wait for the producer to finish, then shut the queue down.
  yield* Fiber.join(producerFiber)
  yield* Queue.shutdown(sharedQueue)
})

Effect.runPromise(program)
Queue Inspection
size
import { Effect, Queue } from "effect"
// Logs the number of elements held by the queue after three offers.
const program = Effect.gen(function* () {
  const numbers = yield* Queue.bounded<number>(10)
  yield* Queue.offerAll(numbers, [1, 2, 3])
  const count = yield* Queue.size(numbers)
  console.log(count) // 3
})
isEmpty / isFull
import { Effect, Queue } from "effect"
// Checks emptiness on a fresh queue, then fullness once it holds 2 of 2 elements.
const program = Effect.gen(function* () {
  const pair = yield* Queue.bounded<number>(2)

  const emptyAtStart = yield* Queue.isEmpty(pair)
  console.log(emptyAtStart) // true

  yield* Queue.offerAll(pair, [1, 2])
  const fullAfterOffers = yield* Queue.isFull(pair)
  console.log(fullAfterOffers) // true
})
Backpressure
Bounded queues provide automatic backpressure.
import { Effect, Queue, Fiber } from "effect"
// Demonstrates backpressure: an offer to a full bounded queue suspends until
// a take makes space. `Fiber` is imported because the example joins the
// forked offer with `Fiber.join`.
const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number>(2)
  // Fill the queue to its capacity of 2.
  yield* Queue.offer(queue, 1)
  yield* Queue.offer(queue, 2)
  // This will suspend until space is available, so it runs in its own fiber
  const fiber = yield* Effect.fork(Queue.offer(queue, 3))
  yield* Effect.sleep("1 second")
  yield* Queue.take(queue) // Makes space
  // Wait for the suspended offer to complete.
  yield* Fiber.join(fiber)
})
Shutdown
Shuts down a queue and interrupts waiting fibers.
import { Effect, Queue, Fiber } from "effect"
// Shuts down a queue while a fiber is suspended on `take`, then logs how that fiber ended.
const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number>(10)
  // Nothing is ever offered, so this take stays suspended until the shutdown below
  const taker = yield* Effect.fork(Queue.take(queue))
  yield* Effect.sleep("1 second")
  yield* Queue.shutdown(queue)
  const outcome = yield* Fiber.await(taker)
  console.log(outcome) // Interrupted
})
Queue Strategies
Sliding Queue
Drops oldest elements when full.
import { Effect, Queue } from "effect"
// Offers five values to a sliding queue of capacity 3: the oldest two are dropped.
const program = Effect.gen(function* () {
  const recent = yield* Queue.sliding<number>(3)
  yield* Queue.offerAll(recent, [1, 2, 3, 4, 5])
  const kept = yield* Queue.takeAll(recent)
  console.log(kept)
  // { _id: 'Chunk', values: [ 3, 4, 5 ] }
})
Dropping Queue
Drops newest elements when full.
import { Effect, Queue } from "effect"
// Offers five values to a dropping queue of capacity 3: the newest two are dropped.
const program = Effect.gen(function* () {
  const earliest = yield* Queue.dropping<number>(3)
  yield* Queue.offerAll(earliest, [1, 2, 3, 4, 5])
  const kept = yield* Queue.takeAll(earliest)
  console.log(kept)
  // { _id: 'Chunk', values: [ 1, 2, 3 ] }
})
Queues provide safe concurrent access and automatic synchronization between producers and consumers across multiple fibers.
Use Cases
- Producer-consumer patterns
- Work queues
- Rate limiting
- Buffering
- Event processing pipelines
Key Operations
| Operation | Description |
|---|---|
| bounded | Creates bounded queue |
| unbounded | Creates unbounded queue |
| sliding | Creates sliding queue |
| dropping | Creates dropping queue |
| offer | Adds single value |
| offerAll | Adds multiple values |
| take | Takes single value |
| takeAll | Takes all values |
| takeUpTo | Takes up to N values |
| shutdown | Closes the queue |