Overview

While workflows execute autonomously, you often need to interact with them while they’re running. Cadence provides two mechanisms for this:
  • Signals: Send data to a running workflow (write operation)
  • Queries: Read workflow state without modifying it (read operation)
Signals and queries enable real-time interaction with workflows without breaking determinism. They’re essential for building interactive, event-driven applications.

Signals

What is a Signal?

A signal is an asynchronous message sent to a running workflow execution. Signals are used for:
  • External events: User actions, system events, webhooks
  • State changes: Approval/rejection, cancellation requests
  • Data updates: New information becoming available
  • Coordination: Synchronizing between workflows

How Signals Work

Signal Type Definition

type SignalExternalWorkflowExecutionInitiatedEventAttributes struct {
    DecisionTaskCompletedEventID int64
    Domain                       string
    WorkflowExecution            *WorkflowExecution
    SignalName                   string             // Name of the signal
    Input                        []byte             // Signal payload
    Control                      []byte
    ChildWorkflowOnly            bool
}
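The `Input` field carries the signal payload as raw bytes. The following is a minimal sketch of that round trip, assuming the payload is JSON-encoded (the CLI examples on this page pass JSON). The helper names `encodeSignalInput` and `decodeSignalInput` are hypothetical and not part of the Cadence API.

```go
package main

import (
    "encoding/json"
    "fmt"
)

// ApprovalData mirrors the payload type used by the order workflow example.
type ApprovalData struct {
    ApprovedBy string
    Timestamp  int64
    Comments   string
}

// encodeSignalInput serializes a payload into the bytes carried by Input.
// Hypothetical helper: assumes JSON encoding.
func encodeSignalInput(data ApprovalData) ([]byte, error) {
    return json.Marshal(data)
}

// decodeSignalInput turns the Input bytes back into a typed payload.
func decodeSignalInput(input []byte) (ApprovalData, error) {
    var data ApprovalData
    if err := json.Unmarshal(input, &data); err != nil {
        return ApprovalData{}, fmt.Errorf("decode signal input: %w", err)
    }
    return data, nil
}
```

Keeping the payload a small typed struct makes the encoded form predictable for both senders and the receiving workflow.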

Code Examples

package main

import (
    "time"

    "go.uber.org/cadence/workflow"
    "go.uber.org/zap"
)

// OrderWorkflow waits for an approval or rejection signal, then runs the
// matching activity. ProcessOrder and NotifyRejection are activities defined elsewhere.
func OrderWorkflow(ctx workflow.Context, orderID string) error {
    logger := workflow.GetLogger(ctx)
    state := "pending"

    // Activities need timeouts before they can be scheduled (example values)
    ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
        ScheduleToStartTimeout: time.Minute,
        StartToCloseTimeout:    time.Minute,
    })

    // Create signal channels
    approvalChannel := workflow.GetSignalChannel(ctx, "approve-order")
    rejectionChannel := workflow.GetSignalChannel(ctx, "reject-order")

    // Wait for approval or rejection, whichever arrives first
    selector := workflow.NewSelector(ctx)

    selector.AddReceive(approvalChannel, func(c workflow.Channel, more bool) {
        var approvalData ApprovalData
        c.Receive(ctx, &approvalData)
        state = "approved"
        // The workflow logger is a zap logger, so fields are passed as zap.Field values
        logger.Info("Order approved", zap.String("by", approvalData.ApprovedBy))
    })

    selector.AddReceive(rejectionChannel, func(c workflow.Channel, more bool) {
        var reason string
        c.Receive(ctx, &reason)
        state = "rejected"
        logger.Info("Order rejected", zap.String("reason", reason))
    })

    selector.Select(ctx)

    if state == "approved" {
        // Process approved order
        return workflow.ExecuteActivity(ctx, ProcessOrder, orderID).Get(ctx, nil)
    }
    // Handle rejection
    return workflow.ExecuteActivity(ctx, NotifyRejection, orderID).Get(ctx, nil)
}

type ApprovalData struct {
    ApprovedBy string
    Timestamp  int64
    Comments   string
}

Queries

What is a Query?

A query is a synchronous read operation on a workflow’s current state. Queries are used for:
  • Status checks: Get current workflow state
  • Progress monitoring: Check completion percentage
  • Data retrieval: Read accumulated results
  • Debugging: Inspect internal workflow state

How Queries Work

Queries are Read-Only: Query handlers cannot modify workflow state or schedule activities. They must be side-effect free.

Query Type Definition

type WorkflowQuery struct {
    QueryType string  // Name of the query
    QueryArgs []byte  // Query parameters
}

type WorkflowQueryResult struct {
    ResultType   *QueryResultType  // ANSWERED or FAILED
    Answer       []byte            // Query result data
    ErrorMessage string            // Error if failed
}

type QueryResultType int32

const (
    QueryResultTypeAnswered QueryResultType = iota  // Query succeeded
    QueryResultTypeFailed                           // Query failed
)
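
A caller usually wants a plain `(answer, error)` pair rather than a result struct. The sketch below shows how the `ResultType`, `Answer`, and `ErrorMessage` fields might be collapsed into one. The types are re-declared locally so the block stands alone, and `answerOrError` is a hypothetical helper, not a Cadence function.

```go
package main

import "fmt"

type QueryResultType int32

const (
    QueryResultTypeAnswered QueryResultType = iota // Query succeeded
    QueryResultTypeFailed                          // Query failed
)

type WorkflowQueryResult struct {
    ResultType   *QueryResultType
    Answer       []byte
    ErrorMessage string
}

// answerOrError returns the answer bytes for a successful query and an
// error for a failed, malformed, or unrecognized result.
func answerOrError(res *WorkflowQueryResult) ([]byte, error) {
    if res == nil || res.ResultType == nil {
        return nil, fmt.Errorf("query result is missing a result type")
    }
    switch *res.ResultType {
    case QueryResultTypeAnswered:
        return res.Answer, nil
    case QueryResultTypeFailed:
        return nil, fmt.Errorf("query failed: %s", res.ErrorMessage)
    default:
        return nil, fmt.Errorf("unknown query result type %d", *res.ResultType)
    }
}
```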

Query Consistency Levels

type QueryConsistencyLevel int32

const (
    QueryConsistencyLevelEventual QueryConsistencyLevel = iota  // Eventually consistent
    QueryConsistencyLevelStrong                                 // Strongly consistent
)

  • Eventual: may return stale data, but with lower latency
  • Strong: returns the latest data, at the cost of higher latency

Code Examples

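import (
    "fmt"

    "go.uber.org/cadence/workflow"
)

// OrderItem is one line of the order. Only SKU is used below;
// the other fields are illustrative.
type OrderItem struct {
    SKU      string
    Quantity int
    Price    float64
}

func OrderWorkflow(ctx workflow.Context, orderID string) error {
    var totalAmount float64
    var items []OrderItem
    status := "processing"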

    // Register query handlers
    err := workflow.SetQueryHandler(ctx, "get-status", func() (string, error) {
        return status, nil
    })
    if err != nil {
        return err
    }

    err = workflow.SetQueryHandler(ctx, "get-total", func() (float64, error) {
        return totalAmount, nil
    })
    if err != nil {
        return err
    }

    err = workflow.SetQueryHandler(ctx, "get-items", func() ([]OrderItem, error) {
        return items, nil
    })
    if err != nil {
        return err
    }

    // Parameterized query
    err = workflow.SetQueryHandler(ctx, "get-item", func(sku string) (OrderItem, error) {
        for _, item := range items {
            if item.SKU == sku {
                return item, nil
            }
        }
        return OrderItem{}, fmt.Errorf("item not found: %s", sku)
    })
    if err != nil {
        return err
    }

    // Workflow logic updates state
    status = "completed"
    totalAmount = 99.99

    return nil
}

Signal and Query Event Attributes

WorkflowExecutionSignaledEventAttributes

type WorkflowExecutionSignaledEventAttributes struct {
    SignalName string  // Name of the signal
    Input      []byte  // Signal payload
    Identity   string  // Who sent the signal
    RequestID  string  // Idempotency key
}
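
The `RequestID` field acts as an idempotency key: a retried send that reuses the same ID should take effect only once. The sketch below illustrates that idea at the application level. It is not how the Cadence service implements deduplication, and `signalDeduper` is a hypothetical type.

```go
package main

// WorkflowExecutionSignaledEventAttributes is re-declared so the sketch stands alone.
type WorkflowExecutionSignaledEventAttributes struct {
    SignalName string
    Input      []byte
    Identity   string
    RequestID  string
}

// signalDeduper remembers which request IDs have already been applied.
type signalDeduper struct {
    seen map[string]struct{}
}

func newSignalDeduper() *signalDeduper {
    return &signalDeduper{seen: make(map[string]struct{})}
}

// shouldApply reports whether a signal is new. Signals without a
// request ID cannot be deduplicated and are always applied.
func (d *signalDeduper) shouldApply(attrs WorkflowExecutionSignaledEventAttributes) bool {
    if attrs.RequestID == "" {
        return true
    }
    if _, duplicate := d.seen[attrs.RequestID]; duplicate {
        return false
    }
    d.seen[attrs.RequestID] = struct{}{}
    return true
}
```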

QueryWorkflowRequest

type QueryWorkflowRequest struct {
    Domain                string
    Execution             *WorkflowExecution
    Query                 *WorkflowQuery
    QueryRejectCondition  *QueryRejectCondition
    QueryConsistencyLevel *QueryConsistencyLevel
}

QueryRejectCondition

type QueryRejectCondition int32

const (
    QueryRejectConditionNotOpen           QueryRejectCondition = iota  // Reject if workflow closed
    QueryRejectConditionNotCompletedCleanly                            // Reject if not completed successfully
)
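
The two reject conditions differ only in how they treat a workflow that completed successfully. The sketch below spells out that decision, reading the comments above literally. `executionStatus` and `shouldRejectQuery` are hypothetical simplifications for illustration, not the service's own types.

```go
package main

type QueryRejectCondition int32

const (
    QueryRejectConditionNotOpen             QueryRejectCondition = iota // Reject if workflow closed
    QueryRejectConditionNotCompletedCleanly                             // Reject if not completed successfully
)

// executionStatus is a hypothetical, simplified view of workflow state.
type executionStatus int

const (
    statusOpen executionStatus = iota
    statusCompleted
    statusFailed
)

// shouldRejectQuery applies a reject condition to a workflow status.
// A nil condition never rejects.
func shouldRejectQuery(cond *QueryRejectCondition, status executionStatus) bool {
    if cond == nil {
        return false
    }
    switch *cond {
    case QueryRejectConditionNotOpen:
        // Any closed workflow is rejected.
        return status != statusOpen
    case QueryRejectConditionNotCompletedCleanly:
        // Open and successfully completed workflows are still queryable.
        return status != statusOpen && status != statusCompleted
    default:
        return false
    }
}
```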

Advanced Patterns

1. Multi-Signal Coordination

func ApprovalWorkflow(ctx workflow.Context, docID string) error {
    // Only these approvers count toward completion
    required := map[string]bool{"manager": true, "finance": true, "legal": true}
    approvals := make(map[string]bool)

    approvalCh := workflow.GetSignalChannel(ctx, "approve")

    // Wait until every required approver has signed off
    for len(approvals) < len(required) {
        var approver string
        approvalCh.Receive(ctx, &approver)
        if required[approver] {
            // Duplicate signals and unknown approvers do not advance the count
            approvals[approver] = true
        }
    }

    // All approvals received
    return workflow.ExecuteActivity(ctx, ProcessApprovedDocument, docID).Get(ctx, nil)
}

2. Cancellation via Signal

func LongRunningWorkflow(ctx workflow.Context) error {
    cancelCh := workflow.GetSignalChannel(ctx, "cancel")

    // Start long operation under a cancellable context
    activityCtx, cancelActivity := workflow.WithCancel(ctx)
    future := workflow.ExecuteActivity(activityCtx, LongActivity)

    // Wait for completion or cancellation, whichever comes first
    var activityErr error
    selector := workflow.NewSelector(ctx)

    selector.AddFuture(future, func(f workflow.Future) {
        // Activity completed: keep its error instead of discarding it
        activityErr = f.Get(ctx, nil)
    })

    selector.AddReceive(cancelCh, func(c workflow.Channel, more bool) {
        var reason string
        c.Receive(ctx, &reason)
        cancelActivity() // Request cancellation of the activity
    })

    selector.Select(ctx)
    return activityErr
}

3. Progress Tracking with Queries

func BatchProcessingWorkflow(ctx workflow.Context, items []string) error {
    var processed int
    var failed int
    var currentItem string

    // Register progress query
    err := workflow.SetQueryHandler(ctx, "progress", func() (map[string]interface{}, error) {
        total := len(items)
        percentage := 0.0
        if total > 0 { // avoid 0/0 (NaN) for an empty batch
            percentage = float64(processed) / float64(total) * 100
        }
        return map[string]interface{}{
            "total":       total,
            "processed":   processed,
            "failed":      failed,
            "remaining":   total - processed - failed,
            "currentItem": currentItem,
            "percentage":  percentage,
        }, nil
    })
    if err != nil {
        return err
    }

    // Process items
    for _, item := range items {
        currentItem = item
        err := workflow.ExecuteActivity(ctx, ProcessItem, item).Get(ctx, nil)
        if err != nil {
            failed++
        } else {
            processed++
        }
    }

    return nil
}

4. Dynamic Signal Handlers

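func DynamicWorkflow(ctx workflow.Context, signalNames []string) error {
    signals := make(map[string]interface{})

    // Query to see received signals
    err := workflow.SetQueryHandler(ctx, "signals", func() (map[string]interface{}, error) {
        return signals, nil
    })
    if err != nil {
        return err
    }

    // The workflow package has no catch-all SetSignalHandler, so this version
    // listens on one channel per signal name supplied when the workflow starts.
    selector := workflow.NewSelector(ctx)
    for _, name := range signalNames {
        name := name // capture the per-iteration value
        selector.AddReceive(workflow.GetSignalChannel(ctx, name), func(c workflow.Channel, more bool) {
            var payload interface{}
            c.Receive(ctx, &payload)
            signals[name] = payload
        })
    }

    // Collect signals for one hour, then finish
    expired := false
    selector.AddFuture(workflow.NewTimer(ctx, time.Hour), func(f workflow.Future) {
        expired = true
    })
    for !expired {
        selector.Select(ctx)
    }
    return nil
}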

5. Signal Buffering

func BufferedSignalWorkflow(ctx workflow.Context) error {
    buffer := make([]SignalData, 0, 100)
    signalCh := workflow.GetSignalChannel(ctx, "data")

    // Note: this loop never exits on its own, so history grows without bound.
    // A long-lived workflow would typically continue-as-new after some number of batches.
    for {
        // Block until the next signal arrives, then buffer it
        var data SignalData
        signalCh.Receive(ctx, &data)
        buffer = append(buffer, data)

        // Process the buffer as soon as it reaches the threshold
        if len(buffer) >= 100 {
            err := workflow.ExecuteActivity(ctx, ProcessBatch, buffer).Get(ctx, nil)
            if err != nil {
                return err
            }
            buffer = buffer[:0] // Clear buffer
        }
    }
}

Best Practices

Signals

// Use typed signal data
type ApprovalSignal struct {
    ApprovedBy string
    Timestamp  time.Time
    Comments   string
}

// Descriptive signal names
workflow.GetSignalChannel(ctx, "approve-order")
workflow.GetSignalChannel(ctx, "reject-order")
workflow.GetSignalChannel(ctx, "cancel-order")

Queries

// Read-only query handlers
workflow.SetQueryHandler(ctx, "status", func() (string, error) {
    return currentStatus, nil  // ✓ Just returns state
})

// Return copies of mutable data
workflow.SetQueryHandler(ctx, "items", func() ([]Item, error) {
    return append([]Item{}, items...), nil  // ✓ Return copy
})

General

  1. Signal Naming: Use descriptive, action-oriented names
  2. Query Naming: Use noun-based or question-style names
  3. Idempotency: Design signal handlers to be idempotent
  4. Validation: Validate signal/query inputs
  5. Documentation: Document expected signal/query types

Common Pitfalls

Avoid These Mistakes:
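  1. Modifying state in queries - Query handlers must be read-only and side-effect free
  2. Blocking on signals indefinitely - Add a timer (workflow.NewTimer) to the selector so the wait has a timeout
  3. Large signal payloads - Send a reference in the signal and load the data in an activity
  4. Not handling signal ordering - Signals from independent senders can arrive in any order, so do not rely on send order
  5. Querying closed workflows - Set a QueryRejectCondition or check workflow status first if answers from a closed workflow are not acceptable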

CLI Examples

# Send signal
cadence workflow signal \
  --workflow_id order-12345 \
  --name approve-order \
  --input '{"approvedBy": "user@example.com"}'

# Query workflow
cadence workflow query \
  --workflow_id order-12345 \
  --query_type get-status

# Query with consistency
cadence workflow query \
  --workflow_id order-12345 \
  --query_type get-total \
  --query_consistency_level strong

# Signal with start
cadence workflow start \
  --workflow_id order-12345 \
  --tasklist order-processing \
  --workflow_type OrderWorkflow \
  --signal_name new-item \
  --signal_input '{"sku": "ABC123"}'

Related Concepts

  • Workflows - Host signal and query handlers
  • Activities - Cannot be signaled or queried directly
  • Workers - Execute signal and query handlers
  • Domains - Scope for workflow execution
