What you’ll build: Advanced workflows with branching, iteration, and error handling
Time: ~30 minutes
Prerequisites:
- Completed Simple Workflow tutorial
- Understanding of basic workflow concepts
What you’ll learn
- Conditional branching with switch nodes
- Iterating over datasets with map nodes
- Error handling and recovery strategies
- Workflow composition (workflows calling workflows)
- Polling and retry patterns
- Exit handlers and cleanup operations
Advanced workflow patterns
Pattern 1: Conditional routing
```yaml
workflow:
  nodes:
    # Classify the input
    - id: classify_input
      type: agent
      agent_name: "ClassifierAgent"
      input:
        data: "{{workflow.input.data}}"

    # Route based on classification
    - id: route_by_type
      type: switch
      depends_on: [classify_input]
      cases:
        - condition: "'{{classify_input.output.type}}' == 'urgent'"
          node: handle_urgent
        - condition: "'{{classify_input.output.type}}' == 'normal'"
          node: handle_normal
        - default: handle_unknown

    - id: handle_urgent
      type: agent
      agent_name: "UrgentHandler"
      depends_on: [route_by_type]

    - id: handle_normal
      type: agent
      agent_name: "NormalHandler"
      depends_on: [route_by_type]

    - id: handle_unknown
      type: agent
      agent_name: "DefaultHandler"
      depends_on: [route_by_type]

  # Use coalesce to get output from the executed path
  output_mapping:
    result:
      coalesce:
        - "{{handle_urgent.output.result}}"
        - "{{handle_normal.output.result}}"
        - "{{handle_unknown.output.result}}"
```
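The coalesce behavior above can be sketched in plain Python: it returns the first branch output that actually resolved. This is an illustrative sketch of the semantics, not the engine's implementation; the outputs of skipped branches are modeled here as `None`.

```python
def coalesce(*candidates):
    """Return the first candidate that actually resolved to a value.

    Only one switch branch executes per run, so the outputs of the
    skipped branches are missing (modeled here as None).
    """
    for value in candidates:
        if value is not None:
            return value
    return None

# In a run where only handle_normal executed:
result = coalesce(None, "ticket routed to normal queue", None)
```

Because the candidates are checked in order, list the most specific branch outputs first and a fallback last.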
Pattern 2: Map iteration
```yaml
workflow:
  input_schema:
    type: object
    properties:
      items:
        type: array
        items:
          type: object

  nodes:
    # Process each item in parallel
    - id: process_items
      type: map
      items: "{{workflow.input.items}}"
      node:
        id: process_single_item
        type: agent
        agent_name: "ItemProcessor"
        input:
          item: "{{_map_item}}"
          index: "{{_map_index}}"

    # Aggregate results
    - id: summarize
      type: agent
      agent_name: "SummarizerAgent"
      depends_on: [process_items]
      input:
        results: "{{process_items.output}}"

  output_mapping:
    processed_items: "{{process_items.output}}"
    summary: "{{summarize.output}}"
```
- `{{_map_item}}` contains the current item
- `{{_map_index}}` contains the zero-based index
- All map iterations run in parallel
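Conceptually, the map node fans one worker out over each item together with its index. A minimal Python sketch of that fan-out, with `process_single_item` as a stand-in for the `ItemProcessor` agent call:

```python
from concurrent.futures import ThreadPoolExecutor

def process_single_item(item, index):
    # Stand-in for the ItemProcessor agent; doubles a value for illustration
    return {"index": index, "value": item["value"] * 2}

items = [{"value": 1}, {"value": 2}, {"value": 3}]

# All iterations run in parallel, like the map node's behavior
with ThreadPoolExecutor() as pool:
    results = list(pool.map(process_single_item, items, range(len(items))))
```

The results come back in input order, which is why a downstream aggregation node (like `summarize` above) can rely on `{{process_items.output}}` being an ordered list.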
Pattern 3: Error handling
```yaml
workflow:
  nodes:
    - id: risky_operation
      type: agent
      agent_name: "RiskyAgent"
      # Retry configuration
      retry_policy:
        max_attempts: 3
        retry_delay_seconds: 5
        backoff_multiplier: 2  # 5s, 10s, 20s
      # Continue on error instead of failing the workflow
      on_error: continue

    - id: verify_success
      type: agent
      agent_name: "VerifierAgent"
      depends_on: [risky_operation]
      input:
        status: "{{risky_operation.status}}"

    # Exit handlers run regardless of success/failure
    - id: cleanup
      type: agent
      agent_name: "CleanupAgent"
      exit_handler:
        always: true  # Runs on success OR failure
```
Exit handler options:
- `always: true` - Always runs
- `on_success: true` - Only on workflow success
- `on_failure: true` - Only on workflow failure
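The `retry_policy` above produces exponentially growing delays between attempts. A minimal sketch of that backoff logic in Python (illustrative only; the workflow engine handles retries for you, and `sleep` is injectable here purely for testability):

```python
import time

def run_with_retry(operation, max_attempts=3, retry_delay_seconds=5,
                   backoff_multiplier=2, sleep=time.sleep):
    """Retry `operation`, multiplying the delay after each failed attempt."""
    delay = retry_delay_seconds
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except Exception:
            if attempt == max_attempts:
                raise  # out of attempts: surface the error
            sleep(delay)                 # back off before the next attempt
            delay *= backoff_multiplier  # 5s -> 10s -> 20s with the defaults
```

With `on_error: continue`, the workflow proceeds even if every attempt fails, which is why the `verify_success` node checks `{{risky_operation.status}}` afterward.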
Pattern 4: Nested workflows
```yaml
workflow:
  nodes:
    - id: data_preparation
      type: workflow
      workflow_name: "DataPrepWorkflow"
      input:
        raw_data: "{{workflow.input.data}}"

    - id: analysis
      type: workflow
      workflow_name: "AnalysisWorkflow"
      depends_on: [data_preparation]
      input:
        prepared_data: "{{data_preparation.output.data}}"

    - id: reporting
      type: workflow
      workflow_name: "ReportingWorkflow"
      depends_on: [analysis]
      input:
        analysis_results: "{{analysis.output}}"
```
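The three sub-workflows chain like ordinary function composition, each consuming the previous one's output. A Python sketch of that data flow, with hypothetical stand-in functions in place of the real sub-workflows:

```python
def data_prep_workflow(raw_data):
    # Stand-in for DataPrepWorkflow: normalize the raw input
    return {"data": raw_data.strip().lower()}

def analysis_workflow(prepared_data):
    # Stand-in for AnalysisWorkflow: derive a simple metric
    return {"word_count": len(prepared_data.split())}

def reporting_workflow(analysis_results):
    # Stand-in for ReportingWorkflow: render the final output
    return f"report: {analysis_results['word_count']} words"

prepared = data_prep_workflow("  Hello Workflow World  ")
analysis = analysis_workflow(prepared["data"])
report = reporting_workflow(analysis)
```

Each stage only sees the fields it is handed (`prepared["data"]`, then `analysis`), mirroring how the `input` mappings above pass `{{data_preparation.output.data}}` and `{{analysis.output}}` downstream.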
Pattern 5: Polling and waiting
```yaml
workflow:
  nodes:
    - id: start_job
      type: agent
      agent_name: "JobStarterAgent"
      input:
        job_config: "{{workflow.input.config}}"

    # Poll until the job completes
    - id: wait_for_completion
      type: loop
      depends_on: [start_job]
      max_iterations: 60  # Max 60 checks
      condition: "'{{check_status.output.status}}' != 'completed'"
      nodes:
        - id: check_status
          type: agent
          agent_name: "StatusCheckerAgent"
          input:
            job_id: "{{start_job.output.job_id}}"
        - id: wait
          type: delay
          seconds: 10  # Wait 10s between checks

    - id: get_results
      type: agent
      agent_name: "ResultFetcherAgent"
      depends_on: [wait_for_completion]
      input:
        job_id: "{{start_job.output.job_id}}"
```
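The loop node above amounts to a bounded poll-and-wait. A minimal Python sketch of the same control flow, where `check_status` stands in for the `StatusCheckerAgent` call and `sleep` is injectable for testing:

```python
import time

def wait_for_completion(check_status, max_iterations=60, delay_seconds=10,
                        sleep=time.sleep):
    """Poll until the job reports 'completed' or the iteration budget runs out."""
    for _ in range(max_iterations):
        status = check_status()
        if status == "completed":
            return True
        sleep(delay_seconds)  # wait between checks
    return False  # gave up after max_iterations checks
```

The `max_iterations` cap matters: without it, a job that never completes would poll forever. With 60 checks at 10 seconds each, this pattern gives up after roughly 10 minutes.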
Complete example: Bug triage workflow
bug_triage_workflow.yaml
```yaml
log:
  stdout_log_level: INFO
  log_file_level: DEBUG
  log_file: bug_triage.log

!include shared_config.yaml

apps:
  # Workflow: Automated bug triage
  - name: bug_triage_workflow
    app_base_path: .
    app_module: solace_agent_mesh.workflow.app
    broker:
      <<: *broker_connection
    app_config:
      namespace: ${NAMESPACE}
      name: "BugTriageWorkflow"
      display_name: "Automated Bug Triage"
      max_workflow_execution_time_seconds: 600
      default_node_timeout_seconds: 120

      workflow:
        version: "1.0.0"
        description: |
          Automated bug triage workflow:
          1. Extract bug details from report
          2. Check for duplicates
          3. Classify severity
          4. Route to appropriate team
          5. Create/update ticket

        input_schema:
          type: object
          properties:
            bug_report:
              type: string
              description: "Bug description from user"
            reporter_email:
              type: string
          required: [bug_report]

        output_schema:
          type: object
          properties:
            ticket_id: {type: string}
            severity: {type: string}
            assigned_team: {type: string}
            status: {type: string}

        nodes:
          # Step 1: Parse bug report
          - id: parse_report
            type: agent
            agent_name: "BugParserAgent"
            instruction: |
              Extract structured information:
              - Title
              - Description
              - Steps to reproduce
              - Expected vs actual behavior
            input:
              report: "{{workflow.input.bug_report}}"

          # Step 2: Search for duplicates
          - id: check_duplicates
            type: agent
            agent_name: "DuplicateCheckerAgent"
            depends_on: [parse_report]
            input:
              title: "{{parse_report.output.title}}"
              description: "{{parse_report.output.description}}"

          # Step 3: Branch based on duplicate status
          - id: handle_duplicate_or_new
            type: switch
            depends_on: [check_duplicates]
            cases:
              - condition: "'{{check_duplicates.output.is_duplicate}}' == 'true'"
                node: update_existing_ticket
              - default: classify_new_bug

          # Path A: Update existing ticket
          - id: update_existing_ticket
            type: agent
            agent_name: "TicketUpdaterAgent"
            depends_on: [handle_duplicate_or_new]
            input:
              ticket_id: "{{check_duplicates.output.duplicate_id}}"
              additional_info: "{{parse_report.output}}"

          # Path B: Classify new bug
          - id: classify_new_bug
            type: agent
            agent_name: "SeverityClassifierAgent"
            depends_on: [handle_duplicate_or_new]
            input:
              bug_details: "{{parse_report.output}}"

          # Route based on severity
          - id: route_by_severity
            type: switch
            depends_on: [classify_new_bug]
            cases:
              - condition: "'{{classify_new_bug.output.severity}}' == 'critical'"
                node: escalate_critical
              - condition: "'{{classify_new_bug.output.severity}}' == 'high'"
                node: assign_to_senior_dev
              - default: assign_to_team

          # Critical path
          - id: escalate_critical
            type: agent
            agent_name: "EscalationAgent"
            depends_on: [route_by_severity]
            input:
              bug: "{{parse_report.output}}"
              severity: "critical"

          # High severity path
          - id: assign_to_senior_dev
            type: agent
            agent_name: "TeamAssignmentAgent"
            depends_on: [route_by_severity]
            input:
              bug: "{{parse_report.output}}"
              team: "senior_developers"

          # Normal path
          - id: assign_to_team
            type: agent
            agent_name: "TeamAssignmentAgent"
            depends_on: [route_by_severity]
            input:
              bug: "{{parse_report.output}}"
              team: "general_queue"

          # Create ticket (for new bugs only)
          - id: create_ticket
            type: agent
            agent_name: "TicketCreatorAgent"
            depends_on: [escalate_critical, assign_to_senior_dev, assign_to_team]
            input:
              bug: "{{parse_report.output}}"
              assignment:
                coalesce:
                  - "{{escalate_critical.output}}"
                  - "{{assign_to_senior_dev.output}}"
                  - "{{assign_to_team.output}}"

          # Notify reporter (always runs)
          - id: notify_reporter
            type: agent
            agent_name: "NotificationAgent"
            exit_handler:
              always: true
            input:
              email: "{{workflow.input.reporter_email}}"
              ticket_info:
                coalesce:
                  - "{{create_ticket.output}}"
                  - "{{update_existing_ticket.output}}"

        output_mapping:
          ticket_id:
            coalesce:
              - "{{create_ticket.output.ticket_id}}"
              - "{{update_existing_ticket.output.ticket_id}}"
          severity:
            coalesce:
              - "{{classify_new_bug.output.severity}}"
              - "existing"
          assigned_team:
            coalesce:
              - "{{escalate_critical.output.team}}"
              - "{{assign_to_senior_dev.output.team}}"
              - "{{assign_to_team.output.team}}"
              - "duplicate_handling"
          status: "processed"

      session_service:
        <<: *default_session_service
      artifact_service:
        <<: *default_artifact_service
      agent_card_publishing: { interval_seconds: 10 }
      agent_discovery: { enabled: true }
```
Best practices
Keep workflows focused
- Each workflow should have a single clear purpose
- Break complex processes into multiple workflows
- Use workflow composition for reusability
Handle all edge cases
```yaml
nodes:
  - id: operation
    type: agent
    on_error: continue
    retry_policy:
      max_attempts: 3

  # Always check for errors
  - id: verify
    type: agent
    input:
      success: "{{operation.status == 'success'}}"
```
Use meaningful node IDs
```yaml
# Good
- id: fetch_user_data
- id: validate_payment
- id: send_confirmation_email

# Bad
- id: node1
- id: step_2
- id: agent_call
```
Set appropriate timeouts
```yaml
# Workflow-level timeout
max_workflow_execution_time_seconds: 600

# Default for all nodes
default_node_timeout_seconds: 60

# Override for specific nodes
nodes:
  - id: long_running_task
    timeout_seconds: 300
```
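The effect of a per-node timeout can be sketched with Python's concurrent.futures: the node body runs in a worker, and the caller fails the node if the result does not arrive in time. This is illustrative only; the workflow engine enforces timeouts itself.

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout

def run_node(fn, timeout_seconds):
    """Run a node body, failing the node if it exceeds its timeout."""
    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(fn)
        try:
            return ("success", future.result(timeout=timeout_seconds))
        except FutureTimeout:
            return ("timeout", None)

fast = run_node(lambda: 42, timeout_seconds=5.0)
slow = run_node(lambda: time.sleep(0.3) or 99, timeout_seconds=0.05)
```

As the sketch suggests, pick timeouts from the outside in: the workflow-level limit must comfortably exceed the sum of the node timeouts on the longest path.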
Testing complex workflows
test_workflow.py
```python
import pytest
from solace_agent_mesh.client import SAMClient


@pytest.mark.asyncio
async def test_bug_triage_new_critical():
    """Test workflow with new critical bug."""
    client = SAMClient(
        broker_url="ws://localhost:8008",
        namespace="test",
    )
    await client.connect()
    try:
        result = await client.send_task(
            agent_name="BugTriageWorkflow",
            task_input={
                "bug_report": "CRITICAL: Database crashes on startup",
                "reporter_email": "[email protected]",
            },
        )
        assert result["severity"] == "critical"
        assert "ticket_id" in result
        assert result["assigned_team"] == "escalation"
    finally:
        await client.disconnect()


@pytest.mark.asyncio
async def test_bug_triage_duplicate():
    """Test workflow with duplicate bug."""
    # ... similar test for duplicate path
```
Next steps
- RAG Implementation - Add retrieval-augmented generation
- Production Deployment - Deploy workflows to production
- Workflow Reference - Complete workflow documentation
- Performance Optimization - Optimize workflow performance
Key concepts learned
- Conditional branching with switch nodes
- Map iteration over datasets
- Error handling and retry strategies
- Nested workflows and composition
- Exit handlers and cleanup
- Testing complex workflows