Distributed, FIFO queue for data flow in Modal apps.
The queue can contain any object serializable by cloudpickle, including Modal objects.
Lifetime
By default, each partition is cleared 24 hours after the last put operation. A lower TTL can be specified by the partition_ttl argument.
Limits
A single Queue can contain up to 100,000 partitions, each with up to 5,000 items. Each item can be up to 1 MiB.
Usage
from modal import Queue
# Create an ephemeral queue
with Queue.ephemeral() as my_queue:
# Putting values
my_queue.put("some value")
my_queue.put(123)
# Getting values
assert my_queue.get() == "some value"
assert my_queue.get() == 123
# Using partitions
my_queue.put(0)
my_queue.put(1, partition="foo")
my_queue.put(2, partition="bar")
# Get from specific partition
assert my_queue.get(partition="bar") == 2
# Persistent queues
queue = Queue.from_name("my-persisted-queue", create_if_missing=True)
queue.put(42)
assert queue.get() == 42
Factory methods
Queue.from_name
modal.Queue.from_name(
name: str,
*,
environment_name: Optional[str] = None,
create_if_missing: bool = False,
client: Optional[Client] = None,
) -> Queue
Reference a named Queue, creating if necessary.
Environment to look up the Queue in.
Create the Queue if it doesn’t exist.
Example:
q = modal.Queue.from_name("my-queue", create_if_missing=True)
q.put(123)
Queue.ephemeral
modal.Queue.ephemeral(
client: Optional[Client] = None,
environment_name: Optional[str] = None,
) -> ContextManager[Queue]
Creates a new ephemeral queue within a context manager.
Example:
with modal.Queue.ephemeral() as q:
q.put(123)
Properties
queue.name
queue.name -> Optional[str]
Name of the Queue, if it has one.
queue.len
queue.len(
partition: Optional[str] = None,
) -> int
Get the length of a queue partition.
Partition to check. Defaults to default partition.
Number of items in the partition.
Methods
queue.put
queue.put(
value: Any,
*,
partition: Optional[str] = None,
partition_ttl: Optional[int] = None,
)
Put a value onto the queue.
Value to put on the queue.
Partition to put the value in. Defaults to default partition.
TTL in seconds for the partition. Defaults to 24 hours.
Example:
queue.put(42)
queue.put("data", partition="my-partition")
queue.put_many
queue.put_many(
values: Iterable[Any],
*,
partition: Optional[str] = None,
partition_ttl: Optional[int] = None,
)
Put multiple values onto the queue.
Values to put on the queue.
Example:
queue.put_many([1, 2, 3, 4, 5])
queue.get
queue.get(
*,
partition: Optional[str] = None,
timeout: Optional[float] = None,
block: bool = True,
) -> Any
Get a value from the queue.
Partition to get from. Defaults to default partition.
Maximum time to wait in seconds.
If True, block until item available. If False, raise queue.Empty if no item.
Example:
value = queue.get()
value = queue.get(partition="my-partition", timeout=5.0)
queue.get_many
queue.get_many(
n: int,
*,
partition: Optional[str] = None,
timeout: Optional[float] = None,
) -> list[Any]
Get multiple values from the queue.
Maximum number of values to get.
List of retrieved values (may be less than n).
queue.iterate
queue.iterate(
*,
partition: Optional[str] = None,
) -> Iterator[Any]
Iterate through items in the queue without removing them.
Partition to iterate. Defaults to default partition.
Example:
queue.put(1)
values = [v for v in queue.iterate()]
queue.clear
queue.clear(
*,
partition: Optional[str] = None,
)
Clear all items from a partition.
Partition to clear. Defaults to default partition.
Manager methods
Queue.objects.create
modal.Queue.objects.create(
name: str,
*,
allow_existing: bool = False,
environment_name: Optional[str] = None,
client: Optional[Client] = None,
) -> None
Create a new Queue object.
Queue.objects.list
modal.Queue.objects.list(
*,
max_objects: Optional[int] = None,
created_before: Optional[Union[datetime, str]] = None,
environment_name: str = "",
client: Optional[Client] = None,
) -> list[Queue]
Return a list of hydrated Queue objects.
Queue.objects.delete
modal.Queue.objects.delete(
name: str,
*,
allow_missing: bool = False,
environment_name: Optional[str] = None,
client: Optional[Client] = None,
)
Delete a named Queue.
Async methods
All methods have async equivalents with the .aio suffix:
queue.put.aio(value, partition=None, partition_ttl=None)
queue.get.aio(partition=None, timeout=None, block=True)
queue.len.aio(partition=None)
queue.clear.aio(partition=None)