DBOS queues allow you to enqueue workflows for later execution with precise control over concurrency and rate limiting. Queued workflows are durably persisted and automatically dequeued by worker processes.
from dbos import SetWorkflowID

# Pin the workflow ID so repeated enqueues of the same logical task are idempotent.
workflow_id = f"email-{user_id}"

with SetWorkflowID(workflow_id):
    handle = email_queue.enqueue(
        send_email, user_email, "Welcome!", "Thanks for signing up"
    )

# Enqueueing again under the identical ID does not start a second workflow:
# DBOS recognizes the ID and returns a handle to the existing workflow,
# whose result is served from durable storage.
with SetWorkflowID(workflow_id):
    handle = email_queue.enqueue(
        send_email, user_email, "Welcome!", "Thanks for signing up"
    )
import asyncio


async def enqueue_multiple_emails():
    """Fan out one email workflow per user, then gather every result."""
    # Each await yields a workflow handle; the async comprehension enqueues
    # one workflow per user in order.
    handles = [
        await email_queue.enqueue_async(
            send_email, user.email, "Newsletter", "Check out our latest updates"
        )
        for user in users
    ]
    # Block until every queued workflow finishes and collect the outputs.
    return [await h.get_result() for h in handles]


results = asyncio.run(enqueue_multiple_emails())
Limit the number of workflows executing simultaneously:
# Cap this queue at three simultaneously running workflows.
api_queue = Queue("external-api", concurrency=3)


@DBOS.workflow()
def call_external_api(endpoint: str, data: dict) -> dict:
    """POST `data` to `endpoint` on the external service and return the JSON body."""
    response = external_api.post(endpoint, json=data)
    return response.json()


# Submit ten workflows up front. The queue admits only three at a time;
# the remaining seven wait, durably persisted, until a slot frees up.
handles = [
    api_queue.enqueue(call_external_api, "/api/process", {"id": i})
    for i in range(10)
]
The concurrency limit applies globally across all workers listening to the queue.
# Combine a concurrency cap with a rate limiter: no more than 2 running
# at once, and no more than 5 *started* in any 10-second window.
throttled_queue = Queue(
    "rate-limited-api",
    concurrency=2,
    limiter={"limit": 5, "period": 10},
)


@DBOS.workflow()
def rate_limited_operation(data: str) -> str:
    """Process one item; the queue enforces the rate limit, not the workflow."""
    return process_data(data)


# A burst of 100 enqueues is safe: the queue meters them out,
# dispatching at most 5 per 10-second period.
for i in range(100):
    throttled_queue.enqueue(rate_limited_operation, f"data-{i}")
from dbos import SetEnqueueOptions

# Enable priority ordering for this queue.
priority_queue = Queue("priority-tasks", priority_enabled=True)


@DBOS.workflow()
def process_task(task_id: str) -> str:
    """Trivial workflow that records which task it handled."""
    return f"Processed {task_id}"


# Enqueue with different priorities. In DBOS a LOWER number means a HIGHER
# priority: priority-1 work is dequeued before priority-100 work.
with SetEnqueueOptions(priority=1):
    high_priority = priority_queue.enqueue(process_task, "urgent-task")
with SetEnqueueOptions(priority=50):
    medium_priority = priority_queue.enqueue(process_task, "normal-task")
with SetEnqueueOptions(priority=100):
    low_priority = priority_queue.enqueue(process_task, "background-task")
# The urgent task (priority=1) is dequeued first.
Priority must be between 1 and 2,147,483,647, where a lower number indicates a higher priority — lower-valued workflows are dequeued first. Attempting to use priority on a non-priority-enabled queue will raise an exception.
Partition queues for ordered processing within partitions:
from dbos import SetEnqueueOptions

# A partitioned queue: ordering is guaranteed per partition key.
partitioned_queue = Queue("user-updates", partition_queue=True)


@DBOS.workflow()
def update_user_data(user_id: str, updates: dict) -> None:
    """Apply a batch of field updates to one user's record."""
    apply_updates(user_id, updates)


# Workflows sharing a partition key run one after another, in enqueue order.
with SetEnqueueOptions(queue_partition_key="user-123"):
    partitioned_queue.enqueue(update_user_data, "user-123", {"name": "Alice"})
with SetEnqueueOptions(queue_partition_key="user-123"):
    partitioned_queue.enqueue(update_user_data, "user-123", {"email": "[email protected]"})

# A different partition key is independent: Bob's update may run
# concurrently with Alice's.
with SetEnqueueOptions(queue_partition_key="user-456"):
    partitioned_queue.enqueue(update_user_data, "user-456", {"name": "Bob"})
Partitioned queues ensure workflows within the same partition execute sequentially, while different partitions can run concurrently.
Prevent duplicate workflows using deduplication IDs:
from dbos import SetEnqueueOptions
from dbos.error import DBOSQueueDeduplicatedError

queue = Queue("deduplicated-tasks")


@DBOS.workflow()
def process_payment(transaction_id: str, amount: float) -> dict:
    """Settle one payment and report its final status."""
    return {"transaction_id": transaction_id, "status": "completed"}


# The first enqueue under a deduplication ID is accepted normally.
with SetEnqueueOptions(deduplication_id="txn-12345"):
    handle1 = queue.enqueue(process_payment, "txn-12345", 99.99)

# A second enqueue under the same deduplication ID (while the first is still
# pending) is rejected with an exception rather than creating a duplicate.
try:
    with SetEnqueueOptions(deduplication_id="txn-12345"):
        handle2 = queue.enqueue(process_payment, "txn-12345", 99.99)
except DBOSQueueDeduplicatedError as e:
    print(f"Workflow deduplicated: {e.workflow_id}")
    # Recover a handle to the original workflow instead.
    handle2 = DBOS.retrieve_workflow(e.workflow_id)
Enqueue child workflows from within a parent workflow:
queue = Queue("child-queue", concurrency=3)


@DBOS.workflow()
def child_workflow(item_id: str) -> str:
    """Wait (up to 30s) for a release message from the parent, then finish."""
    DBOS.recv("release", timeout_seconds=30)
    return f"Processed {item_id}"


@DBOS.workflow()
def parent_workflow(item_ids: list[str]) -> list[str]:
    """Enqueue one child per item, release them all, and gather their results."""
    # Fan out: one queued child workflow per item.
    handles = [queue.enqueue(child_workflow, item_id) for item_id in item_ids]

    # Give the queue a moment, then sample a few statuses. With concurrency=3,
    # children beyond the first three may still report "ENQUEUED".
    DBOS.sleep(1)
    for h in handles[:3]:
        status = h.get_status()
        print(f"Status: {status.status}")

    # Unblock every child by sending on its "release" topic.
    for h in handles:
        DBOS.send(h.get_workflow_id(), "go", "release")

    # Fan in: wait for all children and return their outputs in order.
    return [h.get_result() for h in handles]
Customize how often the queue checks for new work:
# Lower the polling interval (default 1.0s) to pick up new work sooner.
fast_queue = Queue(
    "fast-polling",
    polling_interval_sec=0.5,
)

# Raise it when latency does not matter, trading responsiveness for
# fewer database round trips.
slow_queue = Queue(
    "slow-polling",
    polling_interval_sec=5.0,
)
Lower polling intervals reduce latency but increase database load. Adjust based on your workload characteristics.