What is a Workflow?
A workflow is the fundamental building block of Cadence. It represents a resilient function that orchestrates activities, manages state, and coordinates business logic. Workflows are:
- Durable: Workflow state survives process crashes and restarts
- Deterministic: Workflow code must make the same decisions whenever its history is replayed to reconstruct state
- Long-running: Can execute for seconds, days, or even years
- Fault-tolerant: Automatically recover from failures
Unlike traditional functions, Cadence workflows are durable executions: they survive machine failures, orchestrate distributed operations, and maintain state over long periods.
Core Components
Every workflow execution consists of several key components:
Workflow Execution
A workflow execution is uniquely identified by:
type WorkflowExecution struct {
WorkflowID string `json:"workflowId,omitempty"`
RunID string `json:"runId,omitempty"`
}
- WorkflowID: User-provided identifier for the workflow (e.g., “order-12345”)
- RunID: System-generated UUID for each workflow run
Workflow Type
type WorkflowType struct {
Name string `json:"name,omitempty"`
}
The workflow type identifies which workflow function to execute, similar to a function name.
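One way to picture the role of the workflow type is a lookup table from type names to functions. The sketch below is a simplified stand-in under that assumption, not Cadence's worker implementation; `workflowFunc`, `registry`, `register`, and `dispatch` are hypothetical names introduced only for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// WorkflowType mirrors the struct above: a workflow is selected by name.
type WorkflowType struct {
	Name string
}

// workflowFunc is a hypothetical, simplified workflow signature.
type workflowFunc func(input string) (string, error)

// registry maps a workflow type name to the function that implements it.
type registry map[string]workflowFunc

// register associates a type name with a workflow function.
func (r registry) register(name string, fn workflowFunc) {
	r[name] = fn
}

// dispatch looks up the function for the given type and runs it.
func (r registry) dispatch(t WorkflowType, input string) (string, error) {
	fn, ok := r[t.Name]
	if !ok {
		return "", errors.New("unknown workflow type: " + t.Name)
	}
	return fn(input)
}

func main() {
	r := registry{}
	r.register("OrderWorkflow", func(orderID string) (string, error) {
		return "processed " + orderID, nil
	})

	out, err := r.dispatch(WorkflowType{Name: "OrderWorkflow"}, "order-12345")
	fmt.Println(out, err)
}
```

Dispatching an unregistered name returns an error instead of running anything, which is the behavior one would expect when no function is known for a type.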
How Workflows Work Internally
Event-Driven Execution
Workflows operate through an event-driven model:
- Decision Tasks: The workflow receives tasks containing new events
- Replay: The workflow code replays from the beginning using event history
- Determinism: The workflow must make the same decisions given the same history
- New Decisions: The workflow returns new decisions to execute
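The four steps above can be sketched with a toy model that uses only the standard library. It is a deliberately simplified illustration, not how Cadence stores history or schedules decision tasks; `event`, `replayer`, `execute`, and `decide` are hypothetical names, and history is reduced to a list of completed activity results.

```go
package main

import "fmt"

// event is a simplified history entry: the recorded result of one activity.
type event struct {
	activity string
	result   string
}

// replayer feeds recorded results back to workflow code and collects
// the new decisions that the code produces.
type replayer struct {
	history   []event
	next      int      // index of the next history event to consume
	decisions []string // decisions produced by this decision task
}

// execute returns the recorded result when history has one. Otherwise it
// records a decision to schedule the activity and reports that the
// workflow cannot make further progress yet.
func (r *replayer) execute(activity string) (string, bool) {
	if r.next < len(r.history) {
		ev := r.history[r.next]
		if ev.activity != activity {
			panic("non-deterministic workflow: history has " + ev.activity +
				", code asked for " + activity)
		}
		r.next++
		return ev.result, true
	}
	r.decisions = append(r.decisions, "schedule "+activity)
	return "", false
}

// orderWorkflow is the workflow code; it runs from the top on every decision task.
func orderWorkflow(r *replayer) {
	if _, ok := r.execute("ProcessPayment"); !ok {
		return
	}
	if _, ok := r.execute("ShipOrder"); !ok {
		return
	}
	r.decisions = append(r.decisions, "complete workflow")
}

// decide handles one decision task: replay the history, return new decisions.
func decide(history []event) []string {
	r := &replayer{history: history}
	orderWorkflow(r)
	return r.decisions
}

func main() {
	fmt.Println(decide(nil))
	fmt.Println(decide([]event{{"ProcessPayment", "paid"}}))
	fmt.Println(decide([]event{{"ProcessPayment", "paid"}, {"ShipOrder", "shipped"}}))
}
```

Each call to `decide` starts the workflow function from the beginning. Steps already present in the history return immediately with their recorded result, and only the first step without a recorded result produces a new decision.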
Determinism is Critical: Workflow code must be deterministic. Never use:
- Random number generators (record the value with workflow.SideEffect instead)
- System time (use workflow.Now(ctx) instead)
- Network calls (use activities instead)
- Native goroutines or threads (use workflow.Go instead)
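To see why a recorded value keeps replay deterministic, consider the following standard-library sketch. It models the idea behind side-effect style APIs such as `workflow.SideEffect`, but it is not their implementation; `recorder`, `sideEffect`, and `chooseShard` are hypothetical names.

```go
package main

import (
	"fmt"
	"math/rand"
)

// recorder is a hypothetical stand-in for workflow history: values produced
// by non-deterministic code are stored once and reused on replay.
type recorder struct {
	recorded []int
	next     int
}

// sideEffect runs f only when no value was recorded for this call. On
// replay it returns the recorded value and never calls f.
func (r *recorder) sideEffect(f func() int) int {
	if r.next < len(r.recorded) {
		v := r.recorded[r.next]
		r.next++
		return v
	}
	v := f()
	r.recorded = append(r.recorded, v)
	r.next++
	return v
}

// chooseShard is workflow-style code whose decision depends on a random value.
func chooseShard(r *recorder) int {
	return r.sideEffect(func() int { return rand.Intn(100) })
}

func main() {
	first := &recorder{}
	original := chooseShard(first)

	// Replay: a new execution starts from the recorded history.
	replay := &recorder{recorded: first.recorded}
	replayed := chooseShard(replay)

	fmt.Println(original == replayed) // true: the recorded value is reused
}
```

Calling `rand.Intn` directly in `chooseShard` would give the replayed execution a different value, and any decision derived from it could diverge from the history.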
Workflow States
type WorkflowExecutionStatus int32
const (
WorkflowExecutionStatusRunning WorkflowExecutionStatus = iota
WorkflowExecutionStatusCompleted
WorkflowExecutionStatusFailed
WorkflowExecutionStatusCanceled
WorkflowExecutionStatusTerminated
WorkflowExecutionStatusContinuedAsNew
WorkflowExecutionStatusTimedOut
)
Close Status
type WorkflowExecutionCloseStatus int32
const (
WorkflowExecutionCloseStatusCompleted WorkflowExecutionCloseStatus = iota // 0: Successfully completed
WorkflowExecutionCloseStatusFailed // 1: Failed with error
WorkflowExecutionCloseStatusCanceled // 2: Canceled by request
WorkflowExecutionCloseStatusTerminated // 3: Administratively terminated
WorkflowExecutionCloseStatusContinuedAsNew // 4: Continued as new workflow
WorkflowExecutionCloseStatusTimedOut // 5: Execution timeout
)
Code Examples
package main

import (
	"time"

	"go.uber.org/cadence"
	"go.uber.org/cadence/workflow"
	"go.uber.org/zap"
)

// OrderWorkflow orchestrates order processing.
// ProcessPayment and ShipOrder are activity functions defined elsewhere.
func OrderWorkflow(ctx workflow.Context, orderID string) error {
	logger := workflow.GetLogger(ctx)
	logger.Info("Processing order", zap.String("orderID", orderID))

	// Configure timeouts and retries for every activity started from ctx
	ao := workflow.ActivityOptions{
		ScheduleToStartTimeout: time.Minute,
		StartToCloseTimeout:    time.Minute,
		HeartbeatTimeout:       time.Second * 20,
		RetryPolicy: &cadence.RetryPolicy{
			InitialInterval:    time.Second,
			BackoffCoefficient: 2.0,
			MaximumInterval:    time.Minute,
			MaximumAttempts:    5,
		},
	}
	ctx = workflow.WithActivityOptions(ctx, ao)

	// Charge the customer and wait for the result
	var result string
	err := workflow.ExecuteActivity(ctx, ProcessPayment, orderID).Get(ctx, &result)
	if err != nil {
		logger.Error("Payment failed", zap.Error(err))
		return err
	}

	// Ship the order once payment has succeeded
	err = workflow.ExecuteActivity(ctx, ShipOrder, orderID).Get(ctx, nil)
	if err != nil {
		logger.Error("Shipping failed", zap.Error(err))
		return err
	}

	logger.Info("Order completed successfully")
	return nil
}
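import com.uber.cadence.activity.ActivityOptions;
import com.uber.cadence.common.RetryOptions;
import com.uber.cadence.workflow.Workflow;
import com.uber.cadence.workflow.WorkflowMethod;
import java.time.Duration;
import org.slf4j.Logger;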
public interface OrderWorkflow {
@WorkflowMethod
void processOrder(String orderId);
}
public class OrderWorkflowImpl implements OrderWorkflow {
private final Logger logger = Workflow.getLogger(OrderWorkflowImpl.class);
private final OrderActivities activities = Workflow.newActivityStub(
OrderActivities.class,
new ActivityOptions.Builder()
.setScheduleToStartTimeout(Duration.ofMinutes(1))
.setStartToCloseTimeout(Duration.ofMinutes(1))
.setHeartbeatTimeout(Duration.ofSeconds(20))
.setRetryOptions(
new RetryOptions.Builder()
.setInitialInterval(Duration.ofSeconds(1))
.setBackoffCoefficient(2.0)
.setMaximumInterval(Duration.ofMinutes(1))
.setMaximumAttempts(5)
.build()
)
.build()
);
@Override
public void processOrder(String orderId) {
logger.info("Processing order: {}", orderId);
try {
activities.processPayment(orderId);
activities.shipOrder(orderId);
logger.info("Order completed successfully");
} catch (Exception e) {
logger.error("Order processing failed", e);
throw e;
}
}
}
Workflow Configuration
Workflows are configured with various timeouts and policies:
type WorkflowExecutionStartedEventAttributes struct {
WorkflowType *WorkflowType
TaskList *TaskList
Input []byte
ExecutionStartToCloseTimeoutSeconds *int32 // Total workflow timeout
TaskStartToCloseTimeoutSeconds *int32 // Decision task timeout
ContinuedExecutionRunID string
Initiator *ContinueAsNewInitiator
CronSchedule string // For periodic workflows
RetryPolicy *RetryPolicy
Memo *Memo
SearchAttributes *SearchAttributes
}
Key Timeouts
- ExecutionStartToCloseTimeout: Maximum time for the entire workflow execution
- TaskStartToCloseTimeout: Maximum time to process a single decision task
Best Practices
1. Keep Workflows Deterministic
// Use workflow time APIs
now := workflow.Now(ctx)
workflow.Sleep(ctx, time.Hour)
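// Record random values with a side effect so that replay reuses the recorded result
var value int
err := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
return rand.Intn(100) // math/rand; runs once, and the result is stored in history
}).Get(&value)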
2. Use Continue-As-New for Long Histories
func LongRunningWorkflow(ctx workflow.Context, iteration int) error {
// Process current iteration
err := workflow.ExecuteActivity(ctx, ProcessBatch, iteration).Get(ctx, nil)
if err != nil {
return err
}
// Continue as new after every batch so that each run starts with a fresh, short history;
// stop once the iteration counter reaches 10000
if iteration < 10000 {
return workflow.NewContinueAsNewError(ctx, LongRunningWorkflow, iteration+1)
}
return nil
}
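The effect of continuing as new can be illustrated with a small simulation in which each run keeps its own history. This is a sketch under stated assumptions, not Cadence behavior: `runResult`, `runOnce`, and `drive` are hypothetical names, and the figure of three history events per batch is an assumed placeholder.

```go
package main

import "fmt"

// runResult describes how one simulated run ended. All names in this
// sketch are hypothetical; none of them are Cadence APIs.
type runResult struct {
	historyEvents int  // events this run added to its own history
	continueAsNew bool // true if the run asked to be restarted
	nextIteration int  // input for the next run when continueAsNew is set
}

// eventsPerBatch is an assumed number of history events for one activity.
const eventsPerBatch = 3

// runOnce models a single run that processes one batch and then either
// finishes or asks to continue as new with the next iteration.
func runOnce(iteration, lastIteration int) runResult {
	res := runResult{historyEvents: eventsPerBatch}
	if iteration < lastIteration {
		res.continueAsNew = true
		res.nextIteration = iteration + 1
	}
	return res
}

// drive starts fresh runs until one finishes, and reports the number of
// runs and the largest history any single run accumulated.
func drive(lastIteration int) (runs, maxHistory int) {
	iteration := 0
	for {
		res := runOnce(iteration, lastIteration)
		runs++
		if res.historyEvents > maxHistory {
			maxHistory = res.historyEvents
		}
		if !res.continueAsNew {
			return runs, maxHistory
		}
		iteration = res.nextIteration
	}
}

func main() {
	runs, maxHistory := drive(9)
	fmt.Println(runs, maxHistory) // 10 runs, each with a history of 3 events
}
```

The total amount of work grows with the number of iterations, while the history of any single run stays at the size of one batch.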
3. Handle Failures Gracefully
func ResilientWorkflow(ctx workflow.Context) error {
// Set retry policy for activities
ao := workflow.ActivityOptions{
StartToCloseTimeout: time.Minute,
RetryPolicy: &cadence.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2.0,
MaximumInterval: time.Minute,
MaximumAttempts: 3,
},
}
ctx = workflow.WithActivityOptions(ctx, ao)
// Try primary operation
err := workflow.ExecuteActivity(ctx, PrimaryOperation).Get(ctx, nil)
if err != nil {
// Fall back to alternative
logger := workflow.GetLogger(ctx)
logger.Warn("Primary operation failed, using fallback", zap.Error(err)) // zap is go.uber.org/zap
return workflow.ExecuteActivity(ctx, FallbackOperation).Get(ctx, nil)
}
return nil
}
4. Use Workflow Versioning
func VersionedWorkflow(ctx workflow.Context) error {
v := workflow.GetVersion(ctx, "add-validation", workflow.DefaultVersion, 1)
if v == workflow.DefaultVersion {
// Old behavior
return workflow.ExecuteActivity(ctx, ProcessOld).Get(ctx, nil)
} else {
// New behavior with validation
err := workflow.ExecuteActivity(ctx, ValidateInput).Get(ctx, nil)
if err != nil {
return err
}
return workflow.ExecuteActivity(ctx, ProcessNew).Get(ctx, nil)
}
}
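The branch taken by a versioned workflow depends on whether a version marker is already in the history. The following standard-library sketch models that lookup in a simplified way; it is not the client library's implementation. `execution`, `getVersion`, and `steps` are hypothetical names, and `defaultVersion` merely stands in for `workflow.DefaultVersion`.

```go
package main

import "fmt"

// defaultVersion stands in for workflow.DefaultVersion in this sketch.
const defaultVersion = -1

// execution models one workflow execution and the version markers in its history.
type execution struct {
	replaying bool           // true when code re-runs over existing history
	markers   map[string]int // change ID -> version recorded in history
}

// getVersion is a simplified model of the version lookup.
func (e *execution) getVersion(changeID string, minSupported, maxSupported int) int {
	v, recorded := e.markers[changeID]
	switch {
	case recorded:
		// A marker in history always wins, so replay takes the same branch.
	case e.replaying:
		// The history predates the change: behave like the old code.
		v = defaultVersion
	default:
		// First execution of this code path: record the newest version.
		v = maxSupported
		e.markers[changeID] = v
	}
	if v < minSupported || v > maxSupported {
		panic(fmt.Sprintf("version %d of %q is no longer supported", v, changeID))
	}
	return v
}

// steps returns the activities the versioned workflow above would run.
func steps(e *execution) []string {
	if e.getVersion("add-validation", defaultVersion, 1) == defaultVersion {
		return []string{"ProcessOld"}
	}
	return []string{"ValidateInput", "ProcessNew"}
}

func main() {
	old := &execution{replaying: true, markers: map[string]int{}}
	fresh := &execution{markers: map[string]int{}}
	fmt.Println(steps(old))   // execution started before the change
	fmt.Println(steps(fresh)) // execution started after the change
}
```

In this model an execution that started before the change keeps the old behavior on replay, while a new execution records version 1 and keeps the new behavior on every later replay.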
Common Patterns
Sequential Processing
func SequentialWorkflow(ctx workflow.Context, tasks []string) error {
for _, task := range tasks {
err := workflow.ExecuteActivity(ctx, ProcessTask, task).Get(ctx, nil)
if err != nil {
return err
}
}
return nil
}
Parallel Processing
func ParallelWorkflow(ctx workflow.Context, tasks []string) error {
var futures []workflow.Future
// Start all activities in parallel
for _, task := range tasks {
future := workflow.ExecuteActivity(ctx, ProcessTask, task)
futures = append(futures, future)
}
// Wait for all to complete
for _, future := range futures {
err := future.Get(ctx, nil)
if err != nil {
return err
}
}
return nil
}
Saga Pattern (Compensating Transactions)
func SagaWorkflow(ctx workflow.Context, orderID string) error {
var compensations []func() error
// Reserve inventory
err := workflow.ExecuteActivity(ctx, ReserveInventory, orderID).Get(ctx, nil)
if err != nil {
return err
}
compensations = append(compensations, func() error {
return workflow.ExecuteActivity(ctx, ReleaseInventory, orderID).Get(ctx, nil)
})
// Charge payment
err = workflow.ExecuteActivity(ctx, ChargePayment, orderID).Get(ctx, nil)
if err != nil {
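// Run the registered compensations in reverse order (here, release the inventory)
for i := len(compensations) - 1; i >= 0; i-- {
// Best effort: a compensation error is ignored so that the original error is returned
_ = compensations[i]()
}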
return err
}
return nil
}
- Activities - Learn about executing side effects in workflows
- Workers - Understand how workers execute workflow and activity code
- Task Lists - See how workflows are routed to workers
- Signals & Queries - Interact with running workflows
- Domains - Organize and isolate workflows
Further Reading