Async Workflow Queues enable asynchronous workflow starts via Kafka, providing rate limiting, buffering, and decoupling of workflow initiation from execution.

Overview

Async Workflow Queues provide:
  • Rate Limiting: Control workflow start rate per domain
  • Buffering: Queue workflow starts during high load
  • Decoupling: Separate request acceptance from execution
  • Backpressure: Prevent overload from workflow start storms
  • Multi-Tenancy: Per-domain queue configuration

Architecture

Client → Frontend → Kafka Topic → Async Worker → History Service
                     (Queue)      (Consumer)
  1. Client submits workflow start request
  2. Frontend validates and publishes to Kafka
  3. Request acknowledged immediately
  4. Async worker consumes from Kafka
  5. Worker starts workflow via normal path
  6. Rate limiting and retries handled by worker

Setup

Prerequisites

  • Kafka cluster (version 2.0+)
  • Cadence server with async workflow feature enabled
  • Worker service for consuming queue

Kafka Configuration

Create Kafka topics for async workflow queue:
# Create topic
kafka-topics.sh --create \
  --bootstrap-server kafka:9092 \
  --topic cadence-async-workflow-queue \
  --partitions 10 \
  --replication-factor 3 \
  --config retention.ms=86400000 \
  --config max.message.bytes=4194304

# Verify topic
kafka-topics.sh --describe \
  --bootstrap-server kafka:9092 \
  --topic cadence-async-workflow-queue
  • Partitions: more partitions allow higher throughput, but require more consumers
  • Retention: how long requests stay in the queue (24h = 86400000 ms)

Server Configuration

Enable async workflow queues in config.yaml:
publicClient:
  hostPort: "127.0.0.1:7833"

kafka:
  clusters:
    test:
      brokers:
        - kafka1.example.com:9092
        - kafka2.example.com:9092
        - kafka3.example.com:9092
  topics:
    cadence-async-workflow-queue:
      cluster: test
  applications:
    async-wf-consumer:
      topic: cadence-async-workflow-queue
      cluster: test
      dlq-topic: cadence-async-workflow-dlq

services:
  worker:
    asyncWorkflowQueueProvider:
      enabled: true
      kafkaConfig:
        cluster: test
        topic: cadence-async-workflow-queue
        consumerGroup: cadence-async-wf-consumer
        concurrency: 100

Domain Configuration

Enable async workflow queue per domain:
# Get current configuration
cadence admin domain get-async-wf-config --domain my-domain

# Update configuration
cadence admin domain update-async-wf-config \
  --domain my-domain \
  --json '{
    "enabled": true,
    "predefinedQueueName": "cadence-async-workflow-queue",
    "queueType": "kafka",
    "queueConfig": {
      "encodingType": "JSON",
      "version": 0
    }
  }'
Configuration Fields:
  • enabled: Enable/disable async queue for domain
  • predefinedQueueName: Kafka topic name
  • queueType: Queue backend (currently only “kafka”)
  • queueConfig: Queue-specific configuration

Worker Service Setup

Async workers are part of the Cadence worker service:
# Start worker service with async queue consumer
./cadence-server --zone async_wf_kafka_queue start
Or via Docker:
services:
  cadence-worker:
    image: ubercadence/server:master-auto-setup
    command:
      - /bin/sh
      - -c
      - |
        /cadence-server --zone async_wf_kafka_queue start
    environment:
      - KAFKA_SEEDS=kafka:9092
    depends_on:
      - kafka

Usage

Starting Async Workflows

Go SDK:
import (
    "context"
    "fmt"
    "time"

    "go.uber.org/cadence/client"
)

func startAsyncWorkflow(c client.Client) error {
    options := client.StartWorkflowOptions{
        ID:                           "workflow-1",
        TaskList:                     "my-task-list",
        ExecutionStartToCloseTimeout: time.Hour,
    }
    
    // Start asynchronously
    // Returns immediately after queueing to Kafka
    we, err := c.StartWorkflow(context.Background(), options, MyWorkflow)
    if err != nil {
        return err
    }
    
    // WorkflowExecution returned immediately
    // Actual workflow start happens asynchronously
    fmt.Printf("Queued workflow: %s\n", we.ID)
    
    return nil
}
Java SDK:
WorkflowOptions options = WorkflowOptions.newBuilder()
    .setWorkflowId("workflow-1")
    .setTaskList("my-task-list")
    .setExecutionStartToCloseTimeout(Duration.ofHours(1))
    .build();

MyWorkflow workflow = client.newWorkflowStub(MyWorkflow.class, options);

// Start asynchronously
WorkflowExecution execution = WorkflowClient.start(workflow::processOrder, order);

// Returns immediately after queueing
System.out.println("Queued workflow: " + execution.getWorkflowId());
CLI:
# Start workflow (async if domain configured)
cadence --do my-domain workflow start \
  --tl my-task-list \
  --wt MyWorkflow \
  --et 3600 \
  --input '{"orderId": "12345"}'
No special flags are needed; async behavior is automatic when the domain is configured for async workflow queues.

Checking Workflow Status

Workflow may not start immediately:
# Check if workflow has started
cadence --do my-domain workflow describe -w workflow-1

# List workflows
cadence --do my-domain workflow list

# Observe workflow (waits for start)
cadence --do my-domain workflow observe -w workflow-1

Rate Limiting

Consumer Rate Limiting

Control workflow start rate:
# Dynamic config
worker.asyncWorkflowConsumerStartRPS:
  - value: 1000
    constraints:
      domainName: "my-domain"

worker.asyncWorkflowConsumerConcurrency:
  - value: 100
    constraints:
      domainName: "my-domain"

Backpressure Handling

Kafka provides natural backpressure:
  • Queue Full: Kafka rejects new messages if retention exceeded
  • Slow Consumption: Messages accumulate in Kafka
  • Consumer Lag: Monitor consumer_lag metric

Monitoring

Key Metrics

Queue Depth:
# Messages waiting in queue
sum(kafka_consumer_lag{topic="cadence-async-workflow-queue"})
Consumption Rate:
# Workflows started per second
sum(rate(cadence_async_workflow_consumed_total[5m]))
Failure Rate:
# Failed workflow starts
sum(rate(cadence_async_workflow_failed_total[5m]))
End-to-End Latency:
# Time from queue to workflow start
histogram_quantile(0.95,
  rate(cadence_async_workflow_e2e_latency_bucket[5m])
)

CLI Monitoring

# Check domain configuration
cadence admin domain get-async-wf-config --domain my-domain

# Monitor Kafka consumer lag
kafka-consumer-groups.sh \
  --bootstrap-server kafka:9092 \
  --describe \
  --group cadence-async-wf-consumer

Alerting

High Consumer Lag:
alert: AsyncWorkflowHighLag
expr: kafka_consumer_lag{topic="cadence-async-workflow-queue"} > 10000
for: 5m
annotations:
  summary: "Async workflow queue backed up"
High Failure Rate:
alert: AsyncWorkflowHighFailures
expr: rate(cadence_async_workflow_failed_total[5m]) > 10
for: 5m
annotations:
  summary: "High async workflow failure rate"

Best Practices

Queue Configuration

  • Partition Count: Start with num_workers * concurrency / 1000 partitions
  • Retention: Set based on acceptable delay (24h typical)
  • Replication: Use RF=3 for durability
  • Max Message Size: Match Cadence payload limits

Consumer Tuning

  • Concurrency: Balance throughput vs. resource usage
  • Rate Limiting: Prevent overload of downstream services
  • Batch Size: Tune Kafka consumer fetch.min.bytes
  • Commit Interval: Balance consistency vs. throughput

Operational

  • Monitor Lag: Alert if lag exceeds threshold
  • DLQ: Configure dead-letter queue for failed starts
  • Capacity Planning: Size workers for peak load + headroom
  • Testing: Test queue behavior under load

Use Cases

Rate-Limited API

Protect backend from workflow start storms:
// API endpoint
func createOrder(w http.ResponseWriter, r *http.Request) {
    // Parse request
    var order Order
    if err := json.NewDecoder(r.Body).Decode(&order); err != nil {
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }
    
    // Start workflow asynchronously
    // Returns immediately, rate limited by queue consumer
    we, err := cadenceClient.StartWorkflow(
        context.Background(),
        client.StartWorkflowOptions{
            ID:                           fmt.Sprintf("order-%s", order.ID),
            TaskList:                     "order-processing",
            ExecutionStartToCloseTimeout: time.Hour,
        },
        ProcessOrderWorkflow,
        order,
    )
    
    if err != nil {
        http.Error(w, err.Error(), 500)
        return
    }
    
    // Respond immediately
    json.NewEncoder(w).Encode(map[string]string{
        "workflowId": we.ID,
        "status":     "queued",
    })
}

Batch Processing

Queue large batches of workflow starts:
func processBatch(items []Item) error {
    for _, item := range items {
        // Each workflow start goes to queue
        // No blocking on workflow execution
        _, err := cadenceClient.StartWorkflow(
            context.Background(),
            client.StartWorkflowOptions{
                ID:                           fmt.Sprintf("batch-item-%s", item.ID),
                TaskList:                     "batch-processing",
                ExecutionStartToCloseTimeout: time.Hour,
            },
            ProcessItemWorkflow,
            item,
        )
        if err != nil {
            log.Printf("Failed to queue: %v", err)
            // Continue with other items
        }
    }
    return nil
}

Decoupled Services

Decouple request service from workflow execution:
Request Service → Kafka → Async Worker → Workflow
    (Fast)         (Buffer)  (Rate Limited)  (Execution)
Request service can scale independently of workflow capacity.

Troubleshooting

High Consumer Lag

Problem: Kafka consumer lag keeps increasing.

Solution:
  • Scale up worker instances
  • Increase consumer concurrency
  • Check for slow workflow starts
  • Review rate limiting settings
  • Verify workers are healthy

Workflows Not Starting

Problem: Workflows are queued but not executing.

Solution:
# Check async worker status
cadence admin cluster health

# Verify Kafka connectivity
kafka-console-consumer.sh \
  --bootstrap-server kafka:9092 \
  --topic cadence-async-workflow-queue \
  --from-beginning

# Check worker logs
grep "async.*workflow" /var/log/cadence/worker.log

# Verify domain configuration
cadence admin domain get-async-wf-config --domain my-domain

Failed Workflow Starts

Problem: High failure rate for async workflow starts.

Solution:
  • Check DLQ for failed messages
  • Review error logs in worker
  • Verify workflow registration
  • Check task list worker availability
  • Validate workflow input payloads

Advanced Topics

Dead Letter Queue

Handle failed workflow starts:
kafka:
  topics:
    cadence-async-workflow-dlq:
      cluster: test
  applications:
    async-wf-consumer:
      topic: cadence-async-workflow-queue
      dlq-topic: cadence-async-workflow-dlq
Failed starts go to DLQ after max retries. Process DLQ separately:
# Monitor DLQ
kafka-console-consumer.sh \
  --bootstrap-server kafka:9092 \
  --topic cadence-async-workflow-dlq

# Replay from DLQ (custom tool)
./replay-dlq --topic cadence-async-workflow-dlq

Custom Queue Implementation

Implement custom queue backend:
type Queue interface {
    // Enqueue workflow start request
    Enqueue(ctx context.Context, req *StartRequest) error
    
    // Consume workflow start requests
    Consume(ctx context.Context) (<-chan *StartRequest, error)
}
Register in queue provider and configure per domain.
