The Queue class provides durable workflow queuing with concurrency control, rate limiting, priority support, and partitioning capabilities.
Overview
Queues allow you to:
- Control how many workflows execute concurrently
- Rate limit workflow execution
- Prioritize workflows
- Partition workflows by key for ordered processing
- Defer workflow execution
Workflows enqueued on a queue are persisted to the database and survive application restarts.
Constructor
from dbos import Queue
queue = Queue(
name,
concurrency=None,
limiter=None,
worker_concurrency=None,
priority_enabled=False,
partition_queue=False,
polling_interval_sec=1.0
)
Name of the queue. Must be unique across the application.
Maximum number of workflows that can execute concurrently from this queue. If None, no limit is enforced.
limiter
QueueRateLimit
default: None
Rate limit configuration. Format: {"limit": int, "period": float}. Limits the number of workflows started per time period (in seconds).
Maximum concurrent workflows per worker process. Must be ≤ concurrency if both are set.
Enable priority-based dequeuing. Workflows with a lower priority number are dequeued first (1 is the highest priority).
Enable queue partitioning. Workflows with the same partition key execute in order.
How often (in seconds) to check for new workflows to dequeue.
Example: Basic Queue
from dbos import DBOS, Queue
# Create a queue with max 5 concurrent workflows
processing_queue = Queue("order_processing", concurrency=5)
@DBOS.workflow()
def process_order(order_id: str) -> str:
DBOS.logger.info(f"Processing order {order_id}")
charge_customer(order_id)
ship_order(order_id)
return f"Completed {order_id}"
@DBOS.transaction()
def charge_customer(order_id: str) -> None:
# Database operations
pass
@DBOS.step()
def ship_order(order_id: str) -> None:
# External API call
pass
# Enqueue workflows
handle1 = processing_queue.enqueue(process_order, "order-123")
handle2 = processing_queue.enqueue(process_order, "order-456")
# Wait for results
result = handle1.get_result()
print(result) # "Completed order-123"
Methods
enqueue
handle = queue.enqueue(func, *args, **kwargs)
Enqueue a workflow for later execution.
A workflow function decorated with @DBOS.workflow()
Positional arguments to pass to the workflow
Keyword arguments to pass to the workflow
A handle to track the enqueued workflow
Example:
@DBOS.workflow()
def send_email(to: str, subject: str, body: str) -> bool:
# Send email logic
return True
email_queue = Queue("emails", concurrency=10)
handle = email_queue.enqueue(
send_email,
"[email protected]",
"Welcome!",
"Thanks for signing up"
)
result = handle.get_result()
enqueue_async
handle = await queue.enqueue_async(func, *args, **kwargs)
Async version of enqueue(). Must be called from an async context.
An async handle to track the enqueued workflow
Rate Limiting
Control how many workflows can start per time period:
from dbos import Queue
# Allow max 100 workflows per 60 seconds
api_queue = Queue(
"api_calls",
limiter={"limit": 100, "period": 60.0}
)
@DBOS.workflow()
def call_external_api(endpoint: str) -> dict:
# API call logic
pass
# These will be rate-limited
for i in range(200):
api_queue.enqueue(call_external_api, f"/endpoint/{i}")
Priority Queues
Process higher-priority workflows first:
from dbos import Queue, SetEnqueueOptions
# Enable priority on the queue
priority_queue = Queue("tasks", priority_enabled=True)
@DBOS.workflow()
def important_task(data: str) -> None:
# Task logic
pass
# Enqueue with different priorities (lower number = higher priority; 1 is the highest)
with SetEnqueueOptions(priority=1):
high_priority = priority_queue.enqueue(important_task, "urgent")
with SetEnqueueOptions(priority=10):
low_priority = priority_queue.enqueue(important_task, "normal")
# high_priority will execute before low_priority
Partitioned Queues
Ensure workflows with the same partition key execute in order:
from dbos import Queue, SetEnqueueOptions
# Enable partitioning
user_queue = Queue("user_operations", partition_queue=True)
@DBOS.workflow()
def update_user_data(user_id: str, data: dict) -> None:
# Update user logic
pass
# All workflows for the same user execute in order
for i in range(10):
with SetEnqueueOptions(queue_partition_key="user-123"):
user_queue.enqueue(update_user_data, "user-123", {"update": i})
with SetEnqueueOptions(queue_partition_key="user-456"):
user_queue.enqueue(update_user_data, "user-456", {"update": i})
# user-123 updates execute in order
# user-456 updates execute in order
# But user-123 and user-456 can execute in parallel
Deduplication
Prevent duplicate workflow executions:
from dbos import Queue, SetEnqueueOptions
import uuid
queue = Queue("processing")
@DBOS.workflow()
def process_payment(payment_id: str) -> None:
# Process payment
pass
payment_id = "payment-123"
dedup_key = f"payment-{payment_id}"
# First enqueue succeeds
with SetEnqueueOptions(deduplication_id=dedup_key):
handle1 = queue.enqueue(process_payment, payment_id)
# While the first workflow is still enqueued or executing, a second
# enqueue with the same deduplication ID raises DBOSQueueDuplicatedError
try:
with SetEnqueueOptions(deduplication_id=dedup_key):
handle2 = queue.enqueue(process_payment, payment_id)
except Exception as e:
print(f"Duplicate enqueue rejected: {e}")
Combined Example
from dbos import DBOS, Queue, SetEnqueueOptions
# Create a queue with multiple features
order_queue = Queue(
name="order_processing",
concurrency=10, # Max 10 concurrent workflows
limiter={"limit": 100, "period": 60}, # Max 100/minute
priority_enabled=True, # Support priorities
partition_queue=True, # Support partitioning
)
@DBOS.workflow()
def process_order(order_id: str, customer_id: str) -> str:
# Process the order
return f"Processed {order_id}"
# Enqueue a high-priority order for a specific customer
order_id = "order-789"
customer_id = "customer-123"
with SetEnqueueOptions(
priority=1, # 1 is the highest priority
queue_partition_key=f"customer-{customer_id}",
deduplication_id=f"order-{order_id}"
):
handle = order_queue.enqueue(
process_order,
order_id=order_id,
customer_id=customer_id
)
result = handle.get_result()
Listening to Specific Queues
By default, DBOS processes workflows from all declared queues. You can configure a DBOS instance to only listen to specific queues:
from dbos import DBOS, Queue
queue1 = Queue("high_priority")
queue2 = Queue("low_priority")
queue3 = Queue("batch_jobs")
# Only process workflows from high_priority and batch_jobs
DBOS.listen_queues([queue1, queue3])
# Must be called before DBOS.launch()
DBOS.launch()
QueueRateLimit Type
from typing import TypedDict
class QueueRateLimit(TypedDict):
limit: int # Maximum number of workflows
period: float # Time period in seconds