Workflow Development

This guide teaches you how to develop workflows in Solace Agent Mesh that orchestrate multiple agents to accomplish complex, multi-step tasks.

Understanding Workflows

Workflows in Solace Agent Mesh:
  • Coordinate multiple agents to accomplish complex tasks
  • Support sequential, parallel, and conditional execution
  • Handle errors and retries automatically
  • Maintain state across multiple steps
  • Can be invoked by other agents like any other agent
  • Are themselves discoverable agents in the mesh

Workflow Concepts

Workflow Structure

workflow:
  version: "1.0.0"
  description: "What this workflow does"
  
  input_schema: { }      # Expected inputs
  output_schema: { }     # Expected outputs
  
  nodes: [ ]             # Workflow steps
  output_mapping: { }    # How to construct final output
  skills: [ ]            # Capabilities for discovery

Node Types

  • agent: Delegate a task to an agent
  • switch: Conditional branching
  • map: Iterate over arrays

Execution Flow

  • Nodes execute when their dependencies complete
  • Use depends_on to define execution order
  • Nodes that do not depend on each other run in parallel automatically
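
The scheduling rule above can be pictured as grouping nodes into waves, where every node in a wave has all of its dependencies finished. The following is a minimal Python sketch of that idea using the standard library's graphlib module. It is an illustration only, not how Solace Agent Mesh schedules nodes internally; the execution_waves function and the dictionary node format are hypothetical.

```python
from graphlib import TopologicalSorter


def execution_waves(nodes):
    """Group node IDs into waves; every node in a wave can run in parallel."""
    # Map each node ID to the set of node IDs it depends on.
    graph = {n["id"]: set(n.get("depends_on", [])) for n in nodes}
    sorter = TopologicalSorter(graph)
    sorter.prepare()  # raises graphlib.CycleError on circular dependencies
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # nodes whose dependencies are all done
        waves.append(ready)
        sorter.done(*ready)
    return waves


# Hypothetical node list: two independent fetches feeding one merge step.
nodes = [
    {"id": "fetch_customers"},
    {"id": "fetch_orders"},
    {"id": "merge_data", "depends_on": ["fetch_customers", "fetch_orders"]},
]
print(execution_waves(nodes))
```

The two fetch nodes land in the first wave because neither declares depends_on, and merge_data lands in the second wave because it waits for both.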

Creating Your First Workflow

Step 1: Define the Workflow Configuration

Create a YAML file for your workflow:
log:
  stdout_log_level: INFO
  log_file_level: DEBUG
  log_file: simple_workflow.log

!include shared_config.yaml

apps:
  - name: simple_workflow_app
    app_base_path: .
    app_module: solace_agent_mesh.workflow.app
    
    broker:
      <<: *broker_connection
    
    app_config:
      namespace: ${NAMESPACE}
      name: "SimpleWorkflow"
      display_name: "Simple Data Processing Workflow"
      
      # Timeout configurations
      max_workflow_execution_time_seconds: 300  # 5 minutes total
      default_node_timeout_seconds: 60          # 1 minute per node
      
      workflow:
        version: "1.0.0"
        description: |
          A simple workflow that processes data through multiple steps:
          1. Fetches data from a source
          2. Processes the data
          3. Generates a report

Step 2: Define Input and Output Schemas

Specify what the workflow expects and returns:
workflow:
  input_schema:
    type: object
    properties:
      data_source:
        type: string
        description: "URL or path to data source"
      report_format:
        type: string
        enum: ["pdf", "html", "excel"]
        description: "Desired report format"
    required: ["data_source"]
  
  output_schema:
    type: object
    properties:
      report_file:
        type: string
        description: "Name of the generated report"
      summary:
        type: string
        description: "Executive summary"
      record_count:
        type: integer
        description: "Number of records processed"
    required: ["report_file", "summary"]
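
To make the role of required and enum in the input schema concrete, here is a small Python sketch that checks a payload against those two keywords only. It is a simplified illustration under stated assumptions: the check_input helper is hypothetical, it covers far less than full JSON Schema validation, and it is not the validation logic Solace Agent Mesh runs.

```python
def check_input(schema, payload):
    """Return a list of problems; an empty list means the payload passed these checks."""
    problems = []
    # Every name listed under "required" must be present in the payload.
    for name in schema.get("required", []):
        if name not in payload:
            problems.append(f"missing required field: {name}")
    # A property with an "enum" only accepts one of the listed values.
    for name, rules in schema.get("properties", {}).items():
        if name in payload and "enum" in rules and payload[name] not in rules["enum"]:
            problems.append(f"{name} must be one of {rules['enum']}")
    return problems


# The input schema from this step, written as a Python dictionary.
input_schema = {
    "type": "object",
    "properties": {
        "data_source": {"type": "string"},
        "report_format": {"type": "string", "enum": ["pdf", "html", "excel"]},
    },
    "required": ["data_source"],
}

print(check_input(input_schema, {"data_source": "https://example.com/sales.csv", "report_format": "pdf"}))
print(check_input(input_schema, {"report_format": "docx"}))
```

The first payload passes both checks; the second is missing data_source and uses a report_format outside the enum.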

Step 3: Define Workflow Nodes

Add nodes for each step:
workflow:
  nodes:
    # Step 1: Fetch data
    - id: fetch_data
      type: agent
      agent_name: "DataFetcherAgent"
      instruction: "Fetch data from the specified source and save as CSV."
      input:
        source: "{{workflow.input.data_source}}"
    
    # Step 2: Process data
    - id: process_data
      type: agent
      agent_name: "DataProcessorAgent"
      depends_on: [fetch_data]  # Wait for fetch to complete
      instruction: "Clean and process the data, remove duplicates, and validate."
      input:
        data_file: "{{fetch_data.output.filename}}"
    
    # Step 3: Generate report
    - id: generate_report
      type: agent
      agent_name: "ReportGeneratorAgent"
      depends_on: [process_data]  # Wait for processing
      instruction: "Generate a report in the requested format with visualizations."
      input:
        data_file: "{{process_data.output.filename}}"
        format: "{{workflow.input.report_format}}"

Step 4: Define Output Mapping

Map node outputs to workflow output:
workflow:
  output_mapping:
    report_file: "{{generate_report.output.report_filename}}"
    summary: "{{generate_report.output.summary}}"
    record_count: "{{process_data.output.total_records}}"

Step 5: Configure as Discoverable Agent

Make the workflow discoverable:
workflow:
  skills:
    - id: "data_processing_workflow"
      name: "Data Processing Workflow"
      description: "Fetch, process, and report on data from various sources"
      examples:
        - "Process the sales data and generate a report"
        - "Fetch customer data from the API and create an analysis"
      tags: ["data", "reporting", "etl"]

session_service:
  <<: *default_session_service

artifact_service:
  <<: *default_artifact_service

agent_card_publishing: { interval_seconds: 10 }
agent_discovery: { enabled: true }

Step 6: Run the Workflow

Start the workflow:
sam run configs/workflows/simple_workflow.yaml

Invoke via a gateway:
User: Process the sales data from https://example.com/sales.csv and generate a PDF report

Workflow Patterns

Sequential Processing

Steps execute one after another:
nodes:
  - id: step1
    type: agent
    agent_name: "Agent1"
    instruction: "Do step 1"
  
  - id: step2
    type: agent
    agent_name: "Agent2"
    depends_on: [step1]
    instruction: "Do step 2 using {{step1.output.result}}"
  
  - id: step3
    type: agent
    agent_name: "Agent3"
    depends_on: [step2]
    instruction: "Do step 3 using {{step2.output.result}}"

Parallel Execution

Independent steps run simultaneously:
nodes:
  # These run in parallel
  - id: fetch_customers
    type: agent
    agent_name: "DatabaseAgent"
    instruction: "Fetch all customer records"
  
  - id: fetch_orders
    type: agent
    agent_name: "DatabaseAgent"
    instruction: "Fetch all order records"
  
  - id: fetch_products
    type: agent
    agent_name: "DatabaseAgent"
    instruction: "Fetch all product records"
  
  # This waits for all three to complete
  - id: merge_data
    type: agent
    agent_name: "DataProcessorAgent"
    depends_on: [fetch_customers, fetch_orders, fetch_products]
    instruction: "Merge the three datasets"
    input:
      customers: "{{fetch_customers.output.filename}}"
      orders: "{{fetch_orders.output.filename}}"
      products: "{{fetch_products.output.filename}}"

Conditional Branching

Route execution based on conditions:
nodes:
  # Classify the request
  - id: classify
    type: agent
    agent_name: "ClassifierAgent"
    instruction: "Classify the request as simple, medium, or complex"
    input:
      request: "{{workflow.input.request}}"
  
  # Route based on classification
  - id: route
    type: switch
    depends_on: [classify]
    cases:
      - condition: "'{{classify.output.category}}' == 'simple'"
        node: handle_simple
      - condition: "'{{classify.output.category}}' == 'medium'"
        node: handle_medium
      - condition: "'{{classify.output.category}}' == 'complex'"
        node: handle_complex
  
  # Branch handlers
  - id: handle_simple
    type: agent
    agent_name: "SimpleHandlerAgent"
    depends_on: [route]
    instruction: "Handle simple request"
  
  - id: handle_medium
    type: agent
    agent_name: "MediumHandlerAgent"
    depends_on: [route]
    instruction: "Handle medium complexity request"
  
  - id: handle_complex
    type: agent
    agent_name: "ComplexHandlerAgent"
    depends_on: [route]
    instruction: "Handle complex request"
  
# Use coalesce to get output from whichever branch executed
output_mapping:
  result:
    coalesce:
      - "{{handle_simple.output.result}}"
      - "{{handle_medium.output.result}}"
      - "{{handle_complex.output.result}}"
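
The routing step can be sketched in Python as a list of cases checked in order. This sketch assumes that cases are evaluated top to bottom and that the first matching case wins; the source does not state the evaluation order explicitly. Conditions are modeled as Python callables rather than condition strings, and pick_branch is a hypothetical helper, not part of Solace Agent Mesh.

```python
def pick_branch(cases, outputs):
    """Return the target node of the first case whose condition holds, else None."""
    for condition, node in cases:
        if condition(outputs):
            return node
    return None


# Each case pairs a condition (a callable over node outputs) with a target node ID.
cases = [
    (lambda out: out["classify"]["category"] == "simple", "handle_simple"),
    (lambda out: out["classify"]["category"] == "medium", "handle_medium"),
    (lambda out: out["classify"]["category"] == "complex", "handle_complex"),
]

print(pick_branch(cases, {"classify": {"category": "medium"}}))
```

Only one handler is selected per run, which is why the output mapping above uses coalesce to pick up the result of whichever branch executed.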

Map Iteration

Process arrays in parallel:
nodes:
  # Process each item in the array
  - id: process_items
    type: map
    items: "{{workflow.input.items}}"  # Array input
    node: process_single_item          # Node to execute for each item
    concurrency_limit: 5               # Max parallel executions
  
  # Inner node processes one item
  - id: process_single_item
    type: agent
    agent_name: "ItemProcessorAgent"
    instruction: "Process this item"
    input:
      item_id: "{{_map_item.id}}"      # Current item
      value: "{{_map_item.value}}"
      index: "{{_map_index}}"          # Current index

# Results are automatically collected into an array
output_mapping:
  results: "{{process_items.output.results}}"
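
The effect of concurrency_limit can be illustrated with a short asyncio sketch: every item is processed, at most a fixed number of calls are in flight at once, and results come back in input order. This is a conceptual sketch only. The run_map function and the stand-in worker are hypothetical, and the ordering of results is an assumption rather than documented behavior.

```python
import asyncio


async def run_map(items, worker, concurrency_limit):
    """Apply worker to every item with at most concurrency_limit calls in flight."""
    semaphore = asyncio.Semaphore(concurrency_limit)

    async def guarded(index, item):
        # The semaphore blocks here once concurrency_limit workers are running.
        async with semaphore:
            return await worker(item, index)

    # gather preserves input order, so results[i] belongs to items[i].
    return await asyncio.gather(*(guarded(i, item) for i, item in enumerate(items)))


async def process_single_item(item, index):
    # Stand-in for the agent call; a real node would delegate to ItemProcessorAgent.
    await asyncio.sleep(0)
    return {"item_id": item["id"], "index": index, "value": item["value"] * 2}


items = [{"id": "a", "value": 1}, {"id": "b", "value": 2}, {"id": "c", "value": 3}]
results = asyncio.run(run_map(items, process_single_item, concurrency_limit=2))
print(results)
```

The item and index arguments play the role of the _map_item and _map_index template variables in the YAML above.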

Complete Example: Customer Onboarding Workflow

Here’s a complete workflow for customer onboarding:
customer_onboarding_workflow.yaml
log:
  stdout_log_level: INFO
  log_file_level: DEBUG
  log_file: customer_onboarding.log

!include shared_config.yaml

apps:
  - name: customer_onboarding_workflow
    app_base_path: .
    app_module: solace_agent_mesh.workflow.app
    
    broker:
      <<: *broker_connection
    
    app_config:
      namespace: ${NAMESPACE}
      name: "CustomerOnboardingWorkflow"
      display_name: "Customer Onboarding Workflow"
      
      max_workflow_execution_time_seconds: 600
      default_node_timeout_seconds: 120
      
      workflow:
        version: "1.0.0"
        description: |
          Automated customer onboarding workflow:
          1. Validates customer information
          2. Performs background checks in parallel
          3. Creates accounts based on validation results
          4. Sends welcome communications
        
        input_schema:
          type: object
          properties:
            customer_data:
              type: object
              properties:
                name: { type: string }
                email: { type: string }
                company: { type: string }
                phone: { type: string }
              required: [name, email]
            account_type:
              type: string
              enum: ["basic", "premium", "enterprise"]
          required: [customer_data, account_type]
        
        output_schema:
          type: object
          properties:
            customer_id: { type: string }
            account_status: { type: string }
            welcome_email_sent: { type: boolean }
            onboarding_complete: { type: boolean }
          required: [customer_id, account_status, onboarding_complete]
        
        nodes:
          # Step 1: Validate customer data
          - id: validate_customer
            type: agent
            agent_name: "ValidationAgent"
            instruction: |
              Validate the customer information:
              - Check email format
              - Verify phone number
              - Validate company information
              Return validation status and any issues found.
            input:
              customer_data: "{{workflow.input.customer_data}}"
          
          # Step 2: Parallel background checks
          - id: check_credit
            type: agent
            agent_name: "CreditCheckAgent"
            depends_on: [validate_customer]
            instruction: "Perform credit check for the customer"
            input:
              customer_name: "{{workflow.input.customer_data.name}}"
              company: "{{workflow.input.customer_data.company}}"
          
          - id: check_compliance
            type: agent
            agent_name: "ComplianceAgent"
            depends_on: [validate_customer]
            instruction: "Check compliance and regulatory requirements"
            input:
              customer_data: "{{workflow.input.customer_data}}"
          
          # Step 3: Decide account tier based on checks
          - id: determine_tier
            type: switch
            depends_on: [check_credit, check_compliance]
            cases:
              - condition: "{{check_credit.output.score}} > 700 and '{{check_compliance.output.status}}' == 'approved'"
                node: create_premium_account
              - condition: "{{check_credit.output.score}} > 600"
                node: create_standard_account
              - condition: "true"  # Default case
                node: create_basic_account
          
          # Account creation branches
          - id: create_premium_account
            type: agent
            agent_name: "AccountManagementAgent"
            depends_on: [determine_tier]
            instruction: "Create premium account with all features enabled"
            input:
              customer_data: "{{workflow.input.customer_data}}"
              tier: "premium"
          
          - id: create_standard_account
            type: agent
            agent_name: "AccountManagementAgent"
            depends_on: [determine_tier]
            instruction: "Create standard account with core features"
            input:
              customer_data: "{{workflow.input.customer_data}}"
              tier: "standard"
          
          - id: create_basic_account
            type: agent
            agent_name: "AccountManagementAgent"
            depends_on: [determine_tier]
            instruction: "Create basic account with limited features"
            input:
              customer_data: "{{workflow.input.customer_data}}"
              tier: "basic"
          
          # Step 4: Send welcome email
          - id: send_welcome
            type: agent
            agent_name: "EmailAgent"
            depends_on: [create_premium_account, create_standard_account, create_basic_account]
            instruction: "Send personalized welcome email to the customer"
            input:
              customer_email: "{{workflow.input.customer_data.email}}"
              customer_name: "{{workflow.input.customer_data.name}}"
              account_id:
                coalesce:
                  - "{{create_premium_account.output.account_id}}"
                  - "{{create_standard_account.output.account_id}}"
                  - "{{create_basic_account.output.account_id}}"
        
        output_mapping:
          customer_id:
            coalesce:
              - "{{create_premium_account.output.customer_id}}"
              - "{{create_standard_account.output.customer_id}}"
              - "{{create_basic_account.output.customer_id}}"
          account_status:
            coalesce:
              - "{{create_premium_account.output.status}}"
              - "{{create_standard_account.output.status}}"
              - "{{create_basic_account.output.status}}"
          welcome_email_sent: "{{send_welcome.output.sent}}"
          onboarding_complete: true
        
        skills:
          - id: "customer_onboarding"
            name: "Customer Onboarding"
            description: "Complete automated customer onboarding with validation, checks, and account creation"
            examples:
              - "Onboard new customer John Doe from Acme Corp"
              - "Set up enterprise account for new client"
            tags: ["onboarding", "customer", "automation"]
      
      session_service:
        <<: *default_session_service
      
      artifact_service:
        <<: *default_artifact_service
      
      agent_card_publishing: { interval_seconds: 10 }
      agent_discovery: { enabled: true }

Template Variables

Template expressions in double curly braces reference workflow inputs, node outputs, and map iteration variables:

Workflow Inputs

input:
  value: "{{workflow.input.parameter_name}}"

Node Outputs

input:
  previous_result: "{{previous_node.output.field_name}}"

Map Variables

input:
  item: "{{_map_item}}"         # Current item in map
  index: "{{_map_index}}"       # Current index (0-based)
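
One way to picture how these expressions resolve is a lookup of a dotted path in a nested context. The Python sketch below is a rough model under stated assumptions: the context layout, the resolve and lookup helpers, and the rule that a template consisting of a single expression keeps the value's original type are all hypothetical and may differ from the actual engine.

```python
import re

# Matches {{ dotted.path }} with optional whitespace inside the braces.
TEMPLATE = re.compile(r"\{\{\s*([\w.]+)\s*\}\}")


def lookup(path, context):
    """Walk a dotted path such as 'fetch_data.output.filename' through nested dicts."""
    value = context
    for key in path.split("."):
        value = value[key]
    return value


def resolve(template, context):
    """Replace each {{path}} in the template with its value from the context."""
    whole = TEMPLATE.fullmatch(template)
    if whole:
        # Assumption: a template that is only one expression keeps the value's type.
        return lookup(whole.group(1), context)
    return TEMPLATE.sub(lambda m: str(lookup(m.group(1), context)), template)


# Hypothetical context holding workflow input, one node's output, and map variables.
context = {
    "workflow": {"input": {"data_source": "https://example.com/sales.csv"}},
    "fetch_data": {"output": {"filename": "sales.csv"}},
    "_map_item": {"id": 7},
    "_map_index": 0,
}

print(resolve("{{workflow.input.data_source}}", context))
print(resolve("Do step 2 using {{fetch_data.output.filename}}", context))
print(resolve("{{_map_item.id}}", context))
```

A misspelled node ID or output field raises KeyError in this sketch, which mirrors the kind of template error described under Troubleshooting.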

Coalesce

Get first non-null value:
output_mapping:
  result:
    coalesce:
      - "{{branch_a.output.value}}"
      - "{{branch_b.output.value}}"
      - "default_value"
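
The behavior described above, returning the first non-null value, corresponds to the following Python sketch. The coalesce function here is an illustration of the semantics, with None standing in for the output of a branch that did not run; it is not the engine's implementation.

```python
def coalesce(*values):
    """Return the first value that is not None, or None if every value is None."""
    for value in values:
        if value is not None:
            return value
    return None


# Only the second branch ran, so the other branches contribute no output.
print(coalesce(None, "medium result", None))

# No branch produced a value, so the literal default at the end is used.
print(coalesce(None, None, "default_value"))
```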

Error Handling

Workflows can retry failed nodes automatically. Configure retries per node:
workflow:
  nodes:
    - id: risky_operation
      type: agent
      agent_name: "ProcessorAgent"
      instruction: "Process the data"
      max_retries: 3               # Retry a failed node up to 3 times
      retry_delay_seconds: 5       # Wait 5 seconds between retries
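
The two settings above describe a fixed-delay retry loop. The Python sketch below shows one plausible reading, assuming max_retries counts attempts after the first failure (so a node may run up to max_retries + 1 times). That interpretation, and the run_with_retries helper itself, are assumptions made for illustration.

```python
import time


def run_with_retries(task, max_retries=3, retry_delay_seconds=5, sleep=time.sleep):
    """Call task(); on failure retry up to max_retries more times, pausing between attempts."""
    attempt = 0
    while True:
        try:
            return task()
        except Exception:
            if attempt >= max_retries:
                raise  # retries exhausted: surface the last error
            attempt += 1
            sleep(retry_delay_seconds)


calls = {"count": 0}


def flaky():
    # Stand-in for an agent call that fails twice and then succeeds.
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("temporary failure")
    return "ok"


# A zero delay keeps the example fast; the YAML above waits 5 seconds.
print(run_with_retries(flaky, max_retries=3, retry_delay_seconds=0))
```

If the task still fails after the last retry, the error propagates, which is the point where a failure handler or fallback branch would take over.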

Exit Handlers

Use run_on to run cleanup or notification nodes when a dependency succeeds, when it fails, or in both cases:
nodes:
  - id: main_process
    type: agent
    agent_name: "ProcessorAgent"
    instruction: "Main processing task"
  
  - id: cleanup_on_success
    type: agent
    agent_name: "CleanupAgent"
    depends_on: [main_process]
    run_on: success  # Only if main_process succeeds
    instruction: "Clean up temporary files"
  
  - id: notify_on_failure
    type: agent
    agent_name: "NotificationAgent"
    depends_on: [main_process]
    run_on: failure  # Only if main_process fails
    instruction: "Send failure notification"
  
  - id: always_log
    type: agent
    agent_name: "LoggingAgent"
    depends_on: [main_process]
    run_on: always  # Runs regardless of success/failure
    instruction: "Log workflow execution"
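
The three run_on values shown above reduce to a small decision table. The Python sketch below encodes that table so the outcome for each handler is easy to check; should_run is a hypothetical helper, not a Solace Agent Mesh function.

```python
def should_run(run_on, dependency_succeeded):
    """Decide whether a node runs, given its run_on setting and its dependency's outcome."""
    if run_on == "always":
        return True
    if run_on == "success":
        return dependency_succeeded
    if run_on == "failure":
        return not dependency_succeeded
    raise ValueError(f"unknown run_on value: {run_on}")


# The three handlers from the YAML above, keyed by node ID.
handlers = {
    "cleanup_on_success": "success",
    "notify_on_failure": "failure",
    "always_log": "always",
}

# Which handlers run when main_process fails?
print([name for name, mode in handlers.items() if should_run(mode, dependency_succeeded=False)])
```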

Best Practices

Workflow Development Tips:
  1. Clear Names: Use descriptive node IDs that explain what each step does
  2. Schema Definition: Always define input/output schemas for validation
  3. Error Handling: Plan for failures with retries and fallback branches
  4. Timeouts: Set appropriate timeouts for long-running operations
  5. Testing: Test workflows with various inputs including edge cases
  6. Logging: Enable detailed logging during development
  7. Dependencies: Only declare necessary dependencies to maximize parallelism
  8. Modularity: Break complex workflows into smaller, reusable workflows

Testing Workflows

Test Individual Nodes

Test each agent independently before integration:
sam agent invoke ProcessorAgent --message "Test input"

Test Full Workflow

Run the complete workflow:
sam run configs/workflows/my_workflow.yaml

Then invoke via gateway or API:
curl -X POST http://localhost:8080/api/v1/tasks \
  -H "Content-Type: application/json" \
  -d '{
    "agent_name": "MyWorkflow",
    "message": "Process the test data"
  }'
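
For scripted tests, the same request can be built from Python with the standard library. The sketch below mirrors the curl command above: the URL, header, and payload fields are copied from it, while the build_task_request helper is hypothetical. It only constructs the request; sending it requires a running gateway.

```python
import json
import urllib.request


def build_task_request(agent_name, message, base_url="http://localhost:8080"):
    """Build (but do not send) the POST request shown in the curl command."""
    body = json.dumps({"agent_name": agent_name, "message": message}).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/api/v1/tasks",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_task_request("MyWorkflow", "Process the test data")
print(request.get_method(), request.full_url)

# To send it against a running gateway:
# with urllib.request.urlopen(request) as response:
#     print(response.read().decode("utf-8"))
```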

Monitor Execution

Watch workflow progress in logs:
tail -f my_workflow.log

Troubleshooting

Common Issues:
  1. Node not executing: Check that every node listed in depends_on exists and has completed
  2. Template errors: Verify variable names match node IDs and output fields
  3. Timeout errors: Increase max_workflow_execution_time_seconds or default_node_timeout_seconds
  4. Missing outputs: Ensure nodes produce expected output fields
  5. Circular dependencies: Check for dependency loops
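
Several of these issues can be caught before running the workflow by checking the node list offline. The Python sketch below looks for depends_on entries and template references that point at unknown node IDs, and for dependency loops. It is a hypothetical helper operating on nodes loaded as plain dictionaries, not a feature of the sam CLI, and it only scans the instruction and input fields.

```python
import re
from graphlib import CycleError, TopologicalSorter

# Captures the node ID in references such as {{fetch_data.output.filename}}.
REFERENCE = re.compile(r"\{\{\s*(\w+)\.output\.")


def lint_nodes(nodes):
    """Return human-readable problems found in a list of node definitions."""
    ids = {n["id"] for n in nodes}
    problems = []
    for n in nodes:
        for dep in n.get("depends_on", []):
            if dep not in ids:
                problems.append(f"{n['id']}: depends_on unknown node '{dep}'")
        # Scan instruction and input values for {{node.output.*}} references.
        texts = [n.get("instruction", ""), *map(str, n.get("input", {}).values())]
        for ref in REFERENCE.findall(" ".join(texts)):
            if ref not in ids:
                problems.append(f"{n['id']}: template references unknown node '{ref}'")
    try:
        graph = {n["id"]: set(n.get("depends_on", [])) for n in nodes}
        tuple(TopologicalSorter(graph).static_order())  # forces cycle detection
    except CycleError as err:
        problems.append("circular dependency: " + " -> ".join(err.args[1]))
    return problems


# Deliberately broken example: a and b depend on each other, and b references a missing node.
nodes = [
    {"id": "a", "depends_on": ["b"]},
    {"id": "b", "depends_on": ["a"], "input": {"x": "{{fetch.output.file}}"}},
]
for problem in lint_nodes(nodes):
    print(problem)
```

An empty result means none of these three checks fired; it does not guarantee that each agent actually produces the output fields the templates expect.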

Debug Logging

log:
  stdout_log_level: DEBUG
  log_file_level: DEBUG
  log_file: workflow_debug.log

Validate Configuration

sam validate configs/workflows/my_workflow.yaml

Next Steps

Real-World Examples

Explore workflow examples in the source:
  • examples/workflows/level1_simple_sequential.yaml - Basic sequential workflow
  • examples/workflows/level2_parallel_execution.yaml - Parallel processing
  • examples/workflows/level3_conditional_branching.yaml - Conditional logic
  • examples/workflows/level4_map_iteration.yaml - Array processing
  • examples/workflows/level7_error_handling.yaml - Error handling patterns
