What is a Worker?
A worker is a service that hosts the workflow and activity implementations. Workers are responsible for:
- Polling task lists for work
- Executing workflow decision tasks and activity tasks
- Reporting results back to the Cadence service
- Managing concurrent execution of tasks
Workers are the bridge between your application code and the Cadence service. They pull tasks from the service, execute your code, and report results back.
Why Workers Matter
Workers provide several critical capabilities:
- Scalability: Add more workers to handle increased load
- Fault Tolerance: If a worker crashes, tasks are reassigned to other workers
- Isolation: Separate workers for different environments (staging, production)
- Specialization: Different workers can handle different types of tasks
- Version Management: Roll out new code versions gradually
Worker Architecture
How Workers Work Internally
Worker Lifecycle
- Initialization: Worker registers workflows and activities
- Start: Worker begins polling task lists
- Task Execution: Worker processes tasks in thread pools
- Graceful Shutdown: Worker completes in-flight tasks before stopping
Task Processing Flow
Decision Tasks
1. Worker polls for a decision task
2. Receives the task with workflow history
3. Replays workflow code with the history
4. Workflow generates decisions
5. Worker sends decisions to the service
6. Repeats from step 1
Activity Tasks
1. Worker polls for an activity task
2. Receives the task with input parameters
3. Executes the activity function
4. Activity completes with a result or error
5. Worker reports the result to the service
6. Repeats from step 1
Worker Identity
Each worker reports an identity and version information used for tracking and debugging:
type WorkerVersionInfo struct {
	Impl           string // SDK implementation (e.g., "cadence-go")
	FeatureVersion string // Feature version supported by the SDK
}
Worker identity appears in:
- Activity completion events
- Workflow execution info
- Task list polling info
- Diagnostic tools
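Cadence does not mandate an identity format; a common convention (an assumption here, not required by the SDK) combines hostname, process ID, and a version tag, which makes the diagnostic output above easy to trace back to a machine:

```go
package main

import (
	"fmt"
	"os"
)

// workerIdentity builds an identity string from hostname, pid, and a
// version tag. The host@pid@version shape is a convention assumed here;
// any unique string works.
func workerIdentity(version string) string {
	host, err := os.Hostname()
	if err != nil {
		host = "unknown-host"
	}
	return fmt.Sprintf("%s@%d@%s", host, os.Getpid(), version)
}

func main() {
	fmt.Println(workerIdentity("v1.0"))
}
```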
Code Examples
package main

import (
	"go.uber.org/cadence/client"
	"go.uber.org/cadence/worker"
)

func main() {
	// Create service client (scope and logger are assumed to be
	// constructed earlier in the program)
	h, err := client.NewClient(client.Options{
		HostPort:     "localhost:7933",
		Domain:       "my-domain",
		MetricsScope: scope,
	})
	if err != nil {
		panic(err)
	}
	defer h.Close()

	// Create worker
	w := worker.New(h, "my-task-list", worker.Options{
		Logger:                                  logger,
		MaxConcurrentActivityExecutionSize:      10,
		MaxConcurrentDecisionTaskExecutionSize:  10,
		MaxConcurrentLocalActivityExecutionSize: 100,
	})

	// Register workflows
	w.RegisterWorkflow(OrderWorkflow)
	w.RegisterWorkflow(PaymentWorkflow)

	// Register activities
	w.RegisterActivity(ProcessPayment)
	w.RegisterActivity(ShipOrder)

	// Start worker
	err = w.Start()
	if err != nil {
		panic(err)
	}

	// Block forever; see the graceful shutdown pattern under Best Practices
	select {}
}
import com.uber.cadence.client.WorkflowClient;
import com.uber.cadence.worker.Worker;
import com.uber.cadence.worker.WorkerFactory;
import com.uber.cadence.worker.WorkerOptions;

import java.util.concurrent.TimeUnit;

public class WorkerMain {
    public static void main(String[] args) {
        // Create service client
        WorkflowClient client = WorkflowClient.newInstance(
            "localhost", 7933, "my-domain"
        );

        // Create worker factory
        WorkerFactory factory = WorkerFactory.newInstance(client);

        // Create worker with options
        WorkerOptions options = new WorkerOptions.Builder()
            .setMaxConcurrentActivityExecutionSize(10)
            .setMaxConcurrentWorkflowExecutionSize(10)
            .setMaxConcurrentLocalActivityExecutionSize(100)
            .build();
        Worker worker = factory.newWorker("my-task-list", options);

        // Register workflows
        worker.registerWorkflowImplementationTypes(
            OrderWorkflowImpl.class,
            PaymentWorkflowImpl.class
        );

        // Register activities
        worker.registerActivitiesImplementations(
            new PaymentActivitiesImpl(),
            new ShippingActivitiesImpl()
        );

        // Start all workers; the factory's poller threads keep the JVM alive
        factory.start();

        // Shut down cleanly on SIGTERM
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            factory.shutdown();
            factory.awaitTermination(10, TimeUnit.SECONDS);
        }));
    }
}
from cadence.worker import Worker
from cadence.workerfactory import WorkerFactory
from cadence.cadence_client import CadenceClient

# Create client
client = CadenceClient(
    host='localhost',
    port=7933,
    domain='my-domain'
)

# Create worker factory
factory = WorkerFactory(
    client=client,
    domain='my-domain',
    task_list='my-task-list'
)

# Create worker
worker = factory.new_worker(
    task_list='my-task-list',
    max_concurrent_activity_execution_size=10,
    max_concurrent_workflow_execution_size=10
)

# Register workflows and activities
worker.register_workflow_implementation_type(OrderWorkflow)
worker.register_activities_implementation(PaymentActivities())

# Start worker
factory.start()
Worker Configuration
Concurrency Settings
workerOptions := worker.Options{
	// Maximum concurrent decision tasks (workflow executions)
	MaxConcurrentDecisionTaskExecutionSize: 100,
	// Maximum concurrent activity tasks
	MaxConcurrentActivityExecutionSize: 1000,
	// Maximum concurrent local activities
	MaxConcurrentLocalActivityExecutionSize: 1000,
	// Poller count for decision tasks
	MaxConcurrentDecisionTaskPollerSize: 5,
	// Poller count for activity tasks
	MaxConcurrentActivityTaskPollerSize: 5,
}
Rule of Thumb for Concurrency:
- Decision tasks: CPU-bound, set to ~2x CPU cores
- Activity tasks: I/O-bound, can be much higher (100-1000+)
- Pollers: Usually 2-5 per task list is sufficient
Resource Limits
workerOptions := worker.Options{
	// Rate limit on activity starts across the task list
	MaxActivitiesPerSecond: 100000,
	// Rate limit on activity executions for this worker
	WorkerActivitiesPerSecond: 100000,
	// Sticky schedule-to-start timeout
	StickyScheduleToStartTimeout: time.Second * 5,
	// Eager activity dispatch (false leaves it enabled)
	DisableEagerActivities: false,
	// Data converter for serialization
	DataConverter: customDataConverter,
}
Worker Patterns
Multiple Task Lists
func main() {
	h, _ := client.NewClient(client.Options{Domain: "my-domain"})

	// Worker for high-priority tasks
	highPriorityWorker := worker.New(h, "high-priority-tasks", worker.Options{
		MaxConcurrentActivityExecutionSize: 100,
	})
	highPriorityWorker.RegisterActivity(CriticalActivity)
	highPriorityWorker.Start()

	// Worker for low-priority tasks
	lowPriorityWorker := worker.New(h, "low-priority-tasks", worker.Options{
		MaxConcurrentActivityExecutionSize: 10,
	})
	lowPriorityWorker.RegisterActivity(BackgroundActivity)
	lowPriorityWorker.Start()

	select {}
}
Specialized Workers
// GPU worker for ML tasks
gpuWorker := worker.New(h, "gpu-tasks", worker.Options{
	MaxConcurrentActivityExecutionSize: 2, // Limited GPU resources
})
gpuWorker.RegisterActivity(TrainModel)
gpuWorker.RegisterActivity(RunInference)

// CPU worker for standard tasks
cpuWorker := worker.New(h, "cpu-tasks", worker.Options{
	MaxConcurrentActivityExecutionSize: 100,
})
cpuWorker.RegisterActivity(ProcessData)
cpuWorker.RegisterActivity(GenerateReport)
Session Workers
// Worker with session support for file processing
sessionWorker := worker.New(h, "file-processing", worker.Options{
	EnableSessionWorker:               true,
	MaxConcurrentSessionExecutionSize: 10,
})

// Activities that use sessions
sessionWorker.RegisterActivity(DownloadFile)
sessionWorker.RegisterActivity(ProcessFile)
sessionWorker.RegisterActivity(UploadResult)
Best Practices
1. Implement Graceful Shutdown
func main() {
	w := worker.New(h, "my-task-list", worker.Options{})
	w.RegisterWorkflow(MyWorkflow)
	w.Start()

	// Handle shutdown signals
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
	<-sigCh

	log.Println("Shutting down worker...")

	// Stop accepting new tasks and drain in-flight work
	w.Stop()

	log.Println("Worker stopped")
}
2. Use Health Checks
func healthCheck(w worker.Worker) http.HandlerFunc {
	return func(rw http.ResponseWriter, r *http.Request) {
		// Placeholder for a Kubernetes liveness/readiness probe;
		// a real check should verify the worker is still polling
		rw.WriteHeader(http.StatusOK)
		rw.Write([]byte("OK"))
	}
}

func main() {
	w := worker.New(h, "my-task-list", worker.Options{})
	w.Start()

	// Expose health endpoint
	http.HandleFunc("/health", healthCheck(w))
	go http.ListenAndServe(":8080", nil)

	select {}
}
3. Monitor Worker Metrics
import (
	"time"

	"github.com/uber-go/tally"
	"go.uber.org/cadence/client"
)

func main() {
	// Create a real reporting scope (tally.NewTestScope is for tests only)
	scope, closer := tally.NewRootScope(tally.ScopeOptions{
		Prefix: "cadence",
	}, time.Second)
	defer closer.Close()

	h, _ := client.NewClient(client.Options{
		MetricsScope: scope,
	})
	w := worker.New(h, "my-task-list", worker.Options{
		Logger: logger,
	})

	// Key metrics to monitor:
	// - cadence_worker_task_slots_available
	// - cadence_worker_task_execution_failed
	// - cadence_worker_task_execution_latency
	// - cadence_worker_poller_start_counter
	w.Start()
}
4. Use Separate Worker Pools
// Development workers
if env == "dev" {
	w := worker.New(h, "dev-tasks", worker.Options{
		MaxConcurrentActivityExecutionSize: 5,
	})
	w.RegisterWorkflow(TestWorkflow)
	w.Start()
}

// Production workers
if env == "prod" {
	w := worker.New(h, "prod-tasks", worker.Options{
		MaxConcurrentActivityExecutionSize: 100,
	})
	w.RegisterWorkflow(ProductionWorkflow)
	w.Start()
}
5. Version Your Workers
// Worker for version 1.0
workerV1 := worker.New(h, "my-tasks", worker.Options{
	Identity: "my-worker-v1.0",
})
workerV1.RegisterWorkflow(MyWorkflowV1)
workerV1.Start()

// Worker for version 2.0 (canary deployment)
workerV2 := worker.New(h, "my-tasks", worker.Options{
	Identity: "my-worker-v2.0",
})
workerV2.RegisterWorkflow(MyWorkflowV2)
workerV2.Start()

// Both versions coexist on the same task list. Each worker must register
// every workflow type it may be handed; for changes within a single
// workflow's logic, prefer workflow.GetVersion instead.
Deployment Strategies
Kubernetes Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: cadence-worker
spec:
  replicas: 3
  selector:
    matchLabels:
      app: cadence-worker
  template:
    metadata:
      labels:
        app: cadence-worker
    spec:
      containers:
        - name: worker
          image: my-worker:v1.0
          env:
            - name: CADENCE_HOST
              value: "cadence-frontend:7933"
            - name: DOMAIN
              value: "production"
            - name: TASK_LIST
              value: "my-tasks"
          resources:
            requests:
              memory: "256Mi"
              cpu: "500m"
            limits:
              memory: "1Gi"
              cpu: "2000m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 5
            periodSeconds: 5
Auto-Scaling
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: cadence-worker-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: cadence-worker
  minReplicas: 2
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80
Common Issues and Solutions
Common Worker Problems:
- Worker not picking up tasks
  - Check that the task list name matches
  - Verify the domain is correct
  - Ensure the worker is started
- High latency
  - Increase the poller count
  - Add more worker instances
  - Check network connectivity
- Memory leaks
  - Monitor the goroutine count
  - Check for unclosed resources
  - Use pprof for profiling
- Deadlocks
  - Avoid blocking operations in workflows
  - Use workflow.Go() for concurrency
  - Set appropriate timeouts
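For the memory-leak item, a first signal can be as cheap as sampling the goroutine count on a ticker; the threshold below is workload-specific and illustrative:

```go
package main

import (
	"fmt"
	"runtime"
)

// checkGoroutines returns the current goroutine count and whether it
// exceeds a threshold. Call it periodically (e.g. from a ticker) and
// alert on the result as an early warning for goroutine leaks.
func checkGoroutines(threshold int) (n int, leaking bool) {
	n = runtime.NumGoroutine()
	return n, n > threshold
}

func main() {
	n, leaking := checkGoroutines(10000)
	fmt.Printf("goroutines=%d leaking=%v\n", n, leaking)
}
```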
Worker Service Architecture
Cadence also includes a built-in worker service for system tasks:
// From service/worker/service.go
type Service struct {
	resource.Resource

	status int32
	stopC  chan struct{}
	params *resource.Params
	config *Config
}
// Worker service hosts:
// 1. Replicator: Handles replication tasks
// 2. Indexer: Updates visibility records
// 3. Archiver: Archives workflow histories
// 4. Scanner: Scans and fixes data inconsistencies
Further Reading