What you’ll build: Advanced workflows with branching, iteration, and error handling
Time: ~30 minutes
Prerequisites:

What you’ll learn

  • Conditional branching with switch nodes
  • Iterating over datasets with map nodes
  • Error handling and recovery strategies
  • Workflow composition (workflows calling workflows)
  • Polling and retry patterns
  • Exit handlers and cleanup operations

Advanced workflow patterns

Pattern 1: Conditional routing

workflow:
  nodes:
    # Classify the input
    - id: classify_input
      type: agent
      agent_name: "ClassifierAgent"
      input:
        data: "{{workflow.input.data}}"
    
    # Route based on classification
    - id: route_by_type
      type: switch
      depends_on: [classify_input]
      cases:
        - condition: "'{{classify_input.output.type}}' == 'urgent'"
          node: handle_urgent
        - condition: "'{{classify_input.output.type}}' == 'normal'"
          node: handle_normal
        - default: handle_unknown
    
    - id: handle_urgent
      type: agent
      agent_name: "UrgentHandler"
      depends_on: [route_by_type]
    
    - id: handle_normal
      type: agent
      agent_name: "NormalHandler"
      depends_on: [route_by_type]
    
    - id: handle_unknown
      type: agent
      agent_name: "DefaultHandler"
      depends_on: [route_by_type]
  
  # Use coalesce to get output from executed path
  output_mapping:
    result:
      coalesce:
        - "{{handle_urgent.output.result}}"
        - "{{handle_normal.output.result}}"
        - "{{handle_unknown.output.result}}"
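The routing above comes down to two ideas: the first matching case wins, and coalesce returns the first value that exists. The sketch below mimics that behavior in plain Python. The helpers `pick_branch` and `coalesce` are hypothetical illustrations, not the workflow engine's API, and treating a skipped node's output as `None` is an assumption.

```python
from typing import Any, Callable, Optional

# Hypothetical helpers that mimic the routing semantics described above;
# they are not part of any real workflow engine API.


def pick_branch(cases: list[tuple[Callable[[dict], bool], str]],
                default: str, outputs: dict) -> str:
    """Return the target node of the first case whose condition holds."""
    for condition, node in cases:
        if condition(outputs):
            return node
    return default


def coalesce(*values: Optional[Any]) -> Optional[Any]:
    """Return the first value that is not None (assumed for skipped nodes)."""
    for value in values:
        if value is not None:
            return value
    return None


cases = [
    (lambda out: out["type"] == "urgent", "handle_urgent"),
    (lambda out: out["type"] == "normal", "handle_normal"),
]

chosen = pick_branch(cases, "handle_unknown", {"type": "normal"})

# Only the chosen handler produced a result; the others were skipped.
results = {"handle_urgent": None, "handle_normal": "queued", "handle_unknown": None}
final = coalesce(results["handle_urgent"], results["handle_normal"],
                 results["handle_unknown"])
```

Because cases are checked in order, put the most specific condition first and keep a default so that unexpected classifier output still reaches a handler.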

Pattern 2: Map iteration

workflow:
  input_schema:
    type: object
    properties:
      items:
        type: array
        items:
          type: object
  
  nodes:
    # Process each item in parallel
    - id: process_items
      type: map
      items: "{{workflow.input.items}}"
      node:
        id: process_single_item
        type: agent
        agent_name: "ItemProcessor"
        input:
          item: "{{_map_item}}"
          index: "{{_map_index}}"
    
    # Aggregate results
    - id: summarize
      type: agent
      agent_name: "SummarizerAgent"
      depends_on: [process_items]
      input:
        results: "{{process_items.output}}"
  
  output_mapping:
    processed_items: "{{process_items.output}}"
    summary: "{{summarize.output}}"
  • {{_map_item}} contains the current item
  • {{_map_index}} contains the zero-based index
  • All map iterations run in parallel
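As a rough mental model, a map node behaves like a parallel fan-out followed by a gather. The Python sketch below uses `asyncio.gather` to show the idea. `process_single_item` here is a hypothetical stand-in for the agent call, and returning results in input order is an assumption about the engine, not something the YAML guarantees.

```python
import asyncio


async def process_single_item(item: dict, index: int) -> dict:
    """Hypothetical stand-in for the ItemProcessor agent call."""
    await asyncio.sleep(0)  # yield control, as a real network call would
    return {"index": index, "name": item["name"].upper()}


async def run_map(items: list[dict]) -> list[dict]:
    """Start one coroutine per item and collect results in input order."""
    calls = [process_single_item(item, index) for index, item in enumerate(items)]
    return await asyncio.gather(*calls)


processed = asyncio.run(run_map([{"name": "a"}, {"name": "b"}, {"name": "c"}]))
```

Since every iteration starts at once, a large input array can put significant load on the downstream agent, which is worth keeping in mind when sizing inputs.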

Pattern 3: Error handling

workflow:
  nodes:
    - id: risky_operation
      type: agent
      agent_name: "RiskyAgent"
      # Retry configuration
      retry_policy:
        max_attempts: 3
        retry_delay_seconds: 5
        backoff_multiplier: 2  # 5s, 10s, 20s
      # Continue on error instead of failing workflow
      on_error: continue
    
    - id: verify_success
      type: agent
      agent_name: "VerifierAgent"
      depends_on: [risky_operation]
      input:
        status: "{{risky_operation.status}}"
    
    # Exit handlers run regardless of success/failure
    - id: cleanup
      type: agent
      agent_name: "CleanupAgent"
      exit_handler:
        always: true  # Runs on success OR failure
Exit handler types:
  • always: true - Always runs
  • on_success: true - Only on workflow success
  • on_failure: true - Only on workflow failure
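To make the retry arithmetic and the exit handler flags concrete, here is a small Python sketch. Both functions are hypothetical illustrations rather than engine code. Whether `max_attempts` counts the initial attempt (and therefore how many of these delays actually occur) is not stated here, so check the workflow reference before relying on it.

```python
def retry_delays(num_retries: int, retry_delay_seconds: float,
                 backoff_multiplier: float) -> list[float]:
    """Delay before each retry: base, base*m, base*m^2, ..."""
    return [retry_delay_seconds * backoff_multiplier ** n for n in range(num_retries)]


def handlers_to_run(workflow_succeeded: bool, handlers: dict[str, dict]) -> list[str]:
    """Select exit handlers by their always / on_success / on_failure flags."""
    selected = []
    for node_id, flags in handlers.items():
        if (flags.get("always")
                or (workflow_succeeded and flags.get("on_success"))
                or (not workflow_succeeded and flags.get("on_failure"))):
            selected.append(node_id)
    return selected


# Matches the "5s, 10s, 20s" comment in the retry_policy above.
delays = retry_delays(3, 5, 2)

handlers = {
    "cleanup": {"always": True},
    "announce": {"on_success": True},     # hypothetical node
    "page_oncall": {"on_failure": True},  # hypothetical node
}
after_failure = handlers_to_run(False, handlers)
after_success = handlers_to_run(True, handlers)
```

With a multiplier of 2 the total wait doubles with each extra retry, so keep the retry count small relative to the node and workflow timeouts.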

Pattern 4: Nested workflows

workflow:
  nodes:
    - id: data_preparation
      type: workflow
      workflow_name: "DataPrepWorkflow"
      input:
        raw_data: "{{workflow.input.data}}"
    
    - id: analysis
      type: workflow
      workflow_name: "AnalysisWorkflow"
      depends_on: [data_preparation]
      input:
        prepared_data: "{{data_preparation.output.data}}"
    
    - id: reporting
      type: workflow
      workflow_name: "ReportingWorkflow"
      depends_on: [analysis]
      input:
        analysis_results: "{{analysis.output}}"
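The `depends_on` lists above form a dependency graph, and the execution order is a topological sort of that graph. The sketch below reproduces the ordering with Python's standard `graphlib` module. It only illustrates the ordering rule and is not how the engine is necessarily implemented.

```python
from graphlib import CycleError, TopologicalSorter

# Each node maps to the nodes listed in its depends_on.
depends_on = {
    "data_preparation": [],
    "analysis": ["data_preparation"],
    "reporting": ["analysis"],
}

execution_order = list(TopologicalSorter(depends_on).static_order())

# A circular dependency has no valid order and is rejected.
try:
    list(TopologicalSorter({"a": ["b"], "b": ["a"]}).static_order())
    has_cycle = False
except CycleError:
    has_cycle = True
```

Running a check like this over node IDs before deploying can catch a circular `depends_on` early.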

Pattern 5: Polling and waiting

workflow:
  nodes:
    - id: start_job
      type: agent
      agent_name: "JobStarterAgent"
      input:
        job_config: "{{workflow.input.config}}"
    
    # Poll until job completes
    - id: wait_for_completion
      type: loop
      depends_on: [start_job]
      max_iterations: 60  # Max 60 checks
      condition: "'{{check_status.output.status}}' != 'completed'"
      nodes:
        - id: check_status
          type: agent
          agent_name: "StatusCheckerAgent"
          input:
            job_id: "{{start_job.output.job_id}}"
        
        - id: wait
          type: delay
          seconds: 10  # Wait 10s between checks
    
    - id: get_results
      type: agent
      agent_name: "ResultFetcherAgent"
      depends_on: [wait_for_completion]
      input:
        job_id: "{{start_job.output.job_id}}"
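The loop above is a bounded polling pattern: check, stop if done, otherwise wait and try again up to a fixed number of times. A minimal Python sketch of the same logic follows. `wait_for_completion` and its parameters are hypothetical names chosen to mirror the YAML, and the `sleep` argument is injectable so the example runs instantly.

```python
import time
from typing import Callable


def wait_for_completion(check_status: Callable[[], str],
                        max_iterations: int = 60,
                        delay_seconds: float = 10,
                        sleep: Callable[[float], None] = time.sleep) -> bool:
    """Poll until check_status() returns 'completed' or the budget runs out."""
    for _ in range(max_iterations):
        if check_status() == "completed":
            return True
        sleep(delay_seconds)
    return False


# Simulated job that completes on the third check.
statuses = iter(["running", "running", "completed"])
waits: list[float] = []
finished = wait_for_completion(lambda: next(statuses), sleep=waits.append)

# A job that never completes exhausts the iteration budget.
gave_up = wait_for_completion(lambda: "running", max_iterations=3,
                              sleep=lambda seconds: None)
```

Note the worst case: 60 checks with a 10 second delay is about 600 seconds of waiting before any time spent in the status checks themselves, so the workflow-level timeout needs to be larger than that for the loop to run to its limit.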

Complete example: Bug triage workflow

bug_triage_workflow.yaml
log:
  stdout_log_level: INFO
  log_file_level: DEBUG
  log_file: bug_triage.log

!include shared_config.yaml

apps:
  # Workflow: Automated bug triage
  - name: bug_triage_workflow
    app_base_path: .
    app_module: solace_agent_mesh.workflow.app
    broker:
      <<: *broker_connection

    app_config:
      namespace: ${NAMESPACE}
      name: "BugTriageWorkflow"
      display_name: "Automated Bug Triage"

      max_workflow_execution_time_seconds: 600
      default_node_timeout_seconds: 120

      workflow:
        version: "1.0.0"
        description: |
          Automated bug triage workflow:
          1. Extract bug details from report
          2. Check for duplicates
          3. Classify severity
          4. Route to appropriate team
          5. Create/update ticket

        input_schema:
          type: object
          properties:
            bug_report:
              type: string
              description: "Bug description from user"
            reporter_email:
              type: string
          required: [bug_report]

        output_schema:
          type: object
          properties:
            ticket_id: {type: string}
            severity: {type: string}
            assigned_team: {type: string}
            status: {type: string}

        nodes:
          # Step 1: Parse bug report
          - id: parse_report
            type: agent
            agent_name: "BugParserAgent"
            instruction: |
              Extract structured information:
              - Title
              - Description
              - Steps to reproduce
              - Expected vs actual behavior
            input:
              report: "{{workflow.input.bug_report}}"

          # Step 2: Search for duplicates
          - id: check_duplicates
            type: agent
            agent_name: "DuplicateCheckerAgent"
            depends_on: [parse_report]
            input:
              title: "{{parse_report.output.title}}"
              description: "{{parse_report.output.description}}"

          # Step 3: Branch based on duplicate status
          - id: handle_duplicate_or_new
            type: switch
            depends_on: [check_duplicates]
            cases:
              - condition: "'{{check_duplicates.output.is_duplicate}}' == 'true'"
                node: update_existing_ticket
              - default: classify_new_bug

          # Path A: Update existing ticket
          - id: update_existing_ticket
            type: agent
            agent_name: "TicketUpdaterAgent"
            depends_on: [handle_duplicate_or_new]
            input:
              ticket_id: "{{check_duplicates.output.duplicate_id}}"
              additional_info: "{{parse_report.output}}"

          # Path B: Classify new bug
          - id: classify_new_bug
            type: agent
            agent_name: "SeverityClassifierAgent"
            depends_on: [handle_duplicate_or_new]
            input:
              bug_details: "{{parse_report.output}}"

          # Route based on severity
          - id: route_by_severity
            type: switch
            depends_on: [classify_new_bug]
            cases:
              - condition: "'{{classify_new_bug.output.severity}}' == 'critical'"
                node: escalate_critical
              - condition: "'{{classify_new_bug.output.severity}}' == 'high'"
                node: assign_to_senior_dev
              - default: assign_to_team

          # Critical path
          - id: escalate_critical
            type: agent
            agent_name: "EscalationAgent"
            depends_on: [route_by_severity]
            input:
              bug: "{{parse_report.output}}"
              severity: "critical"

          # High severity path
          - id: assign_to_senior_dev
            type: agent
            agent_name: "TeamAssignmentAgent"
            depends_on: [route_by_severity]
            input:
              bug: "{{parse_report.output}}"
              team: "senior_developers"

          # Normal path
          - id: assign_to_team
            type: agent
            agent_name: "TeamAssignmentAgent"
            depends_on: [route_by_severity]
            input:
              bug: "{{parse_report.output}}"
              team: "general_queue"

          # Create ticket (for new bugs only)
          - id: create_ticket
            type: agent
            agent_name: "TicketCreatorAgent"
            depends_on: [escalate_critical, assign_to_senior_dev, assign_to_team]
            input:
              bug: "{{parse_report.output}}"
              assignment:
                coalesce:
                  - "{{escalate_critical.output}}"
                  - "{{assign_to_senior_dev.output}}"
                  - "{{assign_to_team.output}}"

          # Notify reporter (always runs)
          - id: notify_reporter
            type: agent
            agent_name: "NotificationAgent"
            exit_handler:
              always: true
            input:
              email: "{{workflow.input.reporter_email}}"
              ticket_info:
                coalesce:
                  - "{{create_ticket.output}}"
                  - "{{update_existing_ticket.output}}"

        output_mapping:
          ticket_id:
            coalesce:
              - "{{create_ticket.output.ticket_id}}"
              - "{{update_existing_ticket.output.ticket_id}}"
          severity:
            coalesce:
              - "{{classify_new_bug.output.severity}}"
              - "existing"
          assigned_team:
            coalesce:
              - "{{escalate_critical.output.team}}"
              - "{{assign_to_senior_dev.output.team}}"
              - "{{assign_to_team.output.team}}"
              - "duplicate_handling"
          status: "processed"

      session_service:
        <<: *default_session_service
      artifact_service:
        <<: *default_artifact_service

      agent_card_publishing: { interval_seconds: 10 }
      agent_discovery: { enabled: true }
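When reading a workflow with several switch nodes, it can help to write down which nodes are expected to run for each kind of input. The Python sketch below does that for the bug triage workflow. `triage_path` is a hypothetical helper that mirrors the YAML by hand; it does not call the engine, and it assumes skipped branches simply do not run.

```python
def triage_path(is_duplicate: bool, severity: str = "") -> list[str]:
    """Return the node IDs expected to run, in order, for one bug report."""
    path = ["parse_report", "check_duplicates", "handle_duplicate_or_new"]
    if is_duplicate:
        path.append("update_existing_ticket")
    else:
        path += ["classify_new_bug", "route_by_severity"]
        if severity == "critical":
            path.append("escalate_critical")
        elif severity == "high":
            path.append("assign_to_senior_dev")
        else:
            path.append("assign_to_team")  # default case
        path.append("create_ticket")
    path.append("notify_reporter")  # exit handler with always: true
    return path


duplicate_path = triage_path(True)
critical_path = triage_path(False, "critical")
```

A table of expected paths like this doubles as a checklist for the test cases each branch needs.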

Best practices

  • Each workflow should have a single clear purpose
  • Break complex processes into multiple workflows
  • Use workflow composition for reusability
Handle errors explicitly, then check the outcome downstream:
nodes:
  - id: operation
    type: agent
    on_error: continue
    retry_policy:
      max_attempts: 3
  
  # Always check for errors
  - id: verify
    type: agent
    depends_on: [operation]
    input:
      status: "{{operation.status}}"
Use descriptive node IDs:
# Good
- id: fetch_user_data
- id: validate_payment
- id: send_confirmation_email

# Bad
- id: node1
- id: step_2
- id: agent_call
Set timeouts at the workflow, default, and node level:
# Workflow-level timeout
max_workflow_execution_time_seconds: 600

# Default for all nodes
default_node_timeout_seconds: 60

# Override for specific nodes
nodes:
  - id: long_running_task
    timeout_seconds: 300
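The precedence in the snippet above is that a node-level `timeout_seconds` overrides the default. The Python sketch below illustrates that rule and what hitting a timeout looks like, using `asyncio.wait_for`. The function names and the `"timeout"` status string are hypothetical and do not come from the engine.

```python
import asyncio

DEFAULT_NODE_TIMEOUT_SECONDS = 60


def effective_timeout(node: dict,
                      default: float = DEFAULT_NODE_TIMEOUT_SECONDS) -> float:
    """A node-level timeout_seconds wins; otherwise the default applies."""
    return node.get("timeout_seconds", default)


async def run_with_timeout(coro, seconds: float) -> str:
    """Run a coroutine, reporting 'timeout' if it exceeds its budget."""
    try:
        await asyncio.wait_for(coro, timeout=seconds)
        return "success"
    except asyncio.TimeoutError:
        return "timeout"


long_task = effective_timeout({"id": "long_running_task", "timeout_seconds": 300})
other_task = effective_timeout({"id": "quick_task"})

# A 0.2 s task with a 0.01 s budget is cancelled and reported as a timeout.
outcome = asyncio.run(run_with_timeout(asyncio.sleep(0.2), seconds=0.01))
```

Keep the sum of the node timeouts on the longest path below the workflow-level limit, otherwise the workflow can time out before a slow node does.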

Testing complex workflows

test_workflow.py
# Requires the pytest-asyncio plugin for @pytest.mark.asyncio
import pytest
from solace_agent_mesh.client import SAMClient

@pytest.mark.asyncio
async def test_bug_triage_new_critical():
    """Test workflow with new critical bug."""
    client = SAMClient(
        broker_url="ws://localhost:8008",
        namespace="test"
    )
    await client.connect()
    
    try:
        result = await client.send_task(
            agent_name="BugTriageWorkflow",
            task_input={
                "bug_report": "CRITICAL: Database crashes on startup",
                "reporter_email": "reporter@example.com"  # placeholder address
            }
        )
        
        assert result["severity"] == "critical"
        assert "ticket_id" in result
        assert result["assigned_team"] == "escalation"
    
    finally:
        await client.disconnect()

@pytest.mark.asyncio
async def test_bug_triage_duplicate():
    """Test workflow with duplicate bug."""
    # ... similar test for duplicate path

Next steps

  • RAG Implementation: Add retrieval-augmented generation
  • Production Deployment: Deploy workflows to production
  • Workflow Reference: Complete workflow documentation
  • Performance Optimization: Optimize workflow performance

Key concepts learned

  • Conditional branching with switch nodes
  • Map iteration over datasets
  • Error handling and retry strategies
  • Nested workflows and composition
  • Exit handlers and cleanup
  • Testing complex workflows
