Distributed, FIFO queue for data flow in Modal apps. The queue can contain any object serializable by cloudpickle, including Modal objects.

Lifetime

By default, each partition is cleared 24 hours after the last put operation. A lower TTL can be set with the partition_ttl argument of put or put_many.

Limits

A single Queue can contain up to 100,000 partitions, each with up to 5,000 items. Each item can be up to 1 MiB.
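
A producer can estimate whether an item fits the 1 MiB limit before calling put. The sketch below is only an approximation: it measures the size with the standard-library pickle module as a stand-in for cloudpickle, and the helper name fits_item_limit is hypothetical, not part of Modal.

```python
import pickle

MAX_ITEM_BYTES = 1024 * 1024  # 1 MiB per-item limit stated above


def fits_item_limit(obj: object, limit: int = MAX_ITEM_BYTES) -> bool:
    """Return True if the pickled object is no larger than `limit` bytes.

    Assumption: pickle's output size approximates what cloudpickle would
    produce. This is a client-side estimate, not the server's own check.
    """
    return len(pickle.dumps(obj)) <= limit
```

Objects that only cloudpickle can serialize (for example, lambdas) would need cloudpickle.dumps in place of pickle.dumps.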

Usage

from modal import Queue

# Create an ephemeral queue
with Queue.ephemeral() as my_queue:
    # Putting values
    my_queue.put("some value")
    my_queue.put(123)

    # Getting values
    assert my_queue.get() == "some value"
    assert my_queue.get() == 123

    # Using partitions
    my_queue.put(0)
    my_queue.put(1, partition="foo")
    my_queue.put(2, partition="bar")

    # Get from specific partition
    assert my_queue.get(partition="bar") == 2

# Persistent queues
queue = Queue.from_name("my-persisted-queue", create_if_missing=True)
queue.put(42)
assert queue.get() == 42

Factory methods

Queue.from_name

modal.Queue.from_name(
    name: str,
    *,
    environment_name: Optional[str] = None,
    create_if_missing: bool = False,
    client: Optional[Client] = None,
) -> Queue
Reference a named Queue, creating it if create_if_missing is True and it does not exist yet.
name
str
required
Name of the Queue.
environment_name
str
Environment to look up the Queue in.
create_if_missing
bool
default: False
Create the Queue if it doesn’t exist.
Example:
q = modal.Queue.from_name("my-queue", create_if_missing=True)
q.put(123)

Queue.ephemeral

modal.Queue.ephemeral(
    client: Optional[Client] = None,
    environment_name: Optional[str] = None,
) -> ContextManager[Queue]
Creates a new ephemeral queue within a context manager. Example:
with modal.Queue.ephemeral() as q:
    q.put(123)

Properties

queue.name

queue.name -> Optional[str]
Name of the Queue, if it has one.

Methods

queue.len

queue.len(
    partition: Optional[str] = None,
) -> int
Get the length of a queue partition.
partition
str
Partition to check. Defaults to the default partition.
length
int
Number of items in the partition.

queue.put

queue.put(
    value: Any,
    *,
    partition: Optional[str] = None,
    partition_ttl: Optional[int] = None,
)
Put a value onto the queue.
value
Any
required
Value to put on the queue.
partition
str
Partition to put the value in. Defaults to the default partition.
partition_ttl
int
TTL in seconds for the partition. Defaults to 24 hours.
Example:
queue.put(42)
queue.put("data", partition="my-partition")
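
The partition and partition_ttl arguments can be combined to give each unit of work its own short-lived partition. A minimal sketch, assuming one partition per job: the helper name put_job_result, the "job-" naming scheme, and the one-hour TTL are illustrative choices, not Modal conventions.

```python
from typing import Any


def put_job_result(q: Any, job_id: str, value: Any, ttl_seconds: int = 3600) -> None:
    """Put `value` in a partition named after `job_id`.

    `q` is expected to be a modal.Queue (or anything with the same put
    signature). The partition is cleared `ttl_seconds` after the last put.
    """
    q.put(value, partition=f"job-{job_id}", partition_ttl=ttl_seconds)
```

A consumer would then read with queue.get(partition="job-<id>") using the same naming scheme.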

queue.put_many

queue.put_many(
    values: Iterable[Any],
    *,
    partition: Optional[str] = None,
    partition_ttl: Optional[int] = None,
)
Put multiple values onto the queue.
values
Iterable[Any]
required
Values to put on the queue.
Example:
queue.put_many([1, 2, 3, 4, 5])
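
For a large or lazily generated iterable, put_many can be called once per fixed-size batch so that each call stays bounded. The sketch below is one possible approach; batched and put_in_batches are hypothetical helpers, and the default batch size of 1000 is arbitrary.

```python
from itertools import islice
from typing import Any, Iterable, Iterator, List


def batched(values: Iterable[Any], size: int) -> Iterator[List[Any]]:
    """Yield consecutive lists of at most `size` items."""
    if size < 1:
        raise ValueError("size must be at least 1")
    it = iter(values)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch


def put_in_batches(q: Any, values: Iterable[Any], size: int = 1000, **put_kwargs: Any) -> int:
    """Call q.put_many once per batch and return the number of items sent.

    Extra keyword arguments (partition, partition_ttl) are passed through.
    """
    sent = 0
    for batch in batched(values, size):
        q.put_many(batch, **put_kwargs)
        sent += len(batch)
    return sent
```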

queue.get

queue.get(
    *,
    partition: Optional[str] = None,
    timeout: Optional[float] = None,
    block: bool = True,
) -> Any
Get a value from the queue.
partition
str
Partition to get from. Defaults to the default partition.
timeout
float
Maximum time to wait, in seconds, when blocking. The standard library's queue.Empty is raised if the timeout is reached.
block
bool
default: True
If True, block until an item is available. If False, return None immediately if the queue is empty.
value
Any
The retrieved value.
Example:
value = queue.get()
value = queue.get(partition="my-partition", timeout=5.0)
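
A caller that passes a timeout usually needs a fallback for the case where nothing arrives. A minimal sketch, assuming that a timed-out blocking get raises the standard library's queue.Empty; get_or_default is a hypothetical helper. The standard-library module is imported under an alias because the examples on this page use queue as a variable name.

```python
import queue as stdlib_queue  # aliased so it does not clash with a variable named `queue`
from typing import Any


def get_or_default(q: Any, default: Any = None, timeout: float = 5.0, **get_kwargs: Any) -> Any:
    """Wait up to `timeout` seconds for an item; return `default` on timeout.

    Assumption: a timed-out blocking get raises the standard-library
    queue.Empty exception. Extra keyword arguments such as `partition`
    are passed through to q.get.
    """
    try:
        return q.get(timeout=timeout, **get_kwargs)
    except stdlib_queue.Empty:
        return default
```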

queue.get_many

queue.get_many(
    n: int,
    *,
    partition: Optional[str] = None,
    timeout: Optional[float] = None,
) -> list[Any]
Get multiple values from the queue.
n
int
required
Maximum number of values to get.
values
list[Any]
List of retrieved values (may contain fewer than n).
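
Because get_many may return fewer than n values, emptying a partition takes a loop. The sketch below checks len before each call so that it does not block on an empty partition; drain is a hypothetical helper, and it assumes a single consumer.

```python
from typing import Any, List, Optional


def drain(q: Any, batch_size: int = 100, partition: Optional[str] = None) -> List[Any]:
    """Remove and return the items currently in one partition.

    q.len is checked before each get_many call so the loop does not block
    on an empty partition. With concurrent consumers the length can change
    between the two calls, so treat this as a single-consumer sketch.
    """
    items: List[Any] = []
    while q.len(partition=partition) > 0:
        items.extend(q.get_many(batch_size, partition=partition))
    return items
```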

queue.iterate

queue.iterate(
    *,
    partition: Optional[str] = None,
) -> Iterator[Any]
Iterate through items in the queue without removing them.
partition
str
Partition to iterate. Defaults to the default partition.
Example:
queue.put(1)
values = [v for v in queue.iterate()]
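
Since iterate leaves items in place, it can be used to inspect a partition before deciding whether to consume it. A minimal sketch; count_matching is a hypothetical helper, not a Modal method.

```python
from typing import Any, Callable, Optional


def count_matching(
    q: Any,
    predicate: Callable[[Any], bool],
    partition: Optional[str] = None,
) -> int:
    """Count queued items that satisfy `predicate` without removing them."""
    return sum(1 for item in q.iterate(partition=partition) if predicate(item))
```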

queue.clear

queue.clear(
    *,
    partition: Optional[str] = None,
)
Clear all items from a partition.
partition
str
Partition to clear. Defaults to the default partition.

Manager methods

Queue.objects.create

modal.Queue.objects.create(
    name: str,
    *,
    allow_existing: bool = False,
    environment_name: Optional[str] = None,
    client: Optional[Client] = None,
) -> None
Create a new Queue object.

Queue.objects.list

modal.Queue.objects.list(
    *,
    max_objects: Optional[int] = None,
    created_before: Optional[Union[datetime, str]] = None,
    environment_name: str = "",
    client: Optional[Client] = None,
) -> list[Queue]
Return a list of hydrated Queue objects.

Queue.objects.delete

modal.Queue.objects.delete(
    name: str,
    *,
    allow_missing: bool = False,
    environment_name: Optional[str] = None,
    client: Optional[Client] = None,
)
Delete a named Queue.
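
The list and delete manager methods can be combined for cleanup. The sketch below takes the manager as an argument (in practice modal.Queue.objects) and defaults to a dry run; delete_by_prefix is a hypothetical helper, and it relies only on the signatures shown above.

```python
from typing import Any, List


def delete_by_prefix(manager: Any, prefix: str, dry_run: bool = True) -> List[str]:
    """Return the names of Queues starting with `prefix`, deleting them unless `dry_run`.

    `manager` is expected to expose list() and delete(name, allow_missing=...)
    as described above. Unnamed Queues (name is None) are skipped.
    """
    names = [q.name for q in manager.list() if q.name and q.name.startswith(prefix)]
    if not dry_run:
        for name in names:
            # allow_missing=True tolerates a Queue removed since list() ran.
            manager.delete(name, allow_missing=True)
    return names
```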

Async methods

All methods have async equivalents, accessed with the .aio suffix. For example:
  • queue.put.aio(value, partition=None, partition_ttl=None)
  • queue.get.aio(partition=None, timeout=None, block=True)
  • queue.len.aio(partition=None)
  • queue.clear.aio(partition=None)
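
Inside a coroutine, the .aio variants are awaited like any other awaitable. A minimal sketch using only put.aio and get.aio as listed above; put_then_get_all is a hypothetical helper.

```python
import asyncio
from typing import Any, List


async def put_then_get_all(q: Any, values: List[Any]) -> List[Any]:
    """Put each value with put.aio, then read the same number back with get.aio."""
    for value in values:
        await q.put.aio(value)
    return [await q.get.aio() for _ in values]


# Usage (needs a real Modal queue `q`):
#     asyncio.run(put_then_get_all(q, [1, 2, 3]))
```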
