
What is an Activity?

An activity is a unit of work that performs a single, well-defined action in a Cadence workflow. Activities are used for:
  • Side effects: Database operations, API calls, file I/O
  • Non-deterministic operations: Random number generation, system time
  • External interactions: Third-party service calls, message queue operations
  • Long-running operations: Video processing, batch jobs
While workflows must be deterministic, activities are designed to handle non-deterministic and side-effecting operations. Activities can fail and retry without affecting workflow determinism.

Why Activities Matter

Activities solve several critical problems:
  1. Separation of Concerns: Business logic (workflow) is separate from implementation (activities)
  2. Retry Logic: Built-in exponential backoff and retry policies
  3. Heartbeating: Long-running activities can report progress
  4. Timeouts: Fine-grained control over execution time
  5. Visibility: Each activity execution is tracked independently

Core Components

Activity Type

type ActivityType struct {
    Name string `json:"name,omitempty"`
}
The activity type identifies which activity function to execute.

Activity Task Scheduling

type ActivityTaskScheduledEventAttributes struct {
    ActivityID                    string
    ActivityType                  *ActivityType
    Domain                        *string
    TaskList                      *TaskList
    Input                         []byte
    ScheduleToCloseTimeoutSeconds *int32  // End-to-end timeout
    ScheduleToStartTimeoutSeconds *int32  // Queue timeout
    StartToCloseTimeoutSeconds    *int32  // Execution timeout
    HeartbeatTimeoutSeconds       *int32  // Heartbeat timeout
    RetryPolicy                   *RetryPolicy
    Header                        *Header
}

How Activities Work Internally

Activity Lifecycle

  1. Scheduled: Workflow schedules the activity
  2. Dispatched: Activity task added to task list
  3. Started: Worker picks up and starts execution
  4. Heartbeating: (Optional) Activity reports progress
  5. Completed: Activity returns result or error
  6. Recorded: Result stored in workflow history
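The lifecycle steps above can be sketched as a small transition table. This is an illustrative model only, assuming the six steps listed and treating heartbeating as optional and repeatable; the names `step`, `next`, and `validate` are hypothetical and do not come from the Cadence codebase.

```go
package main

import "fmt"

// step is one stage of the activity lifecycle described above.
type step string

const (
    Scheduled    step = "Scheduled"
    Dispatched   step = "Dispatched"
    Started      step = "Started"
    Heartbeating step = "Heartbeating"
    Completed    step = "Completed"
    Recorded     step = "Recorded"
)

// next lists the steps that may follow each step in this simplified model.
var next = map[step][]step{
    Scheduled:    {Dispatched},
    Dispatched:   {Started},
    Started:      {Heartbeating, Completed}, // heartbeating is optional
    Heartbeating: {Heartbeating, Completed}, // an activity may heartbeat many times
    Completed:    {Recorded},
}

// canMove reports whether the model allows moving from one step to another.
func canMove(from, to step) bool {
    for _, s := range next[from] {
        if s == to {
            return true
        }
    }
    return false
}

// validate checks that every consecutive pair in path is an allowed transition.
func validate(path []step) error {
    for i := 1; i < len(path); i++ {
        if !canMove(path[i-1], path[i]) {
            return fmt.Errorf("invalid transition %s -> %s", path[i-1], path[i])
        }
    }
    return nil
}

func main() {
    ok := []step{Scheduled, Dispatched, Started, Heartbeating, Completed, Recorded}
    fmt.Println(validate(ok))
    bad := []step{Scheduled, Started}
    fmt.Println(validate(bad))
}
```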

Activity States

Activity Completion Events

type ActivityTaskCompletedEventAttributes struct {
    Result           []byte  // Activity return value
    ScheduledEventID int64   // Reference to scheduled event
    StartedEventID   int64   // Reference to started event
    Identity         string  // Worker identity
}

Activity Timeouts

Cadence provides four types of activity timeouts:
type TimeoutType int32

const (
    TimeoutTypeStartToClose    TimeoutType = iota // Maximum execution time
    TimeoutTypeScheduleToStart                    // Maximum queue time
    TimeoutTypeScheduleToClose                    // End-to-end maximum time
    TimeoutTypeHeartbeat                          // Maximum time between heartbeats
)

Timeout Relationships

Schedule                     Start                     Complete
   │                           │                           │
   ├───── ScheduleToStart ─────┼────── StartToClose ───────┤
   │      (in task list)       │       (executing)         │
   │                           ├─ Heartbeat ─┼─ Heartbeat ─┤
   │                           │      (if configured)      │
   └─────────────────── ScheduleToClose ───────────────────┘

Code Examples

package main

import (
    "context"
    "fmt"
    "time"

    "go.uber.org/cadence/activity"
    "go.uber.org/zap"
)

// Simple activity
func ProcessPayment(ctx context.Context, orderID string, amount float64) (string, error) {
    // activity.GetLogger returns a *zap.Logger, so fields are passed as zap.Field values
    logger := activity.GetLogger(ctx)
    logger.Info("Processing payment", zap.String("orderID", orderID), zap.Float64("amount", amount))

    // Simulate payment processing
    time.Sleep(2 * time.Second)

    // In real code, call payment gateway API
    transactionID := fmt.Sprintf("txn_%s_%d", orderID, time.Now().Unix())

    return transactionID, nil
}

// Activity with heartbeat
func ProcessLargeFile(ctx context.Context, fileURL string) error {
    logger := activity.GetLogger(ctx)

    // Simulate processing chunks
    totalChunks := 100
    for i := 0; i < totalChunks; i++ {
        // Process chunk
        time.Sleep(100 * time.Millisecond)

        // Report progress via heartbeat
        activity.RecordHeartbeat(ctx, i)

        // Check for cancellation
        if ctx.Err() == context.Canceled {
            logger.Info("Activity canceled", zap.Int("progress", i))
            return ctx.Err()
        }
    }

    logger.Info("File processing completed")
    return nil
}

// Activity that can resume from last heartbeat
func ResumableActivity(ctx context.Context, input string) error {
    startIdx := 0

    // Check if we're resuming from a previous attempt
    if activity.HasHeartbeatDetails(ctx) {
        var prevIdx int
        if err := activity.GetHeartbeatDetails(ctx, &prevIdx); err == nil {
            startIdx = prevIdx + 1
            activity.GetLogger(ctx).Info("Resuming activity", zap.Int("from", startIdx))
        }
    }

    // Process from last checkpoint
    for i := startIdx; i < 1000; i++ {
        // Do work
        time.Sleep(10 * time.Millisecond)

        // Checkpoint progress
        activity.RecordHeartbeat(ctx, i)
    }

    return nil
}

Activity Configuration in Workflows

func OrderWorkflow(ctx workflow.Context, orderID string) error {
    // Configure activity options
    ao := workflow.ActivityOptions{
        ActivityID:             "payment-" + orderID,  // Unique activity ID
        ScheduleToStartTimeout: time.Minute,           // Max queue time
        StartToCloseTimeout:    time.Minute * 5,       // Max execution time
        HeartbeatTimeout:       time.Second * 20,      // Max time between heartbeats
        WaitForCancellation:    true,                  // Wait for activity to handle cancellation
        RetryPolicy: &cadence.RetryPolicy{
            InitialInterval:          time.Second,
            BackoffCoefficient:       2.0,
            MaximumInterval:          time.Minute,
            MaximumAttempts:          5,
            NonRetriableErrorReasons: []string{"InvalidInput"},
        },
    }
    ctx = workflow.WithActivityOptions(ctx, ao)

    // Execute activity
    var txnID string
    err := workflow.ExecuteActivity(ctx, ProcessPayment, orderID, 99.99).Get(ctx, &txnID)
    if err != nil {
        return err
    }

    return nil
}

Best Practices

1. Make Activities Idempotent

func CreateOrder(ctx context.Context, orderID string) error {
    // Check if order already exists
    exists, err := db.OrderExists(orderID)
    if err != nil {
        return err
    }
    if exists {
        // Already created, return success
        return nil
    }

    // Create order
    return db.CreateOrder(orderID)
}
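A check-then-create sequence like the one above can still race if two attempts of the same activity overlap. One common remedy is a single atomic create-if-absent operation, such as a unique-key constraint in the database. The sketch below uses an in-memory map as a stand-in for that; `orderStore` and `CreateIfAbsent` are hypothetical names.

```go
package main

import (
    "fmt"
    "sync"
)

// orderStore is an in-memory stand-in for a database with a unique key on order ID.
type orderStore struct {
    mu     sync.Mutex
    orders map[string]bool
}

func newOrderStore() *orderStore {
    return &orderStore{orders: make(map[string]bool)}
}

// CreateIfAbsent creates the order atomically and reports whether this call
// was the one that created it. Calling it again for the same ID is a no-op.
func (s *orderStore) CreateIfAbsent(orderID string) bool {
    s.mu.Lock()
    defer s.mu.Unlock()
    if s.orders[orderID] {
        return false
    }
    s.orders[orderID] = true
    return true
}

func main() {
    store := newOrderStore()
    results := make(chan bool, 10)
    var wg sync.WaitGroup

    // Simulate ten overlapping attempts of the same activity.
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            results <- store.CreateIfAbsent("order-42")
        }()
    }
    wg.Wait()
    close(results)

    created := 0
    for ok := range results {
        if ok {
            created++
        }
    }
    fmt.Println("orders created:", created) // orders created: 1
}
```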

2. Use Appropriate Timeouts

// Short-running activity (API call)
shortAO := workflow.ActivityOptions{
    ScheduleToStartTimeout: time.Minute,
    StartToCloseTimeout:    time.Minute,
}

// Long-running activity (video processing)
longAO := workflow.ActivityOptions{
    ScheduleToStartTimeout: time.Minute * 5,
    StartToCloseTimeout:    time.Hour,
    HeartbeatTimeout:       time.Minute,  // Enable heartbeating
}

3. Implement Heartbeating for Long Activities

func ProcessBigData(ctx context.Context, dataURL string) error {
    records := fetchRecords(dataURL)

    for i, record := range records {
        processRecord(record)

        // Heartbeat every 100 records
        if i%100 == 0 {
            activity.RecordHeartbeat(ctx, i)

            // Check for cancellation
            if ctx.Err() == context.Canceled {
                return ctx.Err()
            }
        }
    }

    return nil
}

4. Use Local Activities for Fast Operations

// Local activity - executed in same process as workflow
func GetFromCache(ctx context.Context, key string) (string, error) {
    // Fast, local operation
    return localCache.Get(key), nil
}

// In workflow
lao := workflow.LocalActivityOptions{
    ScheduleToCloseTimeout: time.Second,
}
ctx = workflow.WithLocalActivityOptions(ctx, lao)

var value string
err := workflow.ExecuteLocalActivity(ctx, GetFromCache, "key").Get(ctx, &value)

5. Handle Retries Carefully

type BusinessError struct {
    Message string
}

func (e BusinessError) Error() string {
    return e.Message
}

func ValidateOrder(ctx context.Context, orderID string) error {
    order, err := db.GetOrder(orderID)
    if err != nil {
        // Transient error - will retry
        return err
    }

    if order.Amount < 0 {
        // Business logic error - don't retry.
        // The reason "InvalidAmount" must match an entry in NonRetriableErrorReasons.
        return cadence.NewCustomError("InvalidAmount", "Order amount must not be negative")
    }

    return nil
}

// In workflow - configure non-retriable errors
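ao := workflow.ActivityOptions{
    ScheduleToStartTimeout: time.Minute, // illustrative timeout values
    StartToCloseTimeout:    time.Minute,
    RetryPolicy: &cadence.RetryPolicy{
        InitialInterval:          time.Second, // illustrative retry bounds
        BackoffCoefficient:       2.0,
        MaximumAttempts:          5,
        NonRetriableErrorReasons: []string{"InvalidAmount"},
    },
}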
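The decision that such a policy encodes can be sketched in a few lines. This is a simplified model, not the service's logic: `retryPolicy` here is a local stand-in with only two fields, `shouldRetry` is a hypothetical helper, and it assumes `MaximumAttempts` counts the first attempt and that zero means unlimited.

```go
package main

import "fmt"

// retryPolicy is a local, simplified stand-in for the fields used below.
type retryPolicy struct {
    MaximumAttempts          int // 0 means unlimited in this sketch
    NonRetriableErrorReasons []string
}

// shouldRetry reports whether a failed attempt should be retried, given the
// error reason and the number of attempts made so far (including this one).
func shouldRetry(p retryPolicy, reason string, attemptsSoFar int) bool {
    for _, r := range p.NonRetriableErrorReasons {
        if r == reason {
            return false // business errors are not worth retrying
        }
    }
    if p.MaximumAttempts > 0 && attemptsSoFar >= p.MaximumAttempts {
        return false // attempts exhausted
    }
    return true
}

func main() {
    p := retryPolicy{MaximumAttempts: 5, NonRetriableErrorReasons: []string{"InvalidAmount"}}
    fmt.Println(shouldRetry(p, "ConnectionReset", 1)) // true
    fmt.Println(shouldRetry(p, "InvalidAmount", 1))   // false
    fmt.Println(shouldRetry(p, "ConnectionReset", 5)) // false
}
```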

Activity Patterns

Async Activity Completion

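// Activity that completes asynchronously.
// cadenceClient is assumed to be a client.Client (go.uber.org/cadence/client)
// created elsewhere; doAsyncWork is a placeholder for the real operation.
func AsyncOperation(ctx context.Context, input string) error {
    // The task token identifies this activity execution to the Cadence service
    taskToken := activity.GetInfo(ctx).TaskToken

    // Start async operation
    go func() {
        result := doAsyncWork(input)

        // Complete the activity from a different goroutine/process.
        // Use a context that outlives this function call.
        if err := cadenceClient.CompleteActivity(context.Background(), taskToken, result, nil); err != nil {
            // Handle or log the failure to report completion
        }
    }()

    // Return activity.ErrResultPending to indicate async completion
    return activity.ErrResultPending
}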

Activity Cancellation

func CancellableActivity(ctx context.Context) error {
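    // Start long operation.
    // The channel is buffered so the goroutine can send and exit even if the
    // activity has already returned because of cancellation.
    done := make(chan struct{}, 1)

    go func() {
        // Do work
        time.Sleep(time.Hour)
        done <- struct{}{}
    }()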

    // Wait for completion or cancellation
    select {
    case <-done:
        return nil
    case <-ctx.Done():
        // Cleanup on cancellation
        cleanup()
        return ctx.Err()
    }
}

Common Pitfalls

Avoid These Common Mistakes:
  1. Long activities without heartbeats - Activities longer than a few minutes should heartbeat
  2. Non-idempotent activities - Activities must handle retries safely
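  3. Incorrect timeout values - ScheduleToClose covers queue time plus execution time, so it should be no smaller than StartToClose, and usually at least ScheduleToStart + StartToClose
  4. Ignoring context cancellation - Always check ctx.Done() (or ctx.Err()) in long activities, and return promptly when it fires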
  5. Mutable global state - Activities can run in parallel, use thread-safe operations
