# Workflows

Workflows in Solace Agent Mesh provide prescriptive orchestration for multi-agent systems. Unlike the dynamic Orchestrator, workflows define deterministic execution graphs that guarantee consistent behavior, proper error handling, and full auditability.
## Workflow Fundamentals

A workflow is a Directed Acyclic Graph (DAG) of nodes where:

- **Nodes** represent operations (agent calls, conditional branches, loops)
- **Edges** define dependencies and execution order
- **Data** flows from node outputs to node inputs via template expressions
```yaml
name: research_and_report_workflow
namespace: acme/ai
workflow:
  description: Research a topic and generate a comprehensive report
  version: 1.0.0
  nodes:
    - id: research
      type: agent
      agent_name: research_agent
      input:
        query: "{{workflow.input.topic}}"
    - id: analyze
      type: agent
      agent_name: analysis_agent
      depends_on: [research]
      input:
        data: "{{research.output.findings}}"
    - id: write_report
      type: agent
      agent_name: writing_agent
      depends_on: [analyze]
      input:
        content: "{{analyze.output.insights}}"
        format: "markdown"
  output_mapping:
    report: "{{write_report.output.document}}"
```
## Workflow Architecture

Workflows are implemented as special agents that register in the mesh:

```python
# From: src/solace_agent_mesh/workflow/app.py:592-687
class WorkflowApp(App):
    """
    Custom App class for workflow orchestration.

    Workflows register as agents on the mesh, allowing:
    - Gateway invocation (users can call workflows directly)
    - Agent-to-agent calls (workflows can invoke other workflows)
    - Discovery via agent cards
    - Standard A2A message handling
    """

    def __init__(self, app_info: Dict[str, Any], **kwargs):
        # Validate workflow definition
        app_config = WorkflowAppConfig.model_validate_and_clean(
            app_info.get("app_config", {})
        )
        # Generate A2A subscriptions
        subscriptions = self._generate_subscriptions(
            namespace,
            workflow_name,
        )
```
### Key Components

- **DAG Executor**: Manages node execution order, dependency resolution, and parallel execution
- **Agent Caller**: Handles agent invocations via the A2A protocol with result tracking
- **Workflow State**: Tracks execution progress, node outputs, and control flow state
- **Event Publisher**: Streams execution events for monitoring and visualization
## Node Types

### Agent Node

Call another agent to perform work:
```yaml
- id: sentiment_analysis
  type: agent
  agent_name: nlp_agent
  depends_on: [data_collection]
  input:
    text: "{{data_collection.output.reviews}}"
    analysis_type: "sentiment"
  instruction: |
    Analyze customer sentiment from the provided reviews.
    Focus on overall tone and key themes.
  timeout: 2m
  retry_strategy:
    limit: 3
    retry_policy: OnFailure
    backoff:
      duration: 5s
      factor: 2.0
```

Key fields:

- `agent_name`: Name of the agent to invoke
- `input`: Input mapping (supports template expressions)
- `instruction`: Optional guidance text sent to the agent (supports templates)
- `when`: Conditional execution clause (Argo-style); the node is skipped if it evaluates to false
- `timeout`: Node-specific timeout (e.g., "30s", "5m", "1h")
- `retry_strategy`: Retry configuration for this node
### Switch Node

Multi-way conditional branching:
```yaml
- id: route_by_category
  type: switch
  depends_on: [classify]
  cases:
    - condition: "{{classify.output.category}} == 'technical'"
      node: technical_specialist
    - condition: "{{classify.output.category}} == 'billing'"
      node: billing_specialist
    - condition: "{{classify.output.category}} == 'sales'"
      node: sales_specialist
  default: general_support

- id: technical_specialist
  type: agent
  agent_name: tech_support_agent
  depends_on: [route_by_category]
  # ... agent config

- id: billing_specialist
  type: agent
  agent_name: billing_agent
  depends_on: [route_by_category]
  # ... agent config
```
Switch nodes evaluate cases in order. First matching condition wins. Non-selected branches are automatically skipped.
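The first-match-wins selection can be sketched in a few lines; `select_branch` and the `evaluate_condition` callback are illustrative names for this sketch, not SAM internals:

```python
# Illustrative sketch of switch-node case selection: evaluate cases in
# order, return the first node whose condition is truthy, else the default.
def select_branch(cases, default, evaluate_condition):
    for case in cases:
        if evaluate_condition(case["condition"]):
            return case["node"]
    return default

# Usage: with a resolved category of 'billing', the second case wins.
cases = [
    {"condition": "category == 'technical'", "node": "technical_specialist"},
    {"condition": "category == 'billing'", "node": "billing_specialist"},
]
chosen = select_branch(
    cases, "general_support",
    lambda cond: eval(cond, {"category": "billing"}),
)
# chosen == "billing_specialist"
```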
### Loop Node

Repeat a node while a condition is true:
```yaml
- id: poll_until_ready
  type: loop
  depends_on: [start_job]
  condition: "{{check_status.output.state}} != 'completed'"
  max_iterations: 20
  delay: 10s
  node: check_status

- id: check_status
  type: agent
  agent_name: status_checker
  depends_on: [poll_until_ready]
  input:
    job_id: "{{start_job.output.job_id}}"
```
From `src/solace_agent_mesh/workflow/dag_executor.py:633-767`:

```python
async def _execute_loop_node(
    self,
    node: LoopNode,
    workflow_state: WorkflowExecutionState,
    workflow_context: WorkflowExecutionContext,
):
    """
    Execute loop node for while-loop iteration.

    On the first iteration (iteration=0), always runs (do-while behavior).
    Then checks the condition before subsequent iterations.
    """
    iteration = workflow_state.loop_iterations.get(node.id, 0)

    # Check max iterations
    if iteration >= node.max_iterations:
        workflow_state.completed_nodes[node.id] = "loop_max_iterations"
        return

    # Evaluate condition (skipped on the first iteration)
    if iteration > 0:
        should_continue = evaluate_condition(
            node.condition,
            workflow_state,
        )
        if not should_continue:
            workflow_state.completed_nodes[node.id] = "loop_condition_false"
            return

    # Apply delay between iterations
    if node.delay and iteration > 0:
        await asyncio.sleep(parse_duration(node.delay))

    # Execute inner node
    await self._execute_agent_node(inner_node, ...)
```
Loop nodes have a `max_iterations` safety limit (default: 100). Set it appropriately for your use case.
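The `parse_duration` call above converts strings like the `delay: 10s` setting into seconds. A minimal sketch of such a helper, assuming only the `s`/`m`/`h` suffixes shown in this document (the real implementation may accept more formats):

```python
import re

# Hypothetical helper mirroring the parse_duration() call in the loop
# executor; supports "10s", "2m", "1h" style strings.
_UNITS = {"s": 1, "m": 60, "h": 3600}

def parse_duration(value: str) -> float:
    """Convert a duration string like '10s', '2m', or '1h' to seconds."""
    match = re.fullmatch(r"(\d+(?:\.\d+)?)([smh])", value.strip())
    if not match:
        raise ValueError(f"Unsupported duration: {value!r}")
    number, unit = match.groups()
    return float(number) * _UNITS[unit]
```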
### Map Node

Parallel iteration over a collection:
```yaml
- id: process_all_files
  type: map
  depends_on: [list_files]
  items: "{{list_files.output.files}}"  # Array of filenames
  node: process_single_file
  max_items: 50
  concurrency_limit: 5

- id: process_single_file
  type: agent
  agent_name: file_processor
  depends_on: [process_all_files]
  input:
    filename: "{{_map_item}}"
    index: "{{_map_index}}"
```
Special variables:

- `{{_map_item}}`: Current item from the array
- `{{_map_index}}`: Zero-based index of the current item
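Under the hood, a map node has to fan out over the array while honoring `concurrency_limit` and `max_items`. A sketch of that pattern with `asyncio` (names like `run_map` are illustrative, not SAM's API):

```python
import asyncio

# Sketch of bounded parallel fan-out as a map node might perform it.
async def run_map(items, process_item, concurrency_limit=5, max_items=50):
    """Run process_item over items in parallel, capped by concurrency_limit."""
    semaphore = asyncio.Semaphore(concurrency_limit)

    async def bounded(index, item):
        async with semaphore:
            # _map_item / _map_index mirror the template variables above.
            return await process_item({"_map_item": item, "_map_index": index})

    tasks = [bounded(i, item) for i, item in enumerate(items[:max_items])]
    return await asyncio.gather(*tasks)  # gather preserves input order

async def demo():
    async def process_item(ctx):
        return f"{ctx['_map_index']}:{ctx['_map_item']}"
    return await run_map(["a.txt", "b.txt", "c.txt"], process_item)

# asyncio.run(demo()) -> ['0:a.txt', '1:b.txt', '2:c.txt']
```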
Argo-compatible aliases:

```yaml
# SAM syntax
items: "{{previous_node.output.array}}"

# Argo syntax (also supported)
withParam: "{{previous_node.output.array}}"
# OR
withItems:
  - item1
  - item2
  - item3
```
### Workflow Node

Call another workflow as a sub-workflow:
```yaml
- id: run_analysis_workflow
  type: workflow
  workflow_name: data_analysis_workflow
  depends_on: [data_prep]
  input:
    dataset: "{{data_prep.output.cleaned_data}}"
    parameters:
      method: "regression"
  timeout: 10m
```
Workflows can nest up to `max_call_depth` levels (default: 10) to prevent infinite recursion.
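A depth guard like the following sketch is enough to enforce such a limit; the function and exception names here are assumptions for illustration, not SAM's actual implementation:

```python
# Illustrative guard against runaway workflow nesting.
class MaxCallDepthExceeded(RuntimeError):
    pass

def check_call_depth(current_depth: int, max_call_depth: int = 10) -> int:
    """Return the depth to pass to the sub-workflow, or raise at the limit."""
    if current_depth >= max_call_depth:
        raise MaxCallDepthExceeded(
            f"workflow nesting depth {current_depth} reached limit {max_call_depth}"
        )
    return current_depth + 1
```

Each sub-workflow invocation carries the incremented depth, so a cycle of workflows calling each other fails fast instead of recursing forever.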
## Template Expressions

Workflows use double-brace syntax for dynamic values:
```yaml
workflow:
  input_schema:
    type: object
    properties:
      topic:
        type: string
      language:
        type: string
        default: "en"
  nodes:
    - id: research
      type: agent
      agent_name: research_agent
      input:
        query: "{{workflow.input.topic}}"
        lang: "{{workflow.input.language}}"
```
### Node Output References
```yaml
- id: analyze
  type: agent
  agent_name: analyzer
  input:
    # Reference a previous node's output
    data: "{{research.output.findings}}"
    # Navigate nested fields
    confidence: "{{research.output.metadata.confidence}}"
```
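Conceptually, each reference is resolved by walking the cached node outputs. A simplified sketch of that lookup (the real template engine supports more than this regex-based version):

```python
import re

# Resolve {{node_id.output.dotted.path}} references against cached outputs.
def resolve_template(template: str, node_outputs: dict) -> str:
    def lookup(match):
        node_id, path = match.group(1), match.group(2)
        value = node_outputs[node_id]
        for key in path.split("."):  # walk nested fields
            value = value[key]
        return str(value)
    return re.sub(r"\{\{(\w+)\.output\.([\w.]+)\}\}", lookup, template)

outputs = {"research": {"findings": "...", "metadata": {"confidence": 0.92}}}
resolve_template("{{research.output.metadata.confidence}}", outputs)
# -> "0.92"
```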
### Operators

**Coalesce** (first non-null value):
```yaml
input:
  value:
    coalesce:
      - "{{optional_node.output.value}}"
      - "{{fallback_node.output.value}}"
      - "default_value"
```
**Concat** (string concatenation):
```yaml
input:
  message:
    concat:
      - "Analysis for "
      - "{{workflow.input.topic}}"
      - ": "
      - "{{summary.output.text}}"
```
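Once the template references inside the lists are resolved, both operators reduce to simple list folds. An illustrative sketch (not the actual SAM resolver), where `None` models an unresolved or null reference:

```python
# Resolve a {'coalesce': [...]} or {'concat': [...]} operator value.
def apply_operator(spec: dict):
    if "coalesce" in spec:
        # First non-null value wins; None if all entries are null.
        return next((v for v in spec["coalesce"] if v is not None), None)
    if "concat" in spec:
        # Join all entries as strings.
        return "".join(str(v) for v in spec["concat"])
    raise ValueError(f"unknown operator: {spec}")

apply_operator({"coalesce": [None, None, "default_value"]})
# -> "default_value"
apply_operator({"concat": ["Analysis for ", "AI agents"]})
# -> "Analysis for AI agents"
```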
### Argo-Compatible Aliases

For Argo Workflows users, SAM supports familiar syntax:
```yaml
# Argo syntax
workflow:
  parameters:              # SAM: input
    - name: topic
      value: "AI agents"
  nodes:
    - id: research
      type: agent
      agent_name: research_agent
      arguments:           # SAM: input
        query: "{{workflow.parameters.topic}}"  # SAM: workflow.input.topic
```
From `src/solace_agent_mesh/workflow/flow_control/conditional.py`:

```python
def _apply_template_aliases(template: str) -> str:
    """
    Apply Argo-compatible template aliases.

    Transformations:
    - {{item}} → {{_map_item}}
    - {{workflow.parameters.x}} → {{workflow.input.x}}
    """
    # Map item alias
    if template == "{{item}}":
        return "{{_map_item}}"

    # Workflow parameters → input
    template = template.replace(
        "{{workflow.parameters.",
        "{{workflow.input.",
    )
    return template
```
## Control Flow

### Conditional Execution

Use `when` clauses for conditional nodes:
```yaml
- id: high_priority_handler
  type: agent
  agent_name: priority_agent
  when: "{{classify.output.priority}} == 'high'"
  depends_on: [classify]
  input:
    request: "{{workflow.input.request}}"
```
Supported operators:

- `==`, `!=`: Equality
- `>`, `<`, `>=`, `<=`: Comparison
- `and`, `or`, `not`: Boolean logic
- `in`: Membership (e.g., `{{value}} in ['a', 'b']`)
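These operators map directly onto Python expression semantics, so one way to sketch an evaluator (applied after template substitution) is a restricted `eval`; this is an illustration of the operator set, not SAM's actual evaluator:

```python
# Evaluate a comparison/boolean expression with builtins disabled, so only
# literal comparisons and boolean logic are available.
def evaluate_condition(expression: str) -> bool:
    return bool(eval(expression, {"__builtins__": {}}, {}))

evaluate_condition("'high' == 'high'")            # True
evaluate_condition("0.6 < 0.7 and 'a' in ['a']")  # True
evaluate_condition("not ('x' in ['a', 'b'])")     # True
```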
### Parallel Execution

Nodes with no dependency on each other run in parallel:
```yaml
nodes:
  # These three nodes run in parallel
  - id: web_search
    type: agent
    agent_name: search_agent
    depends_on: [start]
  - id: database_query
    type: agent
    agent_name: db_agent
    depends_on: [start]
  - id: api_call
    type: agent
    agent_name: api_agent
    depends_on: [start]

  # This node waits for all three to complete
  - id: synthesis
    type: agent
    agent_name: synthesis_agent
    depends_on: [web_search, database_query, api_call]
```
From `src/solace_agent_mesh/workflow/dag_executor.py:266-300`:

```python
# Multiple nodes ready = implicit parallel execution
if len(next_nodes) > 1:
    parallel_group_id = f"implicit_parallel_{uuid.uuid4().hex[:8]}"

    # Assign each node to a separate branch
    for branch_idx, node_id in enumerate(next_nodes):
        node_parallel_info[node_id] = (parallel_group_id, branch_idx)
        workflow_state.parallel_branch_assignments[parallel_group_id][node_id] = branch_idx

    log.info(
        f"Implicit parallel execution: {len(next_nodes)} nodes, "
        f"group={parallel_group_id}"
    )

# Execute all ready nodes
for node_id in next_nodes:
    await self.execute_node(node_id, workflow_state, workflow_context)
```
## Error Handling

**Retry Strategy:**
```yaml
workflow:
  # Default retry for all nodes
  retry_strategy:
    limit: 3
    retry_policy: OnFailure
    backoff:
      duration: 1s
      factor: 2.0
      max_duration: 30s
  nodes:
    - id: flaky_api_call
      type: agent
      agent_name: api_agent
      # Override with node-specific retry
      retry_strategy:
        limit: 5
        retry_policy: Always
```
- `retry_policy`: When to retry (`Always`, `OnFailure`, `OnError`)
- `backoff`: Exponential backoff configuration
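With `duration: 1s`, `factor: 2.0`, and `max_duration: 30s`, the delay before each retry grows as 1s, 2s, 4s, ... capped at 30s. A sketch of that schedule (how SAM consumes these fields internally is an assumption here):

```python
from typing import Iterator, Optional

# Yield the delay (in seconds) before each retry attempt, starting at
# `duration`, multiplied by `factor` each time, capped at `max_duration`.
def backoff_delays(limit: int, duration: float, factor: float,
                   max_duration: Optional[float] = None) -> Iterator[float]:
    delay = duration
    for _ in range(limit):
        yield min(delay, max_duration) if max_duration is not None else delay
        delay *= factor

list(backoff_delays(limit=5, duration=1.0, factor=2.0, max_duration=30.0))
# -> [1.0, 2.0, 4.0, 8.0, 16.0]
```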
**Fail Fast:**
```yaml
workflow:
  fail_fast: true   # Stop scheduling new nodes on first failure (default)
  # OR
  fail_fast: false  # Continue running independent nodes after a failure
```
### Exit Handlers

Run cleanup or notification logic on workflow completion:
```yaml
workflow:
  on_exit:
    always: cleanup_node
    on_success: success_notification
    on_failure: error_notification
    on_cancel: cancel_notification
  nodes:
    # ... main workflow nodes ...
    - id: cleanup_node
      type: agent
      agent_name: cleanup_agent
      input:
        session_id: "{{workflow.input.session_id}}"
    - id: success_notification
      type: agent
      agent_name: notifier
      input:
        message: "Workflow completed successfully"
    - id: error_notification
      type: agent
      agent_name: notifier
      input:
        message: "Workflow failed: {{workflow.error.message}}"
```
## Workflow Execution

### Lifecycle States

### Execution Context

From `src/solace_agent_mesh/workflow/workflow_execution_context.py`:
```python
class WorkflowExecutionState(BaseModel):
    """Tracks workflow execution progress."""

    execution_id: str  # Unique workflow run ID

    # Node tracking
    completed_nodes: Dict[str, Any] = {}   # node_id → result
    pending_nodes: List[str] = []          # Currently executing
    skipped_nodes: Dict[str, str] = {}     # Skipped due to conditionals

    # Data flow
    node_outputs: Dict[str, Dict] = {}     # Cached outputs for templates

    # Control flow state
    loop_iterations: Dict[str, int] = {}   # Loop counters
    active_branches: Dict[str, List] = {}  # Fork/Map tracking
    parallel_branch_assignments: Dict = {}

    # Error tracking
    error_state: Optional[Dict] = None

    # Extensible metadata
    metadata: Dict[str, Any] = {}
```
### Event Streaming

Workflows publish execution events for monitoring:

**Node Start:**
```json
{
  "type": "workflow_node_execution_start",
  "node_id": "research",
  "node_type": "agent",
  "agent_name": "research_agent",
  "sub_task_id": "wf_exec123_research_xyz",
  "timestamp": "2026-03-04T10:15:30Z"
}
```
**Node Result:**
```json
{
  "type": "workflow_node_execution_result",
  "node_id": "research",
  "status": "success",
  "output_artifact_ref": {
    "name": "research_findings.json",
    "version": 1
  },
  "timestamp": "2026-03-04T10:16:45Z"
}
```
**Map Progress:**
```json
{
  "type": "workflow_map_progress",
  "node_id": "process_all_files",
  "total_items": 20,
  "completed_items": 8,
  "status": "in-progress"
}
```
## Complete Example

Here's a full workflow for research, analysis, and reporting:
```yaml
name: comprehensive_research_workflow
namespace: acme/ai
log_level: info

workflow:
  description: |
    Multi-stage research workflow:
    1. Gather information from multiple sources
    2. Analyze and synthesize findings
    3. Generate comprehensive report
  version: 2.1.0

  input_schema:
    type: object
    properties:
      topic:
        type: string
        description: Research topic
      depth:
        type: string
        enum: ["basic", "comprehensive"]
        default: "basic"
      format:
        type: string
        enum: ["markdown", "pdf", "html"]
        default: "markdown"
    required: ["topic"]

  output_schema:
    type: object
    properties:
      report:
        type: string
        description: Final report artifact reference
      metadata:
        type: object

  nodes:
    # Parallel research from multiple sources
    - id: web_research
      type: agent
      agent_name: web_research_agent
      input:
        query: "{{workflow.input.topic}}"
        max_results: 20
      timeout: 3m

    - id: academic_research
      type: agent
      agent_name: academic_agent
      when: "{{workflow.input.depth}} == 'comprehensive'"
      input:
        query: "{{workflow.input.topic}}"
        databases: ["arxiv", "pubmed"]
      timeout: 5m

    - id: database_query
      type: agent
      agent_name: internal_db_agent
      input:
        topic: "{{workflow.input.topic}}"

    # Synthesize research findings
    - id: synthesize
      type: agent
      agent_name: synthesis_agent
      depends_on: [web_research, academic_research, database_query]
      input:
        sources:
          web: "{{web_research.output.findings}}"
          academic:
            coalesce:
              - "{{academic_research.output.papers}}"
              - []
          internal: "{{database_query.output.results}}"

    # Quality check
    - id: quality_check
      type: agent
      agent_name: qa_agent
      depends_on: [synthesize]
      input:
        content: "{{synthesize.output.synthesis}}"

    # Conditional re-synthesis if quality is low
    - id: needs_improvement
      type: switch
      depends_on: [quality_check]
      cases:
        - condition: "{{quality_check.output.score}} < 0.7"
          node: improve_synthesis
      default: generate_report

    - id: improve_synthesis
      type: agent
      agent_name: synthesis_agent
      depends_on: [needs_improvement]
      input:
        sources: "{{synthesize.output.sources}}"
        feedback: "{{quality_check.output.feedback}}"
        previous_attempt: "{{synthesize.output.synthesis}}"

    # Generate final report
    - id: generate_report
      type: agent
      agent_name: report_generator
      depends_on: [needs_improvement]
      input:
        content:
          coalesce:
            - "{{improve_synthesis.output.synthesis}}"
            - "{{synthesize.output.synthesis}}"
        format: "{{workflow.input.format}}"
        topic: "{{workflow.input.topic}}"

    # Generate visualizations in parallel
    - id: create_charts
      type: agent
      agent_name: visualization_agent
      depends_on: [generate_report]
      input:
        data: "{{generate_report.output.data_points}}"

    - id: create_summary
      type: agent
      agent_name: summarization_agent
      depends_on: [generate_report]
      input:
        full_report: "{{generate_report.output.report}}"
        max_length: 500

    # Final assembly
    - id: assemble_final
      type: agent
      agent_name: document_assembler
      depends_on: [create_charts, create_summary]
      input:
        report: "{{generate_report.output.report}}"
        charts: "{{create_charts.output.visualizations}}"
        summary: "{{create_summary.output.summary}}"

  output_mapping:
    report: "{{assemble_final.output.final_document}}"
    metadata:
      sources_count:
        coalesce:
          - "{{academic_research.output.count}}"
          - 0
      quality_score: "{{quality_check.output.score}}"
      generated_at: "{{assemble_final.output.timestamp}}"

  # Workflow-level configuration
  fail_fast: true
  max_call_depth: 5
  retry_strategy:
    limit: 2
    retry_policy: OnFailure
    backoff:
      duration: 10s
      factor: 2.0
  on_exit:
    always: cleanup
    on_failure: error_notification

session_db:
  type: postgres
  connection_string: ${DATABASE_URL}

artifact_service:
  type: s3
  bucket: workflow-artifacts

max_workflow_execution_time_seconds: 1800
default_node_timeout_seconds: 300
```
## Best Practices

- Make agent operations idempotent where possible
- Use artifact versioning for reproducibility
- Store intermediate results for retry recovery
- Avoid side effects in conditional expressions
- Set appropriate timeouts for each node type
- Use retry strategies judiciously (avoid retrying non-transient errors)
- Implement exit handlers for cleanup
- Log detailed error context for debugging
- Test individual agent nodes in isolation first
- Use mock agents for integration testing
- Test edge cases (empty arrays, null values)
- Validate conditional branches and loops
- Test cancellation and timeout scenarios
## Workflow vs Orchestrator

Choose **Workflows** when you need:

- Deterministic, repeatable execution
- Complex control flow (loops, conditionals, maps)
- Guaranteed execution order
- Audit trails for compliance
- Performance-critical operations

Choose the **Orchestrator** when you need:

- Dynamic routing based on content
- LLM-driven decision making
- Conversational adaptability
- Exploratory or research tasks
See the Orchestrator comparison table for detailed differences.
## Next Steps

- **Orchestrator**: Learn about dynamic LLM-driven orchestration
- **Agents**: Build agents for use in workflows
- **A2A Protocol**: Understand agent communication
- **Deployment**: Deploy and monitor workflows in production