Overview
While workflows execute autonomously, you often need to interact with them while they’re running. Cadence provides two mechanisms for this:
- Signals: Send data to a running workflow (write operation)
- Queries: Read workflow state without modifying it (read operation)
Signals and queries enable real-time interaction with workflows without breaking determinism. They’re essential for building interactive, event-driven applications.
Signals
What is a Signal?
A signal is an asynchronous message sent to a running workflow execution. Signals are used for:
- External events: User actions, system events, webhooks
- State changes: Approval/rejection, cancellation requests
- Data updates: New information becoming available
- Coordination: Synchronizing between workflows
How Signals Work
Signal Type Definition
type SignalExternalWorkflowExecutionInitiatedEventAttributes struct {
DecisionTaskCompletedEventID int64
Domain string
WorkflowExecution *WorkflowExecution
SignalName string // Name of the signal
Input []byte // Signal payload
Control []byte
ChildWorkflowOnly bool
}
Code Examples
package main
import (
"go.uber.org/cadence/workflow"
)
// Workflow with signal handler
func OrderWorkflow(ctx workflow.Context, orderID string) error {
logger := workflow.GetLogger(ctx)
var state string = "pending"
// Create signal channel
approvalChannel := workflow.GetSignalChannel(ctx, "approve-order")
rejectionChannel := workflow.GetSignalChannel(ctx, "reject-order")
// Wait for approval or rejection
selector := workflow.NewSelector(ctx)
selector.AddReceive(approvalChannel, func(c workflow.ReceiveChannel, more bool) {
var approvalData ApprovalData
c.Receive(ctx, &approvalData)
state = "approved"
logger.Info("Order approved", "by", approvalData.ApprovedBy)
})
selector.AddReceive(rejectionChannel, func(c workflow.ReceiveChannel, more bool) {
var reason string
c.Receive(ctx, &reason)
state = "rejected"
logger.Info("Order rejected", "reason", reason)
})
selector.Select(ctx)
if state == "approved" {
// Process approved order
return workflow.ExecuteActivity(ctx, ProcessOrder, orderID).Get(ctx, nil)
} else {
// Handle rejection
return workflow.ExecuteActivity(ctx, NotifyRejection, orderID).Get(ctx, nil)
}
}
type ApprovalData struct {
ApprovedBy string
Timestamp int64
Comments string
}
func sendApproval() {
c, _ := client.NewClient(client.Options{
Domain: "my-domain",
})
// Send signal to workflow
err := c.SignalWorkflow(
context.Background(),
"order-12345", // Workflow ID
"", // Run ID (empty = current run)
"approve-order", // Signal name
ApprovalData{
ApprovedBy: "[email protected]",
Timestamp: time.Now().Unix(),
Comments: "Looks good",
},
)
if err != nil {
panic(err)
}
}
// Start workflow if not running, or signal if already running
func signalWithStart() {
c, _ := client.NewClient(client.Options{
Domain: "my-domain",
})
workflowOptions := client.StartWorkflowOptions{
ID: "order-12345",
TaskList: "order-processing",
ExecutionStartToCloseTimeout: time.Hour * 24,
}
signalName := "new-item"
signalArg := OrderItem{SKU: "ABC123", Quantity: 5}
we, err := c.SignalWithStartWorkflow(
context.Background(),
"order-12345",
signalName,
signalArg,
workflowOptions,
OrderWorkflow,
"order-12345", // Workflow args
)
if err != nil {
panic(err)
}
}
import com.uber.cadence.workflow.Workflow;
import com.uber.cadence.workflow.SignalMethod;
import com.uber.cadence.workflow.WorkflowMethod;
@WorkflowInterface
public interface OrderWorkflow {
@WorkflowMethod
void processOrder(String orderId);
@SignalMethod
void approveOrder(ApprovalData approval);
@SignalMethod
void rejectOrder(String reason);
}
public class OrderWorkflowImpl implements OrderWorkflow {
private String state = "pending";
private ApprovalData approval;
private String rejectionReason;
@Override
public void processOrder(String orderId) {
// Wait for signal
Workflow.await(() -> !state.equals("pending"));
if (state.equals("approved")) {
// Process order
activities.processOrder(orderId);
} else {
// Handle rejection
activities.notifyRejection(orderId, rejectionReason);
}
}
@Override
public void approveOrder(ApprovalData approval) {
this.approval = approval;
this.state = "approved";
}
@Override
public void rejectOrder(String reason) {
this.rejectionReason = reason;
this.state = "rejected";
}
}
// Sending signal
WorkflowClient client = WorkflowClient.newInstance(...);
OrderWorkflow workflow = client.newWorkflowStub(
OrderWorkflow.class,
"order-12345"
);
workflow.approveOrder(new ApprovalData("manager", "Approved"));
Queries
What is a Query?
A query is a synchronous read operation on a workflow’s current state. Queries are used for:
- Status checks: Get current workflow state
- Progress monitoring: Check completion percentage
- Data retrieval: Read accumulated results
- Debugging: Inspect internal workflow state
How Queries Work
Queries are Read-Only: Query handlers cannot modify workflow state or schedule activities. They must be side-effect free.
Query Type Definition
type WorkflowQuery struct {
QueryType string // Name of the query
QueryArgs []byte // Query parameters
}
type WorkflowQueryResult struct {
ResultType *QueryResultType // ANSWERED or FAILED
Answer []byte // Query result data
ErrorMessage string // Error if failed
}
type QueryResultType int32
const (
QueryResultTypeAnswered QueryResultType = iota // Query succeeded
QueryResultTypeFailed // Query failed
)
Query Consistency Levels
type QueryConsistencyLevel int32
const (
QueryConsistencyLevelEventual QueryConsistencyLevel = iota // Eventually consistent
QueryConsistencyLevelStrong // Strongly consistent
)
- Eventual: May return stale data, lower latency
- Strong: Returns latest data, higher latency
Code Examples
func OrderWorkflow(ctx workflow.Context, orderID string) error {
var totalAmount float64 = 0
var items []OrderItem
var status string = "processing"
// Register query handlers
err := workflow.SetQueryHandler(ctx, "get-status", func() (string, error) {
return status, nil
})
if err != nil {
return err
}
err = workflow.SetQueryHandler(ctx, "get-total", func() (float64, error) {
return totalAmount, nil
})
if err != nil {
return err
}
err = workflow.SetQueryHandler(ctx, "get-items", func() ([]OrderItem, error) {
return items, nil
})
if err != nil {
return err
}
// Parameterized query
err = workflow.SetQueryHandler(ctx, "get-item", func(sku string) (OrderItem, error) {
for _, item := range items {
if item.SKU == sku {
return item, nil
}
}
return OrderItem{}, fmt.Errorf("item not found: %s", sku)
})
if err != nil {
return err
}
// Workflow logic updates state
status = "completed"
totalAmount = 99.99
return nil
}
func queryWorkflow() {
c, _ := client.NewClient(client.Options{
Domain: "my-domain",
})
// Query workflow status
var status string
value, err := c.QueryWorkflow(
context.Background(),
"order-12345", // Workflow ID
"", // Run ID
"get-status", // Query type
)
if err != nil {
panic(err)
}
err = value.Get(&status)
if err != nil {
panic(err)
}
fmt.Printf("Status: %s\n", status)
// Query with strong consistency
queryOptions := client.QueryWorkflowWithOptionsRequest{
WorkflowID: "order-12345",
QueryType: "get-total",
QueryConsistencyLevel: client.QueryConsistencyLevelStrong,
}
var total float64
value, err = c.QueryWorkflowWithOptions(context.Background(), &queryOptions)
if err != nil {
panic(err)
}
value.Get(&total)
fmt.Printf("Total: $%.2f\n", total)
// Parameterized query
var item OrderItem
value, err = c.QueryWorkflow(
context.Background(),
"order-12345",
"",
"get-item",
"ABC123", // Query parameter
)
if err != nil {
panic(err)
}
value.Get(&item)
fmt.Printf("Item: %+v\n", item)
}
@WorkflowInterface
public interface OrderWorkflow {
@WorkflowMethod
void processOrder(String orderId);
@QueryMethod
String getStatus();
@QueryMethod
double getTotal();
@QueryMethod
List<OrderItem> getItems();
}
public class OrderWorkflowImpl implements OrderWorkflow {
private String status = "processing";
private double totalAmount = 0;
private List<OrderItem> items = new ArrayList<>();
@Override
public void processOrder(String orderId) {
// Workflow logic
status = "completed";
totalAmount = 99.99;
}
@Override
public String getStatus() {
return status;
}
@Override
public double getTotal() {
return totalAmount;
}
@Override
public List<OrderItem> getItems() {
return new ArrayList<>(items);
}
}
// Querying workflow
WorkflowClient client = WorkflowClient.newInstance(...);
OrderWorkflow workflow = client.newWorkflowStub(
OrderWorkflow.class,
"order-12345"
);
String status = workflow.getStatus();
double total = workflow.getTotal();
Signal and Query Event Attributes
WorkflowExecutionSignaledEventAttributes
type WorkflowExecutionSignaledEventAttributes struct {
SignalName string // Name of the signal
Input []byte // Signal payload
Identity string // Who sent the signal
RequestID string // Idempotency key
}
QueryWorkflowRequest
type QueryWorkflowRequest struct {
Domain string
Execution *WorkflowExecution
Query *WorkflowQuery
QueryRejectCondition *QueryRejectCondition
QueryConsistencyLevel *QueryConsistencyLevel
}
QueryRejectCondition
type QueryRejectCondition int32
const (
QueryRejectConditionNotOpen QueryRejectCondition = iota // Reject if workflow closed
QueryRejectConditionNotCompletedCleanly // Reject if not completed successfully
)
Advanced Patterns
1. Multi-Signal Coordination
func ApprovalWorkflow(ctx workflow.Context, docID string) error {
approvals := make(map[string]bool)
requiredApprovers := []string{"manager", "finance", "legal"}
approvalCh := workflow.GetSignalChannel(ctx, "approve")
// Wait for all approvals
for len(approvals) < len(requiredApprovers) {
var approver string
approvalCh.Receive(ctx, &approver)
approvals[approver] = true
}
// All approvals received
return workflow.ExecuteActivity(ctx, ProcessApprovedDocument, docID).Get(ctx, nil)
}
2. Cancellation via Signal
func LongRunningWorkflow(ctx workflow.Context) error {
cancelCh := workflow.GetSignalChannel(ctx, "cancel")
// Start long operation
activityCtx, cancelActivity := workflow.WithCancel(ctx)
future := workflow.ExecuteActivity(activityCtx, LongActivity)
// Wait for completion or cancellation
selector := workflow.NewSelector(ctx)
selector.AddFuture(future, func(f workflow.Future) {
// Activity completed
})
selector.AddReceive(cancelCh, func(c workflow.ReceiveChannel, more bool) {
var reason string
c.Receive(ctx, &reason)
cancelActivity() // Cancel the activity
})
selector.Select(ctx)
return nil
}
3. Progress Tracking with Queries
func BatchProcessingWorkflow(ctx workflow.Context, items []string) error {
var processed int
var failed int
var currentItem string
// Register progress query
workflow.SetQueryHandler(ctx, "progress", func() (map[string]interface{}, error) {
total := len(items)
return map[string]interface{}{
"total": total,
"processed": processed,
"failed": failed,
"remaining": total - processed - failed,
"currentItem": currentItem,
"percentage": float64(processed) / float64(total) * 100,
}, nil
})
// Process items
for _, item := range items {
currentItem = item
err := workflow.ExecuteActivity(ctx, ProcessItem, item).Get(ctx, nil)
if err != nil {
failed++
} else {
processed++
}
}
return nil
}
4. Dynamic Signal Handlers
func DynamicWorkflow(ctx workflow.Context) error {
signals := make(map[string]interface{})
// Register catch-all signal handler
workflow.SetSignalHandler(ctx, func(signalName string, input []byte) {
signals[signalName] = string(input)
})
// Query to see received signals
workflow.SetQueryHandler(ctx, "signals", func() (map[string]interface{}, error) {
return signals, nil
})
// Workflow logic
workflow.Sleep(ctx, time.Hour)
return nil
}
5. Signal Buffering
func BufferedSignalWorkflow(ctx workflow.Context) error {
buffer := make([]SignalData, 0)
signalCh := workflow.GetSignalChannel(ctx, "data")
// Buffer signals
for {
selector := workflow.NewSelector(ctx)
selector.AddReceive(signalCh, func(c workflow.ReceiveChannel, more bool) {
var data SignalData
c.Receive(ctx, &data)
buffer = append(buffer, data)
})
// Process buffer when it reaches threshold
if len(buffer) >= 100 {
err := workflow.ExecuteActivity(ctx, ProcessBatch, buffer).Get(ctx, nil)
if err != nil {
return err
}
buffer = buffer[:0] // Clear buffer
}
selector.Select(ctx)
}
}
Best Practices
Signals
// Use typed signal data
type ApprovalSignal struct {
ApprovedBy string
Timestamp time.Time
Comments string
}
// Descriptive signal names
workflow.GetSignalChannel(ctx, "approve-order")
workflow.GetSignalChannel(ctx, "reject-order")
workflow.GetSignalChannel(ctx, "cancel-order")
Queries
// Read-only query handlers
workflow.SetQueryHandler(ctx, "status", func() (string, error) {
return currentStatus, nil // ✓ Just returns state
})
// Return copies of mutable data
workflow.SetQueryHandler(ctx, "items", func() ([]Item, error) {
return append([]Item{}, items...), nil // ✓ Return copy
})
General
- Signal Naming: Use descriptive, action-oriented names
- Query Naming: Use noun-based or question-style names
- Idempotency: Design signal handlers to be idempotent
- Validation: Validate signal/query inputs
- Documentation: Document expected signal/query types
Common Pitfalls
Avoid These Mistakes:
- Modifying state in queries - Queries must be read-only
- Blocking on signals indefinitely - Use timeouts with selectors
- Large signal payloads - Use activities for large data transfers
- Not handling signal ordering - Signals may arrive in any order
- Querying closed workflows - Check workflow status first
CLI Examples
# Send signal
cadence workflow signal \
--workflow_id order-12345 \
--name approve-order \
--input '{"approvedBy": "[email protected]"}'
# Query workflow
cadence workflow query \
--workflow_id order-12345 \
--query_type get-status
# Query with consistency
cadence workflow query \
--workflow_id order-12345 \
--query_type get-total \
--query_consistency_level strong
# Signal with start
cadence workflow start \
--workflow_id order-12345 \
--tasklist order-processing \
--workflow_type OrderWorkflow \
--signal_name new-item \
--signal_input '{"sku": "ABC123"}'
- Workflows - Host signal and query handlers
- Activities - Cannot be signaled or queried directly
- Workers - Execute signal and query handlers
- Domains - Scope for workflow execution
Further Reading