Introduction

DBOS queues allow you to enqueue workflows for later execution with precise control over concurrency and rate limiting. Queued workflows are durably persisted and automatically dequeued by worker processes.

Creating a Queue

Define a queue with optional concurrency and rate limiting:
from dbos import Queue

# Basic queue - unlimited concurrency
basic_queue = Queue("email-notifications")

# Queue with concurrency limit - max 5 concurrent workflows
limited_queue = Queue("api-calls", concurrency=5)

# Queue with rate limiting - max 10 workflows per 60 seconds
rate_limited_queue = Queue(
    "third-party-api",
    concurrency=3,
    limiter={"limit": 10, "period": 60}
)

Each queue must have a unique name. Attempting to create a queue with a duplicate name will raise an exception.

Enqueuing Workflows

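Pass a workflow function and its arguments to the queue's enqueue method. It returns a handle you can use to wait for the result:
from dbos import DBOS, Queue

@DBOS.workflow()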
def send_email(recipient: str, subject: str, body: str) -> dict:
    # Send email logic
    return {"status": "sent", "recipient": recipient}

# Create queue
email_queue = Queue("emails", concurrency=10)

# Enqueue workflow
handle = email_queue.enqueue(
    send_email,
    "user@example.com",
    "Welcome!",
    "Thanks for signing up"
)

# Get result when ready (blocks until the workflow completes)
result = handle.get_result()
print(result)  # {"status": "sent", "recipient": "user@example.com"}

Concurrency Control

Limit the number of workflows executing simultaneously:
# At most 3 workflows from this queue can run at once
api_queue = Queue("external-api", concurrency=3)

@DBOS.workflow()
def call_external_api(endpoint: str, data: dict) -> dict:
    response = external_api.post(endpoint, json=data)
    return response.json()

# Enqueue 10 workflows
handles = []
for i in range(10):
    handle = api_queue.enqueue(
        call_external_api,
        "/api/process",
        {"id": i}
    )
    handles.append(handle)

# At most 3 run at a time; the rest wait in the queue

The concurrency limit applies globally across all workers listening to the queue.
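
To make the effect of a concurrency cap concrete, here is a minimal sketch that does not use DBOS at all. A `threading.Semaphore` stands in for the queue's limit, and the hypothetical helper `run_with_limit` records the highest number of tasks that were ever working at the same time. It only models the observable behavior; it is not how DBOS implements queues.

```python
import threading
import time


def run_with_limit(num_tasks: int, limit: int, work_sec: float = 0.01) -> int:
    """Run num_tasks threads, letting at most `limit` of them work at once.

    Returns the highest number of tasks observed working simultaneously.
    """
    slots = threading.Semaphore(limit)  # stands in for the queue's concurrency cap
    lock = threading.Lock()
    running = 0
    peak = 0

    def task() -> None:
        nonlocal running, peak
        with slots:  # wait until one of the `limit` slots is free
            with lock:
                running += 1
                peak = max(peak, running)
            time.sleep(work_sec)  # simulated work
            with lock:
                running -= 1

    threads = [threading.Thread(target=task) for _ in range(num_tasks)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return peak


peak = run_with_limit(num_tasks=10, limit=3)
print(f"peak concurrency: {peak}")  # never more than 3
```

All 10 tasks finish, but the recorded peak never exceeds the limit, which is the same guarantee the queue gives for enqueued workflows.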

Rate Limiting

Control the rate at which workflows are started:
# Max 5 workflows per 10 seconds
throttled_queue = Queue(
    "rate-limited-api",
    concurrency=2,
    limiter={"limit": 5, "period": 10}
)

@DBOS.workflow()
def rate_limited_operation(data: str) -> str:
    # This respects the rate limit
    return process_data(data)

# Even if we enqueue 100 workflows rapidly...
for i in range(100):
    throttled_queue.enqueue(rate_limited_operation, f"data-{i}")

# ...they will start at most 5 per 10 seconds
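
As a rough way to estimate how long a backlog takes to drain, the sketch below computes the earliest start time of each workflow under a limit of 5 starts per 10 seconds. It assumes a sliding-window limiter, assumes every workflow is enqueued at t=0, and ignores the queue's concurrency cap, so real start times may be later. `earliest_start_times` is a hypothetical helper, not part of DBOS.

```python
def earliest_start_times(num_workflows: int, limit: int, period: float) -> list[float]:
    """Earliest start time (in seconds) of each workflow under a sliding-window limit.

    Assumes all workflows are enqueued at t=0 and that a workflow may start
    only if fewer than `limit` workflows started within the preceding `period`.
    """
    starts: list[float] = []
    for i in range(num_workflows):
        if i < limit:
            starts.append(0.0)  # the first `limit` workflows start immediately
        else:
            # Wait until the start `limit` positions earlier leaves the window.
            starts.append(starts[i - limit] + period)
    return starts


starts = earliest_start_times(100, limit=5, period=10)
print(starts[:6])  # [0.0, 0.0, 0.0, 0.0, 0.0, 10.0]
print(starts[-1])  # 190.0
```

Under these assumptions, the last of the 100 workflows cannot start before the 190-second mark, no matter how quickly they were enqueued.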

Worker Concurrency

Limit concurrency per worker instance:
# Global concurrency: 10, per-worker concurrency: 3
queue = Queue(
    "distributed-work",
    concurrency=10,
    worker_concurrency=3
)

Use worker_concurrency in distributed deployments to cap how many workflows from the queue a single worker process runs at once. The global concurrency limit still applies across all workers.
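
The interaction of the two limits is simple arithmetic. The following sketch uses a hypothetical helper, `effective_concurrency`, and assumes every worker is healthy and has enough queued work to fill its slots.

```python
def effective_concurrency(concurrency: int, worker_concurrency: int, num_workers: int) -> int:
    """Upper bound on workflows from one queue running across all workers."""
    return min(concurrency, worker_concurrency * num_workers)


# Global concurrency 10, per-worker concurrency 3
for workers in (1, 2, 3, 4, 5):
    print(workers, effective_concurrency(10, 3, workers))
# 1 3
# 2 6
# 3 9
# 4 10
# 5 10
```

With these settings the per-worker limit is the bottleneck up to three workers; from four workers on, the global limit of 10 takes over.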

Priority Queues

Enable priority-based dequeuing:
from dbos import SetEnqueueOptions

# Enable priority for this queue
priority_queue = Queue("priority-tasks", priority_enabled=True)

@DBOS.workflow()
def process_task(task_id: str) -> str:
    return f"Processed {task_id}"

# Enqueue with different priorities (lower number = dequeued first)
with SetEnqueueOptions(priority=1):
    high_priority = priority_queue.enqueue(process_task, "urgent-task")

with SetEnqueueOptions(priority=50):
    medium_priority = priority_queue.enqueue(process_task, "normal-task")

with SetEnqueueOptions(priority=100):
    low_priority = priority_queue.enqueue(process_task, "background-task")

# The priority=1 task will be dequeued first

Priority must be between 1 and 2,147,483,647, where a lower number means a higher priority. Workflows with the same priority are dequeued in FIFO order. Attempting to use priority on a non-priority-enabled queue will raise an exception.
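
The DBOS reference documentation describes priority as ascending: a lower number is dequeued earlier, and equal priorities are served first-in, first-out. The sketch below models only that ordering rule with the standard library's `heapq`. `PriorityOrderSketch` is a hypothetical class for illustration and says nothing about how DBOS stores or dequeues workflows internally.

```python
import heapq
import itertools

MAX_PRIORITY = 2_147_483_647


class PriorityOrderSketch:
    """In-memory model of 'lowest number first, FIFO among equal priorities'."""

    def __init__(self) -> None:
        self._heap: list[tuple[int, int, str]] = []
        self._arrival = itertools.count()  # tie-breaker that preserves FIFO order

    def enqueue(self, task: str, priority: int) -> None:
        if not 1 <= priority <= MAX_PRIORITY:
            raise ValueError(f"priority must be between 1 and {MAX_PRIORITY}")
        heapq.heappush(self._heap, (priority, next(self._arrival), task))

    def dequeue(self) -> str:
        return heapq.heappop(self._heap)[2]

    def __len__(self) -> int:
        return len(self._heap)


sketch = PriorityOrderSketch()
sketch.enqueue("report", priority=10)
sketch.enqueue("alert", priority=1)
sketch.enqueue("cleanup", priority=10)

order = [sketch.dequeue() for _ in range(len(sketch))]
print(order)  # ['alert', 'report', 'cleanup']
```

The task with priority 1 jumps ahead of both priority-10 tasks, and the two priority-10 tasks keep their enqueue order.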

Partitioned Queues

Partition queues for ordered processing within partitions:
from dbos import SetEnqueueOptions

# Enable partitioning; concurrency limits then apply per partition,
# so concurrency=1 means one workflow at a time for each partition key
partitioned_queue = Queue("user-updates", partition_queue=True, concurrency=1)

@DBOS.workflow()
def update_user_data(user_id: str, updates: dict) -> None:
    # Process updates for a specific user
    apply_updates(user_id, updates)

# All workflows for the same user are processed one at a time, in order
with SetEnqueueOptions(queue_partition_key="user-123"):
    partitioned_queue.enqueue(update_user_data, "user-123", {"name": "Alice"})

with SetEnqueueOptions(queue_partition_key="user-123"):
    partitioned_queue.enqueue(update_user_data, "user-123", {"email": "alice@example.com"})

with SetEnqueueOptions(queue_partition_key="user-456"):
    partitioned_queue.enqueue(update_user_data, "user-456", {"name": "Bob"})

# user-123 updates process sequentially
# user-456 updates can run concurrently with user-123

Partitioned queues apply their concurrency limits to each partition separately. With a concurrency limit of 1, workflows within the same partition execute sequentially, while different partitions can run concurrently.
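
To visualize the resulting schedule, here is a small DBOS-free simulation. It assumes each partition runs one workflow at a time (a per-partition concurrency limit of 1) and that every workflow takes exactly one "tick". `simulate_partitioned_queue` is a hypothetical helper.

```python
from collections import deque


def simulate_partitioned_queue(enqueued: list[tuple[str, str]]) -> list[list[str]]:
    """Return, for each tick, the workflows that run during that tick.

    Each partition key gets its own FIFO sub-queue and runs one workflow per tick.
    """
    partitions: dict[str, deque[str]] = {}
    for key, update in enqueued:
        partitions.setdefault(key, deque()).append(f"{key}:{update}")

    ticks: list[list[str]] = []
    while any(partitions.values()):
        # Every non-empty partition runs its oldest workflow in this tick.
        ticks.append([q.popleft() for q in partitions.values() if q])
    return ticks


ticks = simulate_partitioned_queue([
    ("user-123", "name"),
    ("user-123", "email"),
    ("user-456", "name"),
])
for i, running in enumerate(ticks):
    print(i, running)
# 0 ['user-123:name', 'user-456:name']
# 1 ['user-123:email']
```

The two `user-123` updates never share a tick, while the `user-456` update runs alongside the first of them.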

Deduplication

Prevent duplicate workflows using deduplication IDs:
from dbos import SetEnqueueOptions
from dbos.error import DBOSQueueDeduplicatedError

queue = Queue("deduplicated-tasks")

@DBOS.workflow()
def process_payment(transaction_id: str, amount: float) -> dict:
    # Process payment
    return {"transaction_id": transaction_id, "status": "completed"}

# First enqueue with deduplication ID
with SetEnqueueOptions(deduplication_id="txn-12345"):
    handle1 = queue.enqueue(process_payment, "txn-12345", 99.99)

# Attempting to enqueue with same deduplication ID raises exception
try:
    with SetEnqueueOptions(deduplication_id="txn-12345"):
        handle2 = queue.enqueue(process_payment, "txn-12345", 99.99)
except DBOSQueueDeduplicatedError as e:
    print(f"Workflow deduplicated: {e.workflow_id}")
    # Get the original workflow instead
    handle2 = DBOS.retrieve_workflow(e.workflow_id)
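
One detail worth modeling is the lifetime of a deduplication ID. The sketch below assumes, as the DBOS reference documentation describes, that an ID blocks new enqueues only while the earlier workflow is still enqueued or running, and becomes reusable once it finishes. `DedupSketch` and `DuplicateEnqueueError` are hypothetical in-memory stand-ins, not DBOS classes.

```python
class DuplicateEnqueueError(Exception):
    """Hypothetical stand-in for DBOSQueueDeduplicatedError."""


class DedupSketch:
    """Tracks deduplication IDs of workflows that are enqueued or running."""

    def __init__(self) -> None:
        self._active: dict[str, str] = {}  # deduplication_id -> workflow_id
        self._counter = 0

    def enqueue(self, deduplication_id: str) -> str:
        if deduplication_id in self._active:
            raise DuplicateEnqueueError(deduplication_id)
        self._counter += 1
        workflow_id = f"wf-{self._counter}"
        self._active[deduplication_id] = workflow_id
        return workflow_id

    def finish(self, deduplication_id: str) -> None:
        # Once the workflow completes, its deduplication ID is free again.
        self._active.pop(deduplication_id, None)


sketch = DedupSketch()
first = sketch.enqueue("txn-12345")
try:
    sketch.enqueue("txn-12345")
    rejected = False
except DuplicateEnqueueError:
    rejected = True

sketch.finish("txn-12345")
retry = sketch.enqueue("txn-12345")
print(first, rejected, retry)  # wf-1 True wf-2
```

If a payment must never run twice even after the first attempt has completed, a deduplication ID alone may not be enough under this assumption; a fixed workflow ID is one option to consider.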

Child Workflow Queueing

Enqueue child workflows from within a parent workflow:
queue = Queue("child-queue", concurrency=3)

@DBOS.workflow()
def child_workflow(item_id: str) -> str:
    DBOS.recv("release", timeout_seconds=30)
    return f"Processed {item_id}"

@DBOS.workflow()
def parent_workflow(item_ids: list[str]) -> list[str]:
    # Enqueue multiple child workflows
    handles = []
    for item_id in item_ids:
        handle = queue.enqueue(child_workflow, item_id)
        handles.append(handle)
    
    # Wait a bit and check status
    DBOS.sleep(1)
    for handle in handles[:3]:
        status = handle.get_status()
        print(f"Status: {status.status}")  # Some may be "ENQUEUED"
    
    # Release all child workflows by sending "go" on the "release" topic
    for handle in handles:
        DBOS.send(handle.get_workflow_id(), "go", topic="release")
    
    # Collect results
    results = [handle.get_result() for handle in handles]
    return results

Monitoring Queue Status

Check workflow status in the queue:
import time

# Enqueue a workflow
handle = queue.enqueue(process_task, "task-1")

# Get status
status = handle.get_status()

print(f"Status: {status.status}")  # "ENQUEUED", "PENDING", "SUCCESS", etc.
print(f"Queue: {status.queue_name}")  # Name of the queue the workflow was enqueued on
print(f"Created: {status.created_at}")
print(f"Dequeued: {status.dequeued_at}")  # None if still queued

# Wait for dequeue
while status.status == "ENQUEUED":
    time.sleep(1)
    status = handle.get_status()

print(f"Workflow dequeued at {status.dequeued_at}")
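
The loop above waits forever if the workflow is never dequeued. A bounded variant might look like the sketch below. It takes any zero-argument callable that returns the current status string, so in an application you could pass something like `lambda: handle.get_status().status`. The helper name `wait_while_enqueued` and its parameters are hypothetical.

```python
import time
from typing import Callable


def wait_while_enqueued(
    get_status: Callable[[], str],
    timeout_sec: float = 30.0,
    poll_sec: float = 1.0,
) -> str:
    """Poll until the status is no longer "ENQUEUED" or the timeout expires."""
    deadline = time.monotonic() + timeout_sec
    status = get_status()
    while status == "ENQUEUED":
        if time.monotonic() >= deadline:
            raise TimeoutError(f"still ENQUEUED after {timeout_sec} seconds")
        time.sleep(poll_sec)
        status = get_status()
    return status


# Stand-in status source for demonstration; no DBOS calls are made here.
fake_statuses = iter(["ENQUEUED", "ENQUEUED", "PENDING"])
final = wait_while_enqueued(lambda: next(fake_statuses), timeout_sec=5, poll_sec=0.01)
print(final)  # PENDING
```

Using `time.monotonic()` for the deadline keeps the timeout correct even if the system clock is adjusted while waiting.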

Polling Intervals

Customize how often the queue checks for new work:
# Poll every 0.5 seconds (default is 1.0 second)
fast_queue = Queue(
    "fast-polling",
    polling_interval_sec=0.5
)

# Poll every 5 seconds for less frequent work
slow_queue = Queue(
    "slow-polling",
    polling_interval_sec=5.0
)

Lower polling intervals reduce latency but increase database load. Adjust based on your workload characteristics.
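
A back-of-the-envelope estimate can help pick an interval. The sketch below assumes each worker polls the queue independently at a fixed interval, that workflows arrive at random times relative to the polls, and that a free concurrency slot is available when the poll happens. `polling_tradeoff` is a hypothetical helper, and the numbers are estimates rather than measurements.

```python
def polling_tradeoff(polling_interval_sec: float, num_workers: int = 1) -> dict:
    """Estimate pickup delay and poll volume for a given polling interval."""
    return {
        # On average a workflow arrives halfway between two polls.
        "avg_pickup_delay_sec": polling_interval_sec / 2,
        # Worst case: the workflow arrives just after a poll.
        "max_pickup_delay_sec": polling_interval_sec,
        # Each worker issues its own polls against the database.
        "polls_per_minute": 60 / polling_interval_sec * num_workers,
    }


fast = polling_tradeoff(0.5)
slow = polling_tradeoff(5.0)
print(fast["avg_pickup_delay_sec"], fast["polls_per_minute"])  # 0.25 120.0
print(slow["avg_pickup_delay_sec"], slow["polls_per_minute"])  # 2.5 12.0
```

Going from a 5-second to a 0.5-second interval cuts the estimated average pickup delay tenfold and multiplies the poll volume by the same factor.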

Complete Example: Job Processing System

from dbos import DBOS, Queue, SetEnqueueOptions, SetWorkflowID
import sqlalchemy as sa
from datetime import datetime

# Define queues with different characteristics
high_priority_queue = Queue(
    "high-priority",
    concurrency=10,
    priority_enabled=True
)

bulk_processing_queue = Queue(
    "bulk-processing",
    concurrency=50,
    limiter={"limit": 100, "period": 60}  # Max 100/minute
)

user_specific_queue = Queue(
    "user-operations",
    concurrency=1,  # Applied per partition: one workflow per user at a time
    partition_queue=True
)

@DBOS.transaction()
def save_job_result(job_id: str, result: dict, status: str) -> None:
    """Save job result to database."""
    DBOS.sql_session.execute(
        sa.text(
            "INSERT INTO job_results (job_id, result, status, completed_at) "
            "VALUES (:job_id, :result, :status, :completed_at)"
        ),
        {
            "job_id": job_id,
            "result": str(result),
            "status": status,
            "completed_at": datetime.now()
        }
    )

@DBOS.step()
def process_job_data(job_type: str, data: dict) -> dict:
    """Process job based on type."""
    if job_type == "email":
        return send_email_batch(data)
    elif job_type == "report":
        return generate_report(data)
    elif job_type == "export":
        return export_data(data)
    else:
        raise ValueError(f"Unknown job type: {job_type}")

@DBOS.workflow()
def job_processor(job_id: str, job_type: str, data: dict) -> dict:
    """Process a job and save results."""
    DBOS.logger.info(f"Starting job {job_id} of type {job_type}")
    
    try:
        # Process the job
        result = process_job_data(job_type, data)
        
        # Save successful result
        save_job_result(job_id, result, "completed")
        
        return {"job_id": job_id, "status": "completed", "result": result}
    
    except Exception as e:
        # Save error
        save_job_result(job_id, {"error": str(e)}, "failed")
        raise

def submit_job(job_id: str, job_type: str, data: dict, priority: int = 50, user_id: str = None) -> str:
    """Submit a job for processing."""
    
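    # Choose queue based on priority (in this function, higher = more urgent)
    if priority > 75:
        queue = high_priority_queue
        # DBOS dequeues lower priority numbers first, so invert the
        # application-level value (assumed to be at most 100)
        with SetEnqueueOptions(priority=max(1, 101 - priority)):
            with SetWorkflowID(f"job-{job_id}"):
                handle = queue.enqueue(job_processor, job_id, job_type, data)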
    
    elif user_id:
        # User-specific operations with ordering
        queue = user_specific_queue
        with SetEnqueueOptions(queue_partition_key=user_id):
            with SetWorkflowID(f"job-{job_id}"):
                handle = queue.enqueue(job_processor, job_id, job_type, data)
    
    else:
        # Bulk processing
        queue = bulk_processing_queue
        with SetWorkflowID(f"job-{job_id}"):
            handle = queue.enqueue(job_processor, job_id, job_type, data)
    
    return handle.get_workflow_id()

# Usage examples

# Submit high-priority job
job1_id = submit_job(
    "urgent-123",
    "email",
    {"recipients": ["admin@example.com"], "template": "alert"},
    priority=90
)

# Submit bulk jobs
for i in range(100):
    submit_job(
        f"bulk-{i}",
        "report",
        {"report_type": "daily", "user_id": i}
    )

# Submit user-specific job (will be ordered per user)
submit_job(
    "user-update-1",
    "export",
    {"format": "csv"},
    user_id="user-456"
)

Best Practices

Concurrency limits

  • Start conservative (low concurrency) and increase based on monitoring
  • Consider downstream system capacity (databases, APIs, etc.)
  • Use worker_concurrency in multi-instance deployments

Rate limits

  • Set limits based on third-party API quotas
  • Leave headroom (e.g., if API allows 100/min, set limit to 80/min)
  • Use separate queues for different rate-limited resources

Use priority queues for

  • User-facing operations vs. background tasks
  • Time-sensitive workflows (e.g., payment processing)
  • SLA-based job processing

Use partitioned queues for

  • User-specific operations that must be sequential
  • Resource locking (e.g., bank account updates)
  • Ordered event processing per entity
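
The headroom guideline above can be turned into a small calculation. The sketch below builds a dictionary in the same `{"limit": ..., "period": ...}` shape used for the `limiter` argument earlier on this page. The helper name `limiter_with_headroom` and the 20% default are illustrative assumptions, not DBOS settings.

```python
def limiter_with_headroom(quota: int, period_sec: int, headroom_percent: int = 20) -> dict:
    """Build a limiter dict that stays below a third-party quota.

    quota: number of calls the API allows per period_sec seconds.
    headroom_percent: share of the quota to leave unused.
    """
    if not 0 <= headroom_percent < 100:
        raise ValueError("headroom_percent must be in the range 0-99")
    # Integer arithmetic avoids floating-point rounding surprises.
    limit = max(1, quota * (100 - headroom_percent) // 100)
    return {"limit": limit, "period": period_sec}


limiter = limiter_with_headroom(quota=100, period_sec=60)
print(limiter)  # {'limit': 80, 'period': 60}
```

The resulting dictionary could then be passed as `limiter=limiter` when constructing the queue for that API.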

Next Steps

Scheduled Workflows

Schedule workflows with cron expressions

Workflow Management

Programmatically manage and monitor workflows

Error Handling

Handle errors and implement retry strategies

Configuration

Configure queue polling and other settings
