What is a Worker?

A worker is a service that hosts the workflow and activity implementations. Workers are responsible for:
  • Polling task lists for work
  • Executing workflow decision tasks and activity tasks
  • Reporting results back to the Cadence service
  • Managing concurrent execution of tasks
Workers are the bridge between your application code and the Cadence service. They pull tasks from the service, execute your code, and report results back.

Why Workers Matter

Workers provide several critical capabilities:
  1. Scalability: Add more workers to handle increased load
  2. Fault Tolerance: If a worker crashes, tasks are reassigned to other workers
  3. Isolation: Separate workers for different environments (staging, production)
  4. Specialization: Different workers can handle different types of tasks
  5. Version Management: Roll out new code versions gradually

Worker Architecture

How Workers Work Internally

Worker Lifecycle

  1. Initialization: Worker registers workflows and activities
  2. Start: Worker begins polling task lists
  3. Task Execution: Worker processes tasks in thread pools
  4. Graceful Shutdown: Worker completes in-flight tasks before stopping

Task Processing Flow

1. Worker polls for decision task
2. Receives task with workflow history
3. Replays workflow code with history
4. Workflow generates decisions
5. Worker sends decisions to service
6. Repeats from step 1
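The six steps above can be modeled with a small stdlib-only sketch. The types and names here (`decisionTask`, `pollLoop`) are illustrative stand-ins, not Cadence SDK types; a channel plays the role of the service's task list.

```go
package main

import "fmt"

// decisionTask is a simplified stand-in for what a worker receives from
// the service: a workflow identifier plus its recorded event history.
type decisionTask struct {
	workflowID string
	history    []string // prior events, replayed to rebuild workflow state
}

// pollLoop models steps 1-6: poll, receive history, replay, decide, report.
func pollLoop(tasks <-chan decisionTask, report func(workflowID string, decisions []string)) {
	for task := range tasks { // 1-2: poll and receive a task with history
		// 3: replay workflow code against the history to rebuild state
		state := len(task.history)
		// 4: workflow logic generates new decisions from that state
		decisions := []string{fmt.Sprintf("schedule-activity-%d", state+1)}
		// 5: report the decisions back to the service
		report(task.workflowID, decisions)
	} // 6: loop back to polling
}

func main() {
	tasks := make(chan decisionTask, 1)
	tasks <- decisionTask{workflowID: "order-42", history: []string{"WorkflowStarted"}}
	close(tasks)
	pollLoop(tasks, func(id string, ds []string) {
		fmt.Println(id, ds)
	})
}
```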

Worker Identity

Each worker reports an identity (settable via worker.Options.Identity) used for tracking and debugging, along with SDK version information:
type WorkerVersionInfo struct {
    Impl           string  // SDK implementation (e.g., "cadence-go")
    FeatureVersion string  // Feature version of the client library
}
Worker identity appears in:
  • Activity completion events
  • Workflow execution info
  • Task list polling info
  • Diagnostic tools
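A common convention for identity strings is pid@hostname@tasklist, which mirrors the default the Cadence Go SDK derives; treat the exact format below as illustrative rather than guaranteed.

```go
package main

import (
	"fmt"
	"os"
)

// workerIdentity builds a human-readable identity string for a worker.
// The pid@hostname@tasklist layout is an assumption modeled on the
// Cadence Go SDK's default, not a documented contract.
func workerIdentity(taskList string) string {
	hostname, err := os.Hostname()
	if err != nil {
		hostname = "unknown-host"
	}
	return fmt.Sprintf("%d@%s@%s", os.Getpid(), hostname, taskList)
}

func main() {
	// Pass the result as worker.Options{Identity: ...} to override the default.
	fmt.Println(workerIdentity("my-task-list"))
}
```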

Code Examples

package main

import (
    "go.uber.org/cadence/client"
    "go.uber.org/cadence/worker"
)

func main() {
    // Create service client (scope is a tally.Scope defined elsewhere)
    h, err := client.NewClient(client.Options{
        HostPort:     "localhost:7933",
        Domain:       "my-domain",
        MetricsScope: scope,
    })
    if err != nil {
        panic(err)
    }
    defer h.Close()

    // Create worker (logger is a *zap.Logger defined elsewhere)
    w := worker.New(h, "my-task-list", worker.Options{
        Logger:                                  logger,
        MaxConcurrentActivityExecutionSize:      10,
        MaxConcurrentDecisionTaskExecutionSize:  10,
        MaxConcurrentLocalActivityExecutionSize: 100,
    })

    // Register workflows
    w.RegisterWorkflow(OrderWorkflow)
    w.RegisterWorkflow(PaymentWorkflow)

    // Register activities
    w.RegisterActivity(ProcessPayment)
    w.RegisterActivity(ShipOrder)

    // Start worker
    err = w.Start()
    if err != nil {
        panic(err)
    }

    // Block forever; see the graceful-shutdown pattern under Best
    // Practices for handling interrupt signals properly
    select {}
}

Worker Configuration

Concurrency Settings

workerOptions := worker.Options{
    // Maximum concurrent decision tasks (workflow executions)
    MaxConcurrentDecisionTaskExecutionSize: 100,

    // Maximum concurrent activity tasks
    MaxConcurrentActivityExecutionSize: 1000,

    // Maximum concurrent local activities
    MaxConcurrentLocalActivityExecutionSize: 1000,

    // Poller count for decision tasks
    MaxConcurrentDecisionTaskPollerSize: 5,

    // Poller count for activity tasks
    MaxConcurrentActivityTaskPollerSize: 5,
}
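Under the hood, each of these caps behaves like a pool of execution slots: a task runs only when a slot is free. The sketch below is a minimal stdlib model of that mechanism, not the SDK's actual implementation.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runWithSlots executes tasks concurrently but never lets more than
// maxConcurrent run at once, mimicking what a setting like
// MaxConcurrentActivityExecutionSize enforces. It returns the peak
// concurrency actually observed.
func runWithSlots(maxConcurrent int, tasks []func()) int32 {
	slots := make(chan struct{}, maxConcurrent) // one token per slot
	var peak, current int32
	var wg sync.WaitGroup
	for _, task := range tasks {
		wg.Add(1)
		go func(task func()) {
			defer wg.Done()
			slots <- struct{}{} // acquire a slot (blocks when all are busy)
			n := atomic.AddInt32(&current, 1)
			for { // record the highest concurrency seen
				p := atomic.LoadInt32(&peak)
				if n <= p || atomic.CompareAndSwapInt32(&peak, p, n) {
					break
				}
			}
			task()
			atomic.AddInt32(&current, -1)
			<-slots // release the slot
		}(task)
	}
	wg.Wait()
	return peak
}

func main() {
	tasks := make([]func(), 20)
	for i := range tasks {
		tasks[i] = func() {}
	}
	fmt.Println("peak concurrency stayed within cap:", runWithSlots(3, tasks) <= 3)
}
```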

Performance Tuning

Rule of Thumb for Concurrency:
  • Decision tasks: CPU-bound, set to ~2x CPU cores
  • Activity tasks: I/O-bound, can be much higher (100-1000+)
  • Pollers: Usually 2-5 per task list is sufficient
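The rule of thumb can be turned into a sizing helper. The multipliers below are the suggestions from this article, not SDK defaults; tune them against your own measurements.

```go
package main

import (
	"fmt"
	"runtime"
)

// suggestedConcurrency applies the rule of thumb above: decision tasks
// are CPU-bound, so scale them with cores; activity tasks are I/O-bound,
// so a much larger fixed cap is typical; a handful of pollers per task
// list is usually enough.
func suggestedConcurrency() (decisions, activities, pollers int) {
	decisions = 2 * runtime.NumCPU() // ~2x cores for CPU-bound replay
	activities = 500                 // I/O-bound: hundreds is common
	pollers = 3                      // 2-5 per task list is usually sufficient
	return
}

func main() {
	d, a, p := suggestedConcurrency()
	fmt.Printf("decisions=%d activities=%d pollers=%d\n", d, a, p)
}
```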

Resource Limits

workerOptions := worker.Options{
    // Rate limit for activities across the whole task list
    MaxActivitiesPerSecond: 100000,

    // Rate limit for activities on this worker instance
    WorkerActivitiesPerSecond: 100000,

    // Sticky schedule-to-start timeout
    StickyScheduleToStartTimeout: time.Second * 5,

    // Keep eager activity execution enabled
    DisableEagerActivities: false,

    // Data converter for serialization
    DataConverter: customDataConverter,
}

Worker Patterns

Multiple Task Lists

func main() {
    h, _ := client.NewClient(client.Options{Domain: "my-domain"})

    // Worker for high-priority tasks
    highPriorityWorker := worker.New(h, "high-priority-tasks", worker.Options{
        MaxConcurrentActivityExecutionSize: 100,
    })
    highPriorityWorker.RegisterActivity(CriticalActivity)
    highPriorityWorker.Start()

    // Worker for low-priority tasks
    lowPriorityWorker := worker.New(h, "low-priority-tasks", worker.Options{
        MaxConcurrentActivityExecutionSize: 10,
    })
    lowPriorityWorker.RegisterActivity(BackgroundActivity)
    lowPriorityWorker.Start()

    select {}
}

Specialized Workers

// GPU worker for ML tasks
gpuWorker := worker.New(h, "gpu-tasks", worker.Options{
    MaxConcurrentActivityExecutionSize: 2, // Limited GPU resources
})
gpuWorker.RegisterActivity(TrainModel)
gpuWorker.RegisterActivity(RunInference)

// CPU worker for standard tasks
cpuWorker := worker.New(h, "cpu-tasks", worker.Options{
    MaxConcurrentActivityExecutionSize: 100,
})
cpuWorker.RegisterActivity(ProcessData)
cpuWorker.RegisterActivity(GenerateReport)

Session Workers

// Worker with session support for file processing
sessionWorker := worker.New(h, "file-processing", worker.Options{
    EnableSessionWorker:               true,
    MaxConcurrentSessionExecutionSize: 10,
})

// Activities that use sessions
sessionWorker.RegisterActivity(DownloadFile)
sessionWorker.RegisterActivity(ProcessFile)
sessionWorker.RegisterActivity(UploadResult)

Best Practices

1. Implement Graceful Shutdown

func main() {
    w := worker.New(h, "my-task-list", worker.Options{})
    w.RegisterWorkflow(MyWorkflow)
    w.Start()

    // Handle shutdown signals
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)

    <-sigCh
    log.Println("Shutting down worker...")

    // Stop accepting new tasks
    w.Stop()

    // Give any remaining in-flight work a fixed grace period before the
    // process exits (Stop does not signal when tasks complete here)
    time.Sleep(30 * time.Second)

    log.Println("Worker stopped")
}

2. Use Health Checks

func healthCheck(w worker.Worker) http.HandlerFunc {
    return func(rw http.ResponseWriter, r *http.Request) {
        // A real probe would verify the worker is still polling; this
        // minimal version just confirms the process is responsive,
        // which is enough for a basic Kubernetes liveness/readiness probe
        rw.WriteHeader(http.StatusOK)
        rw.Write([]byte("OK"))
    }
}

func main() {
    w := worker.New(h, "my-task-list", worker.Options{})
    w.Start()

    // Expose health endpoint
    http.HandleFunc("/health", healthCheck(w))
    go http.ListenAndServe(":8080", nil)

    select {}
}

3. Monitor Worker Metrics

import (
    "github.com/uber-go/tally"
    "go.uber.org/cadence/client"
)

func main() {
    // Create metrics scope (NewTestScope is for local experiments;
    // use tally.NewRootScope with a real reporter in production)
    scope := tally.NewTestScope("cadence", nil)

    h, _ := client.NewClient(client.Options{
        MetricsScope: scope,
    })

    w := worker.New(h, "my-task-list", worker.Options{
        Logger: logger,
    })

    // Key metrics to monitor:
    // - cadence_worker_task_slots_available
    // - cadence_worker_task_execution_failed
    // - cadence_worker_task_execution_latency
    // - cadence_worker_poller_start_counter

    w.Start()
    select {} // keep the process alive after Start returns
}

4. Use Separate Worker Pools

// Development workers
if env == "dev" {
    devWorker := worker.New(h, "dev-tasks", worker.Options{
        MaxConcurrentActivityExecutionSize: 5,
    })
    devWorker.RegisterWorkflow(TestWorkflow)
    devWorker.Start()
}

// Production workers
if env == "prod" {
    prodWorker := worker.New(h, "prod-tasks", worker.Options{
        MaxConcurrentActivityExecutionSize: 100,
    })
    prodWorker.RegisterWorkflow(ProductionWorkflow)
    prodWorker.Start()
}

5. Version Your Workers

// Worker for version 1.0
workerV1 := worker.New(h, "my-tasks", worker.Options{
    Identity: "my-worker-v1.0",
})
workerV1.RegisterWorkflow(MyWorkflowV1)
workerV1.Start()

// Worker for version 2.0 (canary deployment)
workerV2 := worker.New(h, "my-tasks", worker.Options{
    Identity: "my-worker-v2.0",
})
workerV2.RegisterWorkflow(MyWorkflowV2)
workerV2.Start()

// Caution: both workers poll the same task list, so each must be able to
// replay any workflow it may receive. To change the code of a running
// workflow safely, prefer in-workflow branching with workflow.GetVersion.

Deployment Strategies

Kubernetes Deployment

apiVersion: apps/v1
kind: Deployment
metadata:
  name: cadence-worker
spec:
  replicas: 3
  selector:
    matchLabels:
      app: cadence-worker
  template:
    metadata:
      labels:
        app: cadence-worker
    spec:
      containers:
      - name: worker
        image: my-worker:v1.0
        env:
        - name: CADENCE_HOST
          value: "cadence-frontend:7933"
        - name: DOMAIN
          value: "production"
        - name: TASK_LIST
          value: "my-tasks"
        resources:
          requests:
            memory: "256Mi"
            cpu: "500m"
          limits:
            memory: "1Gi"
            cpu: "2000m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8080
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /health
            port: 8080
          initialDelaySeconds: 5
          periodSeconds: 5

Auto-Scaling

apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: cadence-worker-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: cadence-worker
  minReplicas: 2
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80

Common Issues and Solutions

Common Worker Problems:
  1. Worker not picking up tasks
    • Check task list name matches
    • Verify domain is correct
    • Ensure worker is started
  2. High latency
    • Increase poller count
    • Add more worker instances
    • Check network connectivity
  3. Memory leaks
    • Monitor goroutine count
    • Check for unclosed resources
    • Use pprof for profiling
  4. Deadlocks
    • Avoid blocking operations in workflows
    • Use workflow.Go() for concurrency
    • Set appropriate timeouts
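For the memory-leak item above, a steadily climbing goroutine count is usually the first symptom. Sampling runtime.NumGoroutine (and exporting it as a metric) surfaces leaks long before the process runs out of memory; the example below deliberately parks one goroutine to show the count rising.

```go
package main

import (
	"fmt"
	"runtime"
)

// goroutineCount returns the current number of goroutines; exporting
// this value periodically as a gauge makes leaks easy to spot.
func goroutineCount() int {
	return runtime.NumGoroutine()
}

func main() {
	before := goroutineCount()

	// A goroutine parked on a channel that is never signaled is the
	// classic leak shape; here we close the channel so it can exit.
	done := make(chan struct{})
	go func() { <-done }()

	after := goroutineCount()
	fmt.Printf("goroutines before=%d after=%d\n", before, after)
	close(done)
}
```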

Worker Service Architecture

Cadence also includes a built-in worker service for system tasks:
// From service/worker/service.go
type Service struct {
    resource.Resource
    status int32
    stopC  chan struct{}
    params *resource.Params
    config *Config
}

// Worker service hosts:
// 1. Replicator: Handles replication tasks
// 2. Indexer: Updates visibility records
// 3. Archiver: Archives workflow histories
// 4. Scanner: Scans and fixes data inconsistencies
