Queue is a distributed, FIFO (first-in, first-out) queue for data flow in Modal apps. The queue can contain any object serializable by cloudpickle, including Modal objects.

Key concepts

  • FIFO ordering: By default, the Queue acts as a single FIFO queue supporting puts and gets (blocking and non-blocking).
  • Partitions: You can specify partition keys to access independent FIFO queues within the same Queue object. Across any two partitions, puts and gets are completely independent.
  • Lifetime: By default, each partition is cleared 24 hours after the last put operation. A lower TTL can be specified. On app completion or after stopping an app, all partitions are cleared.
  • Limits: A single Queue can contain up to 100,000 partitions, each with up to 5,000 items. Each item can be up to 1 MiB. Partition keys must be non-empty and must not exceed 64 bytes.
Queues are best used for communication between active functions and should not be relied on for persistent storage.
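
The key and item limits above can be checked on the client before calling put. The following is a minimal sketch, not part of Modal's API: the helper names are hypothetical, and it uses the standard-library pickle as a stand-in for cloudpickle, so the measured size is only an estimate of what would actually be sent.

```python
import pickle

# Limits quoted from the Key concepts list above.
MAX_PARTITION_KEY_BYTES = 64
MAX_ITEM_BYTES = 1024 * 1024  # 1 MiB


def check_partition_key(key: str) -> str:
    """Return key unchanged if it is non-empty and at most 64 bytes in UTF-8."""
    size = len(key.encode("utf-8"))
    if size == 0:
        raise ValueError("partition key must be non-empty")
    if size > MAX_PARTITION_KEY_BYTES:
        raise ValueError(
            f"partition key is {size} bytes; the limit is {MAX_PARTITION_KEY_BYTES}"
        )
    return key


def estimated_item_size(item: object) -> int:
    """Estimate the serialized size (assumption: pickle approximates cloudpickle)."""
    return len(pickle.dumps(item))


def fits_in_queue(item: object) -> bool:
    """Return True if the estimated serialized size is within the 1 MiB limit."""
    return estimated_item_size(item) <= MAX_ITEM_BYTES
```

Note that the key limit is measured in bytes, not characters, so a key containing non-ASCII text can exceed it with fewer than 64 characters.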

Basic usage

from modal import Queue

# Create an ephemeral queue
with Queue.ephemeral() as my_queue:
    # Putting values
    my_queue.put("some value")
    my_queue.put(123)

    # Getting values
    assert my_queue.get() == "some value"
    assert my_queue.get() == 123

# Create a persistent queue
queue = Queue.from_name("my-persisted-queue", create_if_missing=True)
queue.put(42)
assert queue.get() == 42

Creating queues

Reference by name

Create or reference a named queue that persists:
import modal

q = modal.Queue.from_name("my-queue", create_if_missing=True)
q.put(123)

Ephemeral queues

Create a temporary queue that exists only within a context manager:
from modal import Queue

with Queue.ephemeral() as q:
    q.put(123)

# Async variant; run this inside an async function
async with Queue.ephemeral() as q:
    await q.put.aio(123)

Create from ID

Reference a queue by its object ID:
import modal

app = modal.App()

@app.function()
def my_consumer(queue_id: str):
    queue = modal.Queue.from_id(queue_id)
    queue.put("Hello from remote function!")

with modal.Queue.ephemeral() as q:
    # Pass the queue ID to a remote function
    my_consumer.remote(q.object_id)
    print(q.get())  # "Hello from remote function!"

Queue operations

Put items

Add items to the queue:
q = Queue.from_name("my-queue", create_if_missing=True)

# Put single item
q.put("item")

# Put multiple items
q.put_many([1, 2, 3, 4, 5])

Blocking behavior

By default, put blocks if the queue is full:
import queue  # standard library; provides queue.Full and queue.Empty

# Block indefinitely until space is available
q.put("item", block=True)

# Block for up to 30 seconds
q.put("item", block=True, timeout=30)

# Don't block, raise queue.Full immediately if full
try:
    q.put("item", block=False)
except queue.Full:
    print("Queue is full")
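
When a full queue is expected to drain soon, a non-blocking put can be retried with a growing delay instead of failing on the first attempt. This is a minimal sketch under stated assumptions: put_with_retry and its parameters are hypothetical names, and q is any object whose put(item, block=False) raises queue.Full as described above.

```python
import queue
import time


def put_with_retry(q, item, attempts=5, base_delay=0.1, sleep=time.sleep):
    """Try a non-blocking put up to `attempts` times with exponential backoff.

    Returns True once the put succeeds, or False if every attempt
    found the queue full.
    """
    for attempt in range(attempts):
        try:
            q.put(item, block=False)
            return True
        except queue.Full:
            # Back off before the next attempt; the delay doubles each time.
            if attempt < attempts - 1:
                sleep(base_delay * 2**attempt)
    return False
```

The sleep parameter exists only so the delay can be replaced in tests; in normal use the default time.sleep applies.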

Get items

Remove and return items from the queue:
# Get single item (blocks until available)
item = q.get()

# Get multiple items
items = q.get_many(10)  # Get up to 10 items

Blocking behavior

# Block indefinitely for an item
item = q.get(block=True)

# Block for up to 30 seconds
try:
    item = q.get(block=True, timeout=30)
except queue.Empty:
    print("Timeout reached, queue is empty")

# Non-blocking get, returns None if empty
item = q.get(block=False)
if item is None:
    print("Queue is empty")
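
The non-blocking form is useful for emptying a queue without waiting. The sketch below relies on get(block=False) returning None for an empty queue, as stated above; drain and max_items are hypothetical names, not part of the Queue API.

```python
def drain(q, partition=None, max_items=1000):
    """Collect items with non-blocking gets until the queue reports empty.

    A stored None cannot be told apart from "empty" here, so this sketch
    assumes producers never enqueue None.
    """
    items = []
    while len(items) < max_items:
        item = q.get(block=False, partition=partition)
        if item is None:
            break
        items.append(item)
    return items
```

The max_items cap keeps the loop bounded when producers are still adding items while the drain runs.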

Iterate over items

Iterate through items without removing them:
q.put(1)
q.put(2)
q.put(3)

# Read items immutably
assert [v for v in q.iterate()] == [1, 2, 3]

Control how long the iterator waits for a new item before giving up:
# Wait up to 10 seconds for the next item
for item in q.iterate(item_poll_timeout=10.0):
    process(item)

Check queue length

length = q.len()
print(f"Queue has {length} items")

Clear queue

q.clear()  # Clear the default partition

Partitions

Partitions provide independent FIFO queues within the same Queue object.

Basic partition usage

with Queue.ephemeral() as q:
    # Put to different partitions
    q.put(0)  # Default partition
    q.put(1, partition="foo")
    q.put(2, partition="bar")

    # Get from specific partition (ignores other partitions)
    assert q.get(partition="bar") == 2
    assert q.get(partition="foo") == 1
    assert q.get() == 0  # Default partition
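
Partition keys are often derived from application data such as a user or tenant identifier, which may be longer than the 64-byte key limit. One possible approach, sketched here with a hypothetical helper name, is to pass short names through unchanged and hash longer ones.

```python
import hashlib

MAX_PARTITION_KEY_BYTES = 64  # limit stated in the Key concepts section


def partition_key_for(name: str) -> str:
    """Map an arbitrary non-empty name to a valid partition key.

    Names of at most 64 UTF-8 bytes are used as-is. Longer names are
    replaced by their SHA-256 hex digest, which is exactly 64 ASCII bytes.
    """
    if not name:
        raise ValueError("name must be non-empty")
    encoded = name.encode("utf-8")
    if len(encoded) <= MAX_PARTITION_KEY_BYTES:
        return name
    return hashlib.sha256(encoded).hexdigest()
```

A call such as q.put(task, partition=partition_key_for(tenant_id)) then routes each tenant's work to its own FIFO partition; tenant_id is a placeholder for whatever identifier the application uses.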

Partition TTL

Set custom expiration time for partitions:
# Default partition expires after 24 hours
q.put("data")

# Custom partition expires after 1 hour (3600 seconds)
q.put("data", partition="temp", partition_ttl=3600)

# Short-lived partition (10 seconds)
q.put("data", partition="cache", partition_ttl=10)

Partition operations

# Get length of specific partition
length = q.len(partition="foo")

# Get total length across all partitions
total_length = q.len(total=True)

# Clear specific partition
q.clear(partition="foo")

# Clear all partitions
q.clear(all=True)

Producer-consumer pattern

Use queues to coordinate work between functions:
import queue  # standard library, for queue.Empty

import modal

app = modal.App()
# Named work_queue so it does not shadow the standard-library queue module
work_queue = modal.Queue.from_name("work-queue", create_if_missing=True)

@app.function()
def producer():
    for i in range(100):
        work_queue.put({"task_id": i, "data": f"item-{i}"})
    print("Produced 100 tasks")

@app.function()
def consumer(worker_id: int):
    while True:
        try:
            task = work_queue.get(block=True, timeout=5)
            # Process task
            print(f"Worker {worker_id} processed task {task['task_id']}")
        except queue.Empty:
            print("No more tasks, exiting")
            break

@app.local_entrypoint()
def main():
    producer.remote()
    # Run five consumers in parallel, one per worker ID
    list(consumer.map(range(5)))
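
The consumer above stops once a get times out, which also ends it early if the producer is merely slow. A common alternative is to have the producer send one stop sentinel per consumer. The sketch below shows that pattern locally with the standard-library queue.Queue and threads as a stand-in for a Modal Queue and remote functions; STOP, consume, and run are hypothetical names.

```python
import queue
import threading

STOP = "__stop__"  # hypothetical sentinel; any value producers never send works


def consume(q, results):
    """Process tasks until the STOP sentinel arrives."""
    while True:
        task = q.get()  # blocks until an item is available
        if task == STOP:
            break
        results.append(task["task_id"] * 2)  # stand-in for real work


def run(num_tasks=20, num_consumers=4):
    q = queue.Queue()  # local stand-in for a Modal Queue
    results = []
    workers = [
        threading.Thread(target=consume, args=(q, results))
        for _ in range(num_consumers)
    ]
    for worker in workers:
        worker.start()
    for i in range(num_tasks):
        q.put({"task_id": i})
    for _ in workers:
        q.put(STOP)  # FIFO order puts the sentinels after every task
    for worker in workers:
        worker.join()
    return sorted(results)
```

Because the queue is FIFO, every task is taken before any sentinel, and each consumer exits after taking exactly one sentinel.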

Async usage

Append .aio to a method name to call it from async code:
import modal

app = modal.App()
q = modal.Queue.from_name("async-queue", create_if_missing=True)

@app.function()
async def async_producer():
    await q.put.aio("item")
    await q.put_many.aio([1, 2, 3])

@app.function()
async def async_consumer():
    item = await q.get.aio()
    items = await q.get_many.aio(10)
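
Because the .aio calls are awaitable, several of them can run concurrently. The following sketch waits on multiple partitions at once with asyncio.gather; get_from_partitions is a hypothetical helper, and q is assumed to expose get.aio(partition=..., timeout=...) like the Queue above.

```python
import asyncio


async def get_from_partitions(q, partitions, timeout=5):
    """Fetch one item from each partition concurrently.

    Returns a dict mapping each partition key to the item received.
    If any single get raises (for example on timeout), gather
    propagates that exception to the caller.
    """
    partitions = list(partitions)
    items = await asyncio.gather(
        *(q.get.aio(partition=p, timeout=timeout) for p in partitions)
    )
    return dict(zip(partitions, items))
```

The gets overlap rather than run back to back, so the total wait is roughly that of the slowest partition.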

Managing queues

The Queue.objects namespace provides methods for managing named queues.

Create a queue

modal.Queue.objects.create("my-queue")
Create in a specific environment:
modal.Queue.objects.create("my-queue", environment_name="dev")
Skip the error if a queue with that name already exists:
modal.Queue.objects.create("my-queue", allow_existing=True)

List queues

List all queues in the active environment:
queues = modal.Queue.objects.list()
print([q.name for q in queues])
Limit results by count and creation date:
queues = modal.Queue.objects.list(max_objects=10, created_before="2025-01-01")

Delete a queue

modal.Queue.objects.delete("my-queue")
This deletes an entire Queue and all its partitions. Deletion is irreversible and will affect any Apps currently using the Queue.

API reference

Queue methods

put
method
Add an object to the end of the queue.
Parameters:
  • v (Any): Value to add
  • block (bool): If True, wait for space. Default: True
  • timeout (float, optional): Max seconds to wait if blocking
  • partition (str, optional): Partition key
  • partition_ttl (int): Seconds until partition expires. Default: 86400 (24 hours)
Raises: queue.Full if queue is full and can’t add item
put_many
method
Add several objects to the end of the queue.
Parameters:
  • vs (list[Any]): Values to add
  • block (bool): If True, wait for space. Default: True
  • timeout (float, optional): Max seconds to wait if blocking
  • partition (str, optional): Partition key
  • partition_ttl (int): Seconds until partition expires. Default: 86400 (24 hours)
Raises: queue.Full if queue is full and can’t add items
get
method
Remove and return the next object in the queue.
Parameters:
  • block (bool): If True, wait for an item. Default: True
  • timeout (float, optional): Max seconds to wait if blocking
  • partition (str, optional): Partition key
Returns: Any - The next item, or None if non-blocking and queue is empty
Raises: queue.Empty if timeout reached while blocking
get_many
method
Remove and return up to n_values objects from the queue.
Parameters:
  • n_values (int): Maximum number of items to get
  • block (bool): If True, wait for at least one item. Default: True
  • timeout (float, optional): Max seconds to wait if blocking
  • partition (str, optional): Partition key
Returns: list[Any] - Up to n_values items
Raises: queue.Empty if timeout reached while blocking
iterate
method
Iterate through items in the queue without mutation (read-only).
Parameters:
  • partition (str, optional): Partition key
  • item_poll_timeout (float): Seconds to wait for next item. Default: 0.0
Returns: AsyncGenerator[Any, None]
len
method
Return the number of objects in the queue.
Parameters:
  • partition (str, optional): Partition key
  • total (bool): If True, return total across all partitions. Default: False
Returns: int
clear
method
Clear the contents of a partition or all partitions.
Parameters:
  • partition (str, optional): Partition key to clear
  • all (bool): If True, clear all partitions. Default: False
info
method
Return information about the Queue object as a QueueInfo dataclass with name, created_at, and created_by fields.
Returns: QueueInfo

Error handling

Queue full

import queue

try:
    q.put("item", block=False)
except queue.Full:
    print("Queue is at capacity")

Queue empty

import queue

try:
    item = q.get(block=True, timeout=5)
except queue.Empty:
    print("No items available within timeout")

Request size errors

from modal.exception import RequestSizeError

try:
    q.put(very_large_object)
except RequestSizeError:
    print("Object exceeds 1 MiB limit")
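
One way to stay under the 1 MiB item limit is to split a large bytes payload into numbered chunks and reassemble them on the consumer side. This is a sketch under stated assumptions: split_payload and join_payload are hypothetical helpers, and the 512 KiB chunk size is an arbitrary choice that leaves headroom for serialization overhead.

```python
CHUNK_BYTES = 512 * 1024  # assumed chunk size, well below the 1 MiB item limit


def split_payload(payload: bytes, chunk_bytes: int = CHUNK_BYTES):
    """Split payload into (index, total, chunk) tuples of at most chunk_bytes each."""
    total = max(1, -(-len(payload) // chunk_bytes))  # ceiling division, at least 1
    return [
        (i, total, payload[i * chunk_bytes:(i + 1) * chunk_bytes])
        for i in range(total)
    ]


def join_payload(parts):
    """Reassemble the original bytes from tuples produced by split_payload."""
    ordered = sorted(parts, key=lambda part: part[0])
    if not ordered or len(ordered) != ordered[0][1]:
        raise ValueError("missing chunks")
    return b"".join(chunk for _, _, chunk in ordered)
```

Sending the chunks to a single partition, for example with put_many, keeps them in FIFO order, and the index in each tuple lets the consumer detect gaps. Each chunk still counts toward the per-partition item limit.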

Best practices

  1. Set timeouts when using blocking operations to avoid indefinite waits
  2. Use partitions to organize different types of work or data streams
  3. Configure partition TTL based on your data retention needs
  4. Handle Empty and Full exceptions appropriately in your application logic
  5. Use .aio methods in async contexts to avoid blocking the event loop
  6. Batch with put_many/get_many for better performance when handling multiple items
  7. Don’t rely on queues for long-term storage - they’re designed for active communication, not persistence
