Skip to main content
The Queue class provides durable workflow queuing with concurrency control, rate limiting, priority support, and partitioning capabilities.

Overview

Queues allow you to:
  • Control how many workflows execute concurrently
  • Rate limit workflow execution
  • Prioritize workflows
  • Partition workflows by key for ordered processing
  • Defer workflow execution
Workflows enqueued on a queue are persisted to the database and survive application restarts.

Constructor

from dbos import Queue

queue = Queue(
    name,
    concurrency=None,
    limiter=None,
    worker_concurrency=None,
    priority_enabled=False,
    partition_queue=False,
    polling_interval_sec=1.0
)
name
str
required
Unique name for the queue. Must be unique across the application.
concurrency
int
default:"None"
Maximum number of workflows that can execute concurrently from this queue. If None, no limit is enforced.
limiter
QueueRateLimit
default:"None"
Rate limit configuration. Format: {"limit": int, "period": float}. Limits the number of workflows started per time period (in seconds).
worker_concurrency
int
default:"None"
Maximum concurrent workflows per worker process. Must be ≤ concurrency if both are set.
priority_enabled
bool
default:"False"
Enable priority-based dequeuing. Higher priority workflows execute first.
partition_queue
bool
default:"False"
Enable queue partitioning. Workflows with the same partition key execute in order.
polling_interval_sec
float
default:"1.0"
How often (in seconds) to check for new workflows to dequeue.

Example: Basic Queue

from dbos import DBOS, Queue

# Create a queue with max 5 concurrent workflows
processing_queue = Queue("order_processing", concurrency=5)

@DBOS.workflow()
def process_order(order_id: str) -> str:
    DBOS.logger.info(f"Processing order {order_id}")
    charge_customer(order_id)
    ship_order(order_id)
    return f"Completed {order_id}"

@DBOS.transaction()
def charge_customer(order_id: str) -> None:
    # Database operations
    pass

@DBOS.step()
def ship_order(order_id: str) -> None:
    # External API call
    pass

# Enqueue workflows
handle1 = processing_queue.enqueue(process_order, "order-123")
handle2 = processing_queue.enqueue(process_order, "order-456")

# Wait for results
result = handle1.get_result()
print(result)  # "Completed order-123"

Methods

enqueue

handle = queue.enqueue(func, *args, **kwargs)
Enqueue a workflow for later execution.
func
Callable[P, R]
required
A workflow function decorated with @DBOS.workflow()
args
P.args
Positional arguments to pass to the workflow
kwargs
P.kwargs
Keyword arguments to pass to the workflow
WorkflowHandle[R]
WorkflowHandle[R]
A handle to track the enqueued workflow
Example:
@DBOS.workflow()
def send_email(to: str, subject: str, body: str) -> bool:
    # Send email logic
    return True

email_queue = Queue("emails", concurrency=10)
handle = email_queue.enqueue(
    send_email,
    "[email protected]",
    "Welcome!",
    "Thanks for signing up"
)
result = handle.get_result()

enqueue_async

handle = await queue.enqueue_async(func, *args, **kwargs)
Async version of enqueue(). Must be called from an async context.
WorkflowHandleAsync[R]
WorkflowHandleAsync[R]
An async handle to track the enqueued workflow

Rate Limiting

Control how many workflows can start per time period:
from dbos import Queue

# Allow max 100 workflows per 60 seconds
api_queue = Queue(
    "api_calls",
    limiter={"limit": 100, "period": 60.0}
)

@DBOS.workflow()
def call_external_api(endpoint: str) -> dict:
    # API call logic
    pass

# These will be rate-limited
for i in range(200):
    api_queue.enqueue(call_external_api, f"/endpoint/{i}")

Priority Queues

Process higher-priority workflows first:
from dbos import Queue, SetEnqueueOptions

# Enable priority on the queue
priority_queue = Queue("tasks", priority_enabled=True)

@DBOS.workflow()
def important_task(data: str) -> None:
    # Task logic
    pass

# Enqueue with different priorities (higher number = higher priority)
with SetEnqueueOptions(priority=10):
    high_priority = priority_queue.enqueue(important_task, "urgent")

with SetEnqueueOptions(priority=1):
    low_priority = priority_queue.enqueue(important_task, "normal")

# high_priority will execute before low_priority

Partitioned Queues

Ensure workflows with the same partition key execute in order:
from dbos import Queue, SetEnqueueOptions

# Enable partitioning
user_queue = Queue("user_operations", partition_queue=True)

@DBOS.workflow()
def update_user_data(user_id: str, data: dict) -> None:
    # Update user logic
    pass

# All workflows for the same user execute in order
for i in range(10):
    with SetEnqueueOptions(queue_partition_key="user-123"):
        user_queue.enqueue(update_user_data, "user-123", {"update": i})
    
    with SetEnqueueOptions(queue_partition_key="user-456"):
        user_queue.enqueue(update_user_data, "user-456", {"update": i})

# user-123 updates execute in order
# user-456 updates execute in order
# But user-123 and user-456 can execute in parallel

Deduplication

Prevent duplicate workflow executions:
from dbos import Queue, SetEnqueueOptions
import uuid

queue = Queue("processing")

@DBOS.workflow()
def process_payment(payment_id: str) -> None:
    # Process payment
    pass

payment_id = "payment-123"
dedup_key = f"payment-{payment_id}"

# First enqueue succeeds
with SetEnqueueOptions(deduplication_id=dedup_key):
    handle1 = queue.enqueue(process_payment, payment_id)

# Second enqueue with same key is ignored
with SetEnqueueOptions(deduplication_id=dedup_key):
    handle2 = queue.enqueue(process_payment, payment_id)

# Both handles point to the same workflow execution
assert handle1.workflow_id == handle2.workflow_id

Combined Example

from dbos import DBOS, Queue, SetEnqueueOptions

# Create a queue with multiple features
order_queue = Queue(
    name="order_processing",
    concurrency=10,  # Max 10 concurrent workflows
    limiter={"limit": 100, "period": 60},  # Max 100/minute
    priority_enabled=True,  # Support priorities
    partition_queue=True,  # Support partitioning
)

@DBOS.workflow()
def process_order(order_id: str, customer_id: str) -> str:
    # Process the order
    return f"Processed {order_id}"

# Enqueue a high-priority order for a specific customer
with SetEnqueueOptions(
    priority=10,
    queue_partition_key=f"customer-{customer_id}",
    deduplication_id=f"order-{order_id}"
):
    handle = order_queue.enqueue(
        process_order,
        order_id="order-789",
        customer_id="customer-123"
    )

result = handle.get_result()

Listening to Specific Queues

By default, DBOS processes workflows from all declared queues. You can configure a DBOS instance to only listen to specific queues:
from dbos import DBOS, Queue

queue1 = Queue("high_priority")
queue2 = Queue("low_priority")
queue3 = Queue("batch_jobs")

# Only process workflows from high_priority and batch_jobs
DBOS.listen_queues([queue1, queue3])

# Must be called before DBOS.launch()
DBOS.launch()

QueueRateLimit Type

from typing import TypedDict

class QueueRateLimit(TypedDict):
    limit: int      # Maximum number of workflows
    period: float   # Time period in seconds

Build docs developers (and LLMs) love