What is an Activity?
An activity is a unit of work that performs a single, well-defined action in a Cadence workflow. Activities are used for:
- Side effects: Database operations, API calls, file I/O
- Non-deterministic operations: Random number generation, system time
- External interactions: Third-party service calls, message queue operations
- Long-running operations: Video processing, batch jobs
While workflows must be deterministic, activities are designed to handle non-deterministic and side-effecting operations. Activities can fail and retry without affecting workflow determinism.
Why Activities Matter
Activities solve several critical problems:
- Separation of Concerns: Business logic (workflow) is separate from implementation (activities)
- Retry Logic: Built-in exponential backoff and retry policies
- Heartbeating: Long-running activities can report progress
- Timeouts: Fine-grained control over execution time
- Visibility: Each activity execution is tracked independently
Core Components
Activity Type
type ActivityType struct {
Name string `json:"name,omitempty"`
}
The activity type identifies which activity function to execute.
Activity Task Scheduling
type ActivityTaskScheduledEventAttributes struct {
ActivityID string
ActivityType *ActivityType
Domain *string
TaskList *TaskList
Input []byte
ScheduleToCloseTimeoutSeconds *int32 // End-to-end timeout
ScheduleToStartTimeoutSeconds *int32 // Queue timeout
StartToCloseTimeoutSeconds *int32 // Execution timeout
HeartbeatTimeoutSeconds *int32 // Heartbeat timeout
RetryPolicy *RetryPolicy
Header *Header
}
How Activities Work Internally
Activity Lifecycle
- Scheduled: Workflow schedules the activity
- Dispatched: Activity task added to task list
- Started: Worker picks up and starts execution
- Heartbeating: (Optional) Activity reports progress
- Completed: Activity returns result or error
- Recorded: Result stored in workflow history
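The lifecycle above can be read as a small state machine. The following is a minimal sketch using only the standard library, not Cadence's internal representation: the `State` type, the `next` table, and `CanTransition` are hypothetical names. The sketch folds "Dispatched" into `Scheduled` and "Heartbeating" into `Started`, and it assumes an activity can time out or be canceled before a worker starts it.

```go
package main

import "fmt"

// State is a hypothetical label for a point in the activity lifecycle.
type State string

const (
	Scheduled State = "Scheduled"
	Started   State = "Started"
	Completed State = "Completed"
	Failed    State = "Failed"
	TimedOut  State = "TimedOut"
	Canceled  State = "Canceled"
)

// next lists the transitions this sketch allows.
// Terminal states (Completed, Failed, TimedOut, Canceled) have no entry.
var next = map[State][]State{
	Scheduled: {Started, TimedOut, Canceled},
	Started:   {Completed, Failed, TimedOut, Canceled},
}

// CanTransition reports whether moving from one state to another is allowed.
func CanTransition(from, to State) bool {
	for _, s := range next[from] {
		if s == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(CanTransition(Scheduled, Started)) // true
	fmt.Println(CanTransition(Completed, Started)) // false
}
```

Each terminal state corresponds to one of the completion events that gets recorded in workflow history.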
Activity States
Activity Completion Events
An activity ends with one of four events: Completed, Failed, Timed Out, or Canceled. Each event has its own attributes struct:
type ActivityTaskCompletedEventAttributes struct {
Result []byte // Activity return value
ScheduledEventID int64 // Reference to scheduled event
StartedEventID int64 // Reference to started event
Identity string // Worker identity
}
type ActivityTaskFailedEventAttributes struct {
Reason *string // Failure reason
Details []byte // Error details
ScheduledEventID int64
StartedEventID int64
Identity string
}
type ActivityTaskTimedOutEventAttributes struct {
Details []byte
ScheduledEventID int64
StartedEventID int64
TimeoutType *TimeoutType // START_TO_CLOSE, SCHEDULE_TO_START, etc.
LastFailureReason *string
LastFailureDetails []byte
}
type ActivityTaskCanceledEventAttributes struct {
Details []byte
LatestCancelRequestedEventID int64
ScheduledEventID int64
StartedEventID int64
Identity string
}
Activity Timeouts
Cadence provides four types of activity timeouts:
type TimeoutType int32
const (
TimeoutTypeStartToClose TimeoutType = iota // Maximum execution time
TimeoutTypeScheduleToStart // Maximum queue time
TimeoutTypeScheduleToClose // End-to-end maximum time
TimeoutTypeHeartbeat // Maximum time between heartbeats
)
Timeout Relationships
Schedule                    Start                Complete
│                           │                           │
├─────────────────── ScheduleToClose ───────────────────┤
├───── ScheduleToStart ─────┼────── StartToClose ───────┤
│      (in task queue)      │       (executing)         │
│                           ├─ Heartbeat ─┼─ Heartbeat ─┤
│                           │      (if configured)      │
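To make the relationships concrete, here is a minimal sketch that works out which timeout would fire first for an activity that is already executing. It uses only the standard library. The `Timeouts` struct and `NextDeadline` function are hypothetical names for illustration and are not part of the Cadence API. All times are offsets from the moment the activity was scheduled, and ScheduleToStart is left out because it no longer applies once a worker has started the task.

```go
package main

import (
	"fmt"
	"time"
)

// Timeouts holds the settings that still matter after an activity has started.
// A zero value means "not configured".
type Timeouts struct {
	StartToClose    time.Duration
	ScheduleToClose time.Duration
	Heartbeat       time.Duration
}

// NextDeadline returns the earliest pending deadline for a running activity,
// as an offset from the schedule time, and the timeout that owns it.
// started and lastHeartbeat are also offsets from the schedule time.
func NextDeadline(t Timeouts, started, lastHeartbeat time.Duration) (time.Duration, string) {
	deadline, name := started+t.StartToClose, "StartToClose"
	if t.ScheduleToClose > 0 && t.ScheduleToClose < deadline {
		deadline, name = t.ScheduleToClose, "ScheduleToClose"
	}
	if t.Heartbeat > 0 && lastHeartbeat+t.Heartbeat < deadline {
		deadline, name = lastHeartbeat+t.Heartbeat, "Heartbeat"
	}
	return deadline, name
}

func main() {
	t := Timeouts{
		StartToClose:    5 * time.Minute,
		ScheduleToClose: 6 * time.Minute,
		Heartbeat:       20 * time.Second,
	}
	// Started 10s after scheduling, last heartbeat 30s after scheduling.
	fmt.Println(NextDeadline(t, 10*time.Second, 30*time.Second)) // 50s Heartbeat
}
```

In this example the heartbeat deadline comes first, which is why a long-running activity that stops heartbeating is detected well before its StartToClose timeout.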
Code Examples
package main
import (
	"context"
	"fmt"
	"time"
	"go.uber.org/cadence/activity"
	"go.uber.org/zap"
)
// Simple activity
func ProcessPayment(ctx context.Context, orderID string, amount float64) (string, error) {
	// activity.GetLogger returns a *zap.Logger, so fields are passed as zap.Field values
	logger := activity.GetLogger(ctx)
	logger.Info("Processing payment", zap.String("orderID", orderID), zap.Float64("amount", amount))
	// Simulate payment processing
	time.Sleep(2 * time.Second)
	// In real code, call payment gateway API
	transactionID := fmt.Sprintf("txn_%s_%d", orderID, time.Now().Unix())
	return transactionID, nil
}
// Activity with heartbeat
func ProcessLargeFile(ctx context.Context, fileURL string) error {
	logger := activity.GetLogger(ctx)
	// Simulate processing chunks
	totalChunks := 100
	for i := 0; i < totalChunks; i++ {
		// Process chunk
		time.Sleep(100 * time.Millisecond)
		// Report progress via heartbeat
		activity.RecordHeartbeat(ctx, i)
		// Check for cancellation or an expired deadline
		if err := ctx.Err(); err != nil {
			logger.Info("Activity stopped early", zap.Int("progress", i), zap.Error(err))
			return err
		}
	}
	logger.Info("File processing completed", zap.String("fileURL", fileURL))
	return nil
}
// Activity that can resume from last heartbeat
func ResumableActivity(ctx context.Context, input string) error {
	startIdx := 0
	// Check if we're resuming from a previous attempt
	if activity.HasHeartbeatDetails(ctx) {
		var prevIdx int
		if err := activity.GetHeartbeatDetails(ctx, &prevIdx); err == nil {
			// prevIdx was fully processed, so continue with the next item
			startIdx = prevIdx + 1
			activity.GetLogger(ctx).Info("Resuming activity", zap.Int("from", startIdx))
		}
	}
	// Process from last checkpoint
	for i := startIdx; i < 1000; i++ {
		// Do work
		time.Sleep(10 * time.Millisecond)
		// Checkpoint progress
		activity.RecordHeartbeat(ctx, i)
	}
	return nil
}
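The resume-from-checkpoint idea in the Go example above can be tried out without a Cadence server. The sketch below is a standard-library simulation, not Cadence code: `run` and `RunWithResume` are hypothetical names, the returned `last` index stands in for heartbeat details, and the failure is injected artificially at a chosen index.

```go
package main

import (
	"errors"
	"fmt"
)

// run processes items [start, total) and fails when it reaches failAt.
// It returns the last index it finished, standing in for heartbeat details.
func run(start, total, failAt int, processed *[]int) (last int, err error) {
	last = start - 1
	for i := start; i < total; i++ {
		if i == failAt {
			return last, errors.New("simulated worker crash")
		}
		*processed = append(*processed, i)
		last = i // checkpoint, as RecordHeartbeat(ctx, i) would
	}
	return last, nil
}

// RunWithResume retries run, starting each attempt right after the last checkpoint.
func RunWithResume(total, failAt int) []int {
	var processed []int
	start := 0
	for {
		last, err := run(start, total, failAt, &processed)
		if err == nil {
			return processed
		}
		start = last + 1
		failAt = -1 // the simulated fault happens only once
	}
}

func main() {
	// The first attempt crashes at index 3; the retry resumes there.
	fmt.Println(RunWithResume(5, 3)) // [0 1 2 3 4]
}
```

Every index is processed exactly once, because the retry starts at the item after the last recorded checkpoint instead of starting again from zero.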
import com.uber.cadence.activity.Activity;
import com.uber.cadence.activity.ActivityMethod;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public interface PaymentActivities {
@ActivityMethod
String processPayment(String orderId, double amount);
@ActivityMethod
void processLargeFile(String fileUrl);
}
public class PaymentActivitiesImpl implements PaymentActivities {
private static final Logger logger = LoggerFactory.getLogger(PaymentActivitiesImpl.class);
@Override
public String processPayment(String orderId, double amount) {
logger.info("Processing payment for order: {} amount: {}", orderId, amount);
// Simulate payment processing
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
String transactionId = "txn_" + orderId + "_" + System.currentTimeMillis();
return transactionId;
}
@Override
public void processLargeFile(String fileUrl) {
int totalChunks = 100;
for (int i = 0; i < totalChunks; i++) {
// Process chunk
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
// Report progress via heartbeat
Activity.heartbeat(i);
}
logger.info("File processing completed");
}
}
Activity Configuration in Workflows
import (
	"time"
	"go.uber.org/cadence"
	"go.uber.org/cadence/workflow"
)
func OrderWorkflow(ctx workflow.Context, orderID string) error {
// Configure activity options
ao := workflow.ActivityOptions{
ActivityID: "payment-" + orderID, // Unique activity ID
ScheduleToStartTimeout: time.Minute, // Max queue time
StartToCloseTimeout: time.Minute * 5, // Max execution time
HeartbeatTimeout: time.Second * 20, // Max time between heartbeats
WaitForCancellation: true, // Wait for activity to handle cancellation
RetryPolicy: &cadence.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2.0,
MaximumInterval: time.Minute,
MaximumAttempts: 5,
NonRetriableErrorReasons: []string{"InvalidInput"},
},
}
ctx = workflow.WithActivityOptions(ctx, ao)
// Execute activity
var txnID string
err := workflow.ExecuteActivity(ctx, ProcessPayment, orderID, 99.99).Get(ctx, &txnID)
if err != nil {
return err
}
return nil
}
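To see what the retry policy in the Go workflow above means in practice, the following sketch computes the wait before each retry. It is an illustration using only the standard library, and `RetryDelays` is a hypothetical helper, not a Cadence function. It assumes the usual exponential backoff rule: each delay is the previous one multiplied by the coefficient and capped at the maximum interval.

```go
package main

import (
	"fmt"
	"time"
)

// RetryDelays lists the wait before each retry for a policy with the given
// initial interval, backoff coefficient, interval cap, and attempt limit.
func RetryDelays(initial time.Duration, coeff float64, max time.Duration, attempts int) []time.Duration {
	var delays []time.Duration
	d := initial
	for i := 1; i < attempts; i++ { // the first attempt runs without a delay
		if d > max {
			d = max
		}
		delays = append(delays, d)
		d = time.Duration(float64(d) * coeff)
	}
	return delays
}

func main() {
	// Same values as the policy above: 1s initial, 2.0 coefficient, 1m cap, 5 attempts.
	fmt.Println(RetryDelays(time.Second, 2.0, time.Minute, 5)) // [1s 2s 4s 8s]
}
```

With five attempts there are four retries, so the activity gives up about 15 seconds of waiting after the first failure, plus the time each attempt itself takes.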
public class OrderWorkflowImpl implements OrderWorkflow {
private final PaymentActivities activities = Workflow.newActivityStub(
PaymentActivities.class,
// orderId is not in scope in a field initializer, so no per-order activity ID is set here
new ActivityOptions.Builder()
.setScheduleToStartTimeout(Duration.ofMinutes(1))
.setStartToCloseTimeout(Duration.ofMinutes(5))
.setHeartbeatTimeout(Duration.ofSeconds(20))
.setRetryOptions(
new RetryOptions.Builder()
.setInitialInterval(Duration.ofSeconds(1))
.setBackoffCoefficient(2.0)
.setMaximumInterval(Duration.ofMinutes(1))
.setMaximumAttempts(5)
.setDoNotRetry(IllegalArgumentException.class) // takes exception classes, not reason strings
.build()
)
.build()
);
@Override
public void processOrder(String orderId) {
String txnId = activities.processPayment(orderId, 99.99);
// Continue with order processing
}
}
Best Practices
1. Make Activities Idempotent
func CreateOrder(ctx context.Context, orderID string) error {
// Check if order already exists
exists, err := db.OrderExists(orderID)
if err != nil {
return err
}
if exists {
// Already created, return success
return nil
}
// Create order
return db.CreateOrder(orderID)
}
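One caveat with a check-then-create sequence is that two concurrent attempts can both pass the existence check. The sketch below shows one way to make the check and the insert a single atomic step. `OrderStore` is a hypothetical in-memory stand-in for the `db` package used above, and the `writes` counter exists only to demonstrate the effect. A real database would typically rely on a unique constraint or an upsert instead of a mutex.

```go
package main

import (
	"fmt"
	"sync"
)

// OrderStore is a hypothetical in-memory stand-in for a database.
type OrderStore struct {
	mu     sync.Mutex
	orders map[string]bool
	writes int // counts real inserts, for demonstration only
}

// NewOrderStore returns an empty store.
func NewOrderStore() *OrderStore {
	return &OrderStore{orders: map[string]bool{}}
}

// CreateOrder inserts the order once. Repeated calls succeed without
// writing again, so a retried activity is safe.
func (s *OrderStore) CreateOrder(orderID string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.orders[orderID] {
		return nil // already created, report success
	}
	s.orders[orderID] = true
	s.writes++
	return nil
}

// Writes reports how many inserts actually happened.
func (s *OrderStore) Writes() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.writes
}

func main() {
	s := NewOrderStore()
	s.CreateOrder("order-1")
	s.CreateOrder("order-1") // retry of the same request
	fmt.Println(s.Writes())  // 1
}
```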
2. Use Appropriate Timeouts
// Short-running activity (API call)
shortAO := workflow.ActivityOptions{
	ScheduleToStartTimeout: time.Minute,
	StartToCloseTimeout:    time.Minute,
}
// Long-running activity (video processing)
longAO := workflow.ActivityOptions{
	ScheduleToStartTimeout: time.Minute * 5,
	StartToCloseTimeout:    time.Hour,
	HeartbeatTimeout:       time.Minute, // Enable heartbeating
}
3. Implement Heartbeating for Long Activities
func ProcessBigData(ctx context.Context, dataURL string) error {
records := fetchRecords(dataURL)
for i, record := range records {
processRecord(record)
// Heartbeat every 100 records
if i%100 == 0 {
activity.RecordHeartbeat(ctx, i)
// Check for cancellation
if ctx.Err() == context.Canceled {
return ctx.Err()
}
}
}
return nil
}
4. Use Local Activities for Fast Operations
// Local activity - executed in same process as workflow
func GetFromCache(ctx context.Context, key string) (string, error) {
// Fast, local operation
return localCache.Get(key), nil
}
// In workflow
lao := workflow.LocalActivityOptions{
ScheduleToCloseTimeout: time.Second,
}
ctx = workflow.WithLocalActivityOptions(ctx, lao)
var value string
err := workflow.ExecuteLocalActivity(ctx, GetFromCache, "key").Get(ctx, &value)
5. Handle Retries Carefully
type BusinessError struct {
Message string
}
func (e BusinessError) Error() string {
return e.Message
}
func ValidateOrder(ctx context.Context, orderID string) error {
order, err := db.GetOrder(orderID)
if err != nil {
// Transient error - will retry
return err
}
if order.Amount < 0 {
// Business logic error - don't retry
return cadence.NewCustomError("InvalidAmount", "Order amount must not be negative")
}
return nil
}
// In workflow - configure non-retriable errors
ao := workflow.ActivityOptions{
RetryPolicy: &cadence.RetryPolicy{
NonRetriableErrorReasons: []string{"InvalidAmount"},
},
}
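The retry decision described above can be sketched as a small predicate. This is an illustration with a hypothetical `ShouldRetry` helper, not the logic Cadence runs internally. It assumes that error reasons are compared to the non-retriable list by exact string match and that a maximum of zero means no attempt limit.

```go
package main

import "fmt"

// ShouldRetry reports whether a failed attempt should be retried.
// attempt is the number of attempts made so far; maxAttempts of 0 means no limit.
func ShouldRetry(reason string, attempt, maxAttempts int, nonRetriable []string) bool {
	if maxAttempts > 0 && attempt >= maxAttempts {
		return false // attempt budget used up
	}
	for _, r := range nonRetriable {
		if r == reason {
			return false // business error: retrying cannot help
		}
	}
	return true
}

func main() {
	nonRetriable := []string{"InvalidAmount"}
	fmt.Println(ShouldRetry("InvalidAmount", 1, 5, nonRetriable)) // false
	fmt.Println(ShouldRetry("Timeout", 1, 5, nonRetriable))       // true
}
```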
Activity Patterns
Async Activity Completion
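// Activity that completes asynchronously
func AsyncOperation(ctx context.Context, input string) error {
	// The task token identifies this activity execution to the Cadence service
	taskToken := activity.GetInfo(ctx).TaskToken
	// Start async operation
	go func() {
		result := doAsyncWork(input)
		// Complete the activity from a different goroutine/process.
		// client is assumed to be a Cadence client created elsewhere.
		if err := client.CompleteActivity(context.Background(), taskToken, result, nil); err != nil {
			// Log or retry here; otherwise the workflow only learns of the failure via a timeout
		}
	}()
	// Return activity.ErrResultPending to indicate async completion
	return activity.ErrResultPending
}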
Activity Cancellation
func CancellableActivity(ctx context.Context) error {
// Start long operation
done := make(chan bool, 1) // buffered so the goroutine can exit even after cancellation
go func() {
// Do work
time.Sleep(time.Hour)
done <- true
}()
// Wait for completion or cancellation
select {
case <-done:
return nil
case <-ctx.Done():
// Cleanup on cancellation
cleanup()
return ctx.Err()
}
}
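The select pattern above can be exercised with a plain `context.Context`, which is what an activity receives. The following standard-library sketch uses hypothetical `doWork` and `Demo` functions. It cancels a slow task after 10 milliseconds and shows that the function returns promptly with the context's error.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// doWork waits for either the work to finish or the context to be canceled.
func doWork(ctx context.Context, work time.Duration) error {
	done := make(chan struct{}, 1) // buffered so the worker never blocks on send
	go func() {
		time.Sleep(work) // stands in for the real operation
		done <- struct{}{}
	}()
	select {
	case <-done:
		return nil
	case <-ctx.Done():
		// Cleanup would go here before returning.
		return ctx.Err()
	}
}

// Demo cancels a one-second task after 10ms and returns what doWork reports.
func Demo() error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	time.AfterFunc(10*time.Millisecond, cancel)
	return doWork(ctx, time.Second)
}

func main() {
	fmt.Println(Demo()) // context canceled
}
```

Note that the background goroutine keeps running until its sleep ends. Real code would also pass `ctx` into the operation itself so the work stops, not just the wait.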
Common Pitfalls
Avoid These Common Mistakes:
- Long activities without heartbeats - Activities longer than a few minutes should heartbeat
- Non-idempotent activities - Activities must handle retries safely
- Incorrect timeout values - ScheduleToClose should be greater than StartToClose
- Ignoring context cancellation - Always check ctx.Done() in long activities
- Mutable global state - Activities can run in parallel, use thread-safe operations
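The timeout pitfall in particular is easy to catch before an activity is ever scheduled. Below is a minimal sketch of such a check using only the standard library. `ValidateTimeouts` is a hypothetical helper and not part of Cadence, and the heartbeat rule is this sketch's own assumption: a heartbeat timeout that is not shorter than StartToClose can never fire first, so it is flagged as suspicious.

```go
package main

import (
	"fmt"
	"time"
)

// ValidateTimeouts returns a list of problems found in the given settings.
// A zero scheduleToClose or heartbeat means "not configured" and is skipped.
func ValidateTimeouts(startToClose, scheduleToClose, heartbeat time.Duration) []string {
	var problems []string
	if scheduleToClose > 0 && scheduleToClose < startToClose {
		problems = append(problems, fmt.Sprintf(
			"ScheduleToClose (%v) is shorter than StartToClose (%v)", scheduleToClose, startToClose))
	}
	if heartbeat > 0 && heartbeat >= startToClose {
		problems = append(problems, fmt.Sprintf(
			"HeartbeatTimeout (%v) is not shorter than StartToClose (%v)", heartbeat, startToClose))
	}
	return problems
}

func main() {
	// ScheduleToClose of 2m cannot cover a 5m StartToClose.
	for _, p := range ValidateTimeouts(5*time.Minute, 2*time.Minute, 20*time.Second) {
		fmt.Println("warning:", p)
	}
}
```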
Further Reading