
Overview

For long-running tasks, agents and workflows can publish streaming status updates to provide real-time progress feedback. This enables responsive UIs and better observability.

Streaming vs Non-Streaming

Non-Streaming (Default)

Client --> Agent: tasks/send
Agent --> Client: Task (submitted)
... (agent processing)
Agent --> Client: Task (completed)
The client receives only the initial acknowledgment and the final result.

Streaming

Client --> Agent: tasks/send-streaming
Agent --> Client: Task (submitted)
Agent --> Client: TaskStatusUpdateEvent (working) [progress: 10%]
Agent --> Client: TaskStatusUpdateEvent (working) [progress: 50%]
Agent --> Client: TaskStatusUpdateEvent (working) [progress: 90%]
Agent --> Client: Task (completed)
The client receives periodic status updates during execution.

Streaming Request

Method: tasks/send-streaming

{
  "jsonrpc": "2.0",
  "id": "streaming_req_123",
  "method": "tasks/send-streaming",
  "params": {
    "message": {
      "role": "user",
      "parts": [
        {"text": "Process this large dataset"},
        {"data": {"dataset_id": "DS-12345"}}
      ],
      "context_id": "session_789"
    }
  }
}
Topic: {namespace}/a2a/v1/agent/request/{agent_name}

User Properties:
{
  "replyToTopic": "{namespace}/a2a/v1/gateway/response/{gateway_id}/{task_id}",
  "statusTopic": "{namespace}/a2a/v1/gateway/status/{gateway_id}/{task_id}"
}
Status updates are published to the topic given in the statusTopic user property.
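
As a rough sketch, the request payload and its user properties could be assembled like this. The helper names `build_streaming_request` and `build_user_properties` are hypothetical and not part of Solace Agent Mesh; only the field names and topic layouts are taken from the example above.

```python
from typing import Any


def build_streaming_request(request_id: str, text: str, context_id: str) -> dict[str, Any]:
    """Build a tasks/send-streaming JSON-RPC request (hypothetical helper)."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tasks/send-streaming",
        "params": {
            "message": {
                "role": "user",
                "parts": [{"text": text}],
                "context_id": context_id,
            }
        },
    }


def build_user_properties(namespace: str, gateway_id: str, task_id: str) -> dict[str, str]:
    """Fill in the reply and status topic templates (hypothetical helper)."""
    base = f"{namespace}/a2a/v1/gateway"
    return {
        "replyToTopic": f"{base}/response/{gateway_id}/{task_id}",
        "statusTopic": f"{base}/status/{gateway_id}/{task_id}",
    }


request = build_streaming_request("streaming_req_123", "Process this large dataset", "session_789")
props = build_user_properties("production", "gw_123", "task_abc123")
```

Both topics end in the same gateway ID and task ID, so one gateway can tell the final response and the intermediate updates for a task apart by the `response` or `status` segment.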

Status Update Events

TaskStatusUpdateEvent

{
  "task_id": "task_abc123",
  "context_id": "session_789",
  "status": {
    "state": "working",
    "message": {
      "role": "agent",
      "parts": [
        {
          "text": "Processing records 1000-2000 of 10000"
        },
        {
          "data": {
            "progress": 0.2,
            "records_processed": 2000,
            "total_records": 10000
          }
        }
      ]
    },
    "timestamp": "2024-03-04T12:00:15Z"
  },
  "final": false,
  "kind": "status-update",
  "metadata": {
    "agent_name": "DataProcessor"
  }
}
task_id (string, required): Task identifier being updated.
context_id (string, required): Session/context ID.
status (object, required): Current task status with optional message.
final (boolean, required):
  • false: Intermediate update
  • true: Final update (task complete/failed)
kind (string, required): Always "status-update" for status events.
metadata (object): Additional metadata (agent name, etc.).
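
For clients that work with plain dictionaries instead of the library's model classes, the required fields listed above could be checked with a small validator. This is a minimal sketch; `validate_status_update` is a hypothetical helper, and the checks cover only what the field list states.

```python
REQUIRED_FIELDS = {
    "task_id": str,
    "context_id": str,
    "status": dict,
    "final": bool,
    "kind": str,
}


def validate_status_update(event: dict) -> list:
    """Return a list of problems; an empty list means the event looks valid."""
    problems = []
    for name, expected_type in REQUIRED_FIELDS.items():
        if name not in event:
            problems.append(f"missing required field: {name}")
        elif not isinstance(event[name], expected_type):
            problems.append(f"{name} should be {expected_type.__name__}")
    # kind is always "status-update" for status events
    if event.get("kind") not in (None, "status-update"):
        problems.append('kind must be "status-update"')
    # metadata is optional, but must be an object when present
    if "metadata" in event and not isinstance(event["metadata"], dict):
        problems.append("metadata should be dict")
    return problems
```

Returning a list of problems instead of raising on the first one makes it easier to log everything that is wrong with a malformed event in one pass.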

Wrapped in JSON-RPC Response

Status updates are sent as JSON-RPC responses:
{
  "jsonrpc": "2.0",
  "id": "streaming_req_123",
  "result": {
    "task_id": "task_abc123",
    "context_id": "session_789",
    "status": {
      "state": "working",
      "message": {
        "role": "agent",
        "parts": [
          {"text": "Processing..."}
        ]
      }
    },
    "final": false,
    "kind": "status-update"
  }
}
Topic: Value from statusTopic user property
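
On the receiving side, the envelope has to be unwrapped before the event can be used. The sketch below assumes the payload arrives as a JSON string; `unwrap_status_update` and `RpcError` are hypothetical names. It relies only on the standard JSON-RPC 2.0 rule that a response carries either `result` or `error`.

```python
import json
from typing import Optional


class RpcError(Exception):
    """Raised when the JSON-RPC envelope carries an error instead of a result."""


def unwrap_status_update(raw: str) -> Optional[dict]:
    """Return the status-update event, or None if the result is another kind."""
    envelope = json.loads(raw)
    if envelope.get("jsonrpc") != "2.0":
        raise ValueError("not a JSON-RPC 2.0 message")
    if "error" in envelope:
        raise RpcError(envelope["error"])
    result = envelope.get("result") or {}
    if result.get("kind") != "status-update":
        return None
    return result
```

Note that the envelope `id` repeats the ID of the original streaming request, so a client can correlate every update with the request that started the task.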

Workflow Streaming Events

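Workflows publish structured events to track execution. Each event is a data part carried inside a TaskStatusUpdateEvent; after the first example, only the data part is shown.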

Workflow Execution Start

{
  "task_id": "wf_task_456",
  "status": {
    "state": "working",
    "message": {
      "role": "agent",
      "parts": [
        {
          "data": {
            "type": "workflow_execution_start",
            "workflow_name": "OrderProcessingWorkflow",
            "workflow_version": "2.1.0",
            "execution_id": "exec_abc123"
          }
        }
      ]
    }
  },
  "final": false,
  "kind": "status-update"
}
Source: src/solace_agent_mesh/common/data_parts.py

Node Execution Start

{
  "data": {
    "type": "workflow_node_execution_start",
    "node_id": "validate_order",
    "node_type": "agent",
    "agent_name": "OrderValidator",
    "sub_task_id": "wf_exec_abc123_validate_order_xyz789",
    "parallel_group_id": "implicit_parallel_xyz",
    "iteration_index": 0
  }
}
type (string, required): Always "workflow_node_execution_start".
node_id (string, required): Workflow node ID.
node_type (string, required): Node type, one of "agent", "switch", "map", "loop", "workflow".
agent_name (string): Agent/workflow being invoked (for agent/workflow nodes).
sub_task_id (string): Unique sub-task ID for this node execution.
parallel_group_id (string): Group ID for parallel/map nodes (used for visualization).
iteration_index (integer): Iteration index for map/loop nodes.

Node Execution Result

{
  "data": {
    "type": "workflow_node_execution_result",
    "node_id": "validate_order",
    "status": "success",
    "output_artifact_ref": {
      "name": "validation_result.json",
      "version": 1
    },
    "metadata": {
      "execution_time_ms": 1234
    }
  }
}
status (string, required):
  • "success": Node completed successfully
  • "error": Node failed
  • "skipped": Node skipped (conditional execution)
output_artifact_ref (object): Reference to the output artifact (when status is "success").
metadata (object): Additional execution metadata.

Map Progress Update

{
  "data": {
    "type": "workflow_map_progress",
    "node_id": "process_items",
    "total_items": 100,
    "completed_items": 47,
    "status": "in-progress"
  }
}
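
A consumer can turn a map progress event into a fraction for a progress bar. The following is a small sketch; `map_progress_fraction` is a hypothetical helper that reads only the fields shown above.

```python
def map_progress_fraction(data: dict) -> float:
    """Return completed/total for a workflow_map_progress data part, clamped to [0, 1]."""
    if data.get("type") != "workflow_map_progress":
        raise ValueError("not a workflow_map_progress event")
    total = data["total_items"]
    if total <= 0:
        # An empty map has nothing to report; avoid dividing by zero
        return 0.0
    return min(max(data["completed_items"] / total, 0.0), 1.0)
```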

Switch Node Decision

{
  "data": {
    "type": "workflow_node_execution_result",
    "node_id": "approval_decision",
    "status": "success",
    "metadata": {
      "selected_branch": "manual_approval",
      "selected_case_index": 0
    }
  }
}

Workflow Execution Result

{
  "data": {
    "type": "workflow_execution_result",
    "status": "success",
    "workflow_output": {
      "order_id": "ORD-123",
      "status": "completed",
      "total": 125.50
    }
  }
}
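
Taken together, the event types above are enough to reconstruct the state of a workflow on the consumer side. The sketch below is one possible way to do that, not the library's implementation: `WorkflowTracker` is a hypothetical class, and the "pending" and "running" labels are illustrative values chosen for this example.

```python
from dataclasses import dataclass, field


@dataclass
class WorkflowTracker:
    """Tracks workflow and node state from the "data" payloads of workflow events."""

    workflow_status: str = "pending"
    nodes: dict = field(default_factory=dict)

    def apply(self, data: dict) -> None:
        event_type = data.get("type")
        if event_type == "workflow_execution_start":
            self.workflow_status = "running"
        elif event_type == "workflow_node_execution_start":
            self.nodes[data["node_id"]] = "running"
        elif event_type == "workflow_node_execution_result":
            self.nodes[data["node_id"]] = data["status"]
        elif event_type == "workflow_map_progress":
            self.nodes[data["node_id"]] = f'{data["completed_items"]}/{data["total_items"]}'
        elif event_type == "workflow_execution_result":
            self.workflow_status = data["status"]
        # Unknown event types are ignored so new types do not break the tracker.
```

Dispatching on the `type` field keeps the tracker independent of the order in which parallel nodes report their results.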

Publishing Status Updates

Agent Publishing

from solace_agent_mesh.common import a2a
from a2a.types import TaskState

# Create status update event
status_update = a2a.create_status_update(
    task_id=task_id,
    context_id=session_id,
    message=a2a.create_agent_parts_message(
        parts=[
            a2a.create_text_part("Processing records..."),
            a2a.create_data_part({
                "progress": 0.5,
                "records_processed": 5000
            })
        ]
    ),
    is_final=False,
    state=TaskState.working,
    metadata={"agent_name": agent_name}
)

# Wrap in JSON-RPC response
rpc_response = a2a.create_success_response(
    result=status_update,
    request_id=request_id
)

# Publish to status topic
self.publish_a2a_message(
    payload=rpc_response.model_dump(exclude_none=True),
    topic=status_topic
)
Source: src/solace_agent_mesh/common/a2a/events.py:22-96

Workflow Publishing

# Create workflow event
event_data = WorkflowNodeExecutionStartData(
    type="workflow_node_execution_start",
    node_id="validate_order",
    node_type="agent",
    agent_name="OrderValidator",
    sub_task_id=sub_task_id
)

# Publish workflow event
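await self.publish_workflow_event(workflow_context, event_data)

The publish_workflow_event method builds the status update, wraps it in a JSON-RPC response, and publishes it to the status topic:

async def publish_workflow_event(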
    self,
    workflow_context: WorkflowExecutionContext,
    event_data: Any,
):
    """Publish a workflow status event."""
    status_update_event = a2a.create_data_signal_event(
        task_id=workflow_context.a2a_context["logical_task_id"],
        context_id=workflow_context.a2a_context["session_id"],
        signal_data=event_data,
        agent_name=self.workflow_name,
    )
    
    rpc_response = a2a.create_success_response(
        result=status_update_event,
        request_id=workflow_context.a2a_context["jsonrpc_request_id"],
    )
    
    target_topic = workflow_context.a2a_context.get(
        "statusTopic"
    ) or a2a.get_gateway_status_topic(
        self.namespace,
        "gateway",
        workflow_context.a2a_context["logical_task_id"],
    )
    
    self.publish_a2a_message(
        payload=rpc_response.model_dump(exclude_none=True),
        topic=target_topic,
        user_properties={
            "a2aUserConfig": workflow_context.a2a_context.get(
                "a2a_user_config", {}
            )
        },
    )
Source: src/solace_agent_mesh/workflow/component.py:414-451

Consuming Status Updates

Gateway Subscribing

# Subscribe to status updates for all tasks owned by this gateway
status_topic = a2a.get_gateway_status_subscription_topic(
    namespace=namespace,
    self_gateway_id=gateway_id
)
# => "{namespace}/a2a/v1/gateway/status/{gateway_id}/>"

# Handler receives all status updates for this gateway's tasks
async def handle_status_update(message: SolaceMessage, topic: str):
    payload = message.get_payload()
    response = JSONRPCResponse.model_validate(payload)
    
    if isinstance(response.result, TaskStatusUpdateEvent):
        event = response.result
        
        # Extract progress info
        data_parts = a2a.get_data_parts_from_status_update(event)
        for part in data_parts:
            data = a2a.get_data_from_data_part(part)
            if "progress" in data:
                print(f"Progress: {data['progress']:.0%}")
            
            # Handle workflow events
            if data.get("type") == "workflow_node_execution_start":
                print(f"Node started: {data['node_id']}")
Source: src/solace_agent_mesh/common/a2a/events.py:134-168
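
Because the subscription uses a wildcard, the handler receives updates for many tasks and may need to recover the task ID from the topic. This is a minimal sketch with a hypothetical helper, `parse_status_topic`. It assumes the topic layout shown earlier and that gateway and task IDs contain no slashes.

```python
def parse_status_topic(topic: str) -> tuple[str, str, str]:
    """Split a gateway status topic into (namespace, gateway_id, task_id)."""
    marker = "/a2a/v1/gateway/status/"
    namespace, found, rest = topic.partition(marker)
    parts = rest.split("/")
    if not found or not namespace or len(parts) != 2 or not all(parts):
        raise ValueError(f"not a gateway status topic: {topic!r}")
    gateway_id, task_id = parts
    return namespace, gateway_id, task_id
```

Splitting on the fixed `/a2a/v1/gateway/status/` segment rather than counting levels from the left lets the namespace itself contain slashes.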

Client-Side Handling

Clients can display real-time progress:
// Subscribe to status topic
const statusTopic = `production/a2a/v1/gateway/status/gw_123/${taskId}`;

messaging.subscribe(statusTopic, (message) => {
  const payload = JSON.parse(message.getBinaryAttachment());
  const event = payload.result;
  
  if (!event.final) {
    // Intermediate update
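    // The status message is optional, so fall back to an empty list
    const parts = event.status.message?.parts ?? [];
    
    // Extract progress data (0 is a valid value, so test the type, not truthiness)
    const dataPart = parts.find(p => typeof p.data?.progress === "number");
    if (dataPart) {
      updateProgressBar(dataPart.data.progress);
    }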
    
    // Extract text message
    const textPart = parts.find(p => p.text);
    if (textPart) {
      updateStatusText(textPart.text);
    }
  } else {
    // Final update - task complete
    handleTaskComplete(event);
  }
});

Artifact Updates

Large artifacts can be streamed in chunks, with each chunk delivered as an artifact update event:

TaskArtifactUpdateEvent

{
  "task_id": "task_abc123",
  "context_id": "session_789",
  "artifact": {
    "id": "artifact_xyz",
    "name": "large_report.pdf",
    "mime_type": "application/pdf",
    "inline_data": {
      "data": "<base64-chunk>"
    }
  },
  "append": true,
  "last_chunk": false,
  "kind": "artifact-update",
  "metadata": {
    "chunk_index": 1,
    "total_chunks": 10
  }
}
artifact (object, required): Artifact object with chunk data.
append (boolean, default: false): If true, append this chunk to previous content.
last_chunk (boolean, default: false): If true, this is the final chunk.
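
A consumer can rebuild the artifact by honoring `append` and `last_chunk`. The sketch below works on plain dictionaries shaped like the example above. `ArtifactAssembler` is a hypothetical class, and it assumes chunks for one artifact arrive in order and share the same artifact `id`.

```python
import base64


class ArtifactAssembler:
    """Reassembles chunked artifacts from artifact-update events (as dicts)."""

    def __init__(self) -> None:
        self._buffers: dict[str, bytearray] = {}

    def apply(self, event: dict):
        """Add one chunk; return the full bytes once last_chunk is true, else None."""
        artifact = event["artifact"]
        chunk = base64.b64decode(artifact["inline_data"]["data"])
        key = artifact.get("id") or artifact["name"]
        if event.get("append", False) and key in self._buffers:
            self._buffers[key].extend(chunk)
        else:
            # append is false (or nothing is buffered yet): start over with this chunk
            self._buffers[key] = bytearray(chunk)
        if event.get("last_chunk", False):
            return bytes(self._buffers.pop(key))
        return None
```

If the messaging layer can reorder messages, the `chunk_index` value in `metadata` could be used to sort chunks before joining them; this sketch does not do that.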

Publishing Artifact Updates

import base64

artifact_update = a2a.create_artifact_update(
    task_id=task_id,
    context_id=session_id,
    artifact=Artifact(
        name="report.pdf",
        mime_type="application/pdf",
        inline_data=InlineData(
            data=base64.b64encode(chunk_bytes).decode()
        )
    ),
    append=(chunk_index > 0),  # the first chunk starts the artifact; later chunks append
    last_chunk=(chunk_index == total_chunks - 1),
    metadata={
        "chunk_index": chunk_index,
        "total_chunks": total_chunks
    }
)
Source: src/solace_agent_mesh/common/a2a/events.py:98-129
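
The per-chunk values passed to the call above (`chunk_index`, `total_chunks`, `last_chunk`) have to come from somewhere. One way to derive them is sketched below; `iter_chunk_fields` is a hypothetical helper. Setting `append` to false for the first chunk is an assumption of this sketch, based on the field description that `append` adds to previous content.

```python
import base64
from typing import Iterator


def iter_chunk_fields(payload: bytes, chunk_size: int) -> Iterator[dict]:
    """Yield the per-chunk fields (data, append, last_chunk, metadata) for a payload."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    # Ceiling division; an empty payload still produces one (empty) final chunk
    total_chunks = max(1, -(-len(payload) // chunk_size))
    for chunk_index in range(total_chunks):
        chunk = payload[chunk_index * chunk_size:(chunk_index + 1) * chunk_size]
        yield {
            "data": base64.b64encode(chunk).decode(),
            "append": chunk_index > 0,
            "last_chunk": chunk_index == total_chunks - 1,
            "metadata": {"chunk_index": chunk_index, "total_chunks": total_chunks},
        }
```

Each yielded dictionary maps directly onto the `inline_data`, `append`, `last_chunk`, and `metadata` arguments of one artifact update.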

Best Practices

Update Frequency

  • Too frequent: Network overhead, message congestion
  • Too infrequent: Poor UX, no perceived progress
  • Recommended: 1-5 seconds for long tasks, or at logical milestones
import time

# Throttle updates
async def process_all(items, update_interval=2.0):
    last_update = time.monotonic()

    # Count from 1 so that i is the number of items already processed
    for i, item in enumerate(items, start=1):
        process_item(item)

        # Publish at most one update per update_interval seconds
        if time.monotonic() - last_update >= update_interval:
            await publish_status_update(
                progress=i / len(items),
                message=f"Processed {i}/{len(items)} items"
            )
            last_update = time.monotonic()
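
The recommendation to publish every few seconds or at logical milestones can be combined in one small helper. This is a sketch only: `UpdateThrottle` is a hypothetical class, and unlike the loop above it allows the very first update immediately.

```python
import time
from typing import Callable


class UpdateThrottle:
    """Allows an update at most once per interval, plus forced milestone updates."""

    def __init__(self, interval: float, clock: Callable[[], float] = time.monotonic) -> None:
        self.interval = interval
        self._clock = clock
        self._last = None  # time of the last allowed update; None before the first

    def should_publish(self, milestone: bool = False) -> bool:
        now = self._clock()
        if milestone or self._last is None or now - self._last >= self.interval:
            self._last = now
            return True
        return False
```

A processing loop would call `should_publish()` after each item and `should_publish(milestone=True)` when a logical step finishes. The clock is injectable so the behavior can be tested without sleeping.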

Meaningful Progress

Provide actionable information:
# Good: Specific and informative
await publish_status_update(
    message="Validating order items (2/5 complete)",
    data={
        "progress": 0.4,
        "current_step": "validation",
        "items_validated": 2,
        "total_items": 5
    }
)

# Bad: Vague and unhelpful
await publish_status_update(
    message="Processing..."
)

Error Streaming

Non-fatal errors can be streamed as status updates while the task keeps running, which makes debugging easier:
try:
    result = await process_item(item)
except ValidationError as e:
    # Stream error as status update
    await publish_status_update(
        state=TaskState.working,  # Still working
        message=f"Warning: Item {item.id} validation failed: {e}",
        data={
            "error_type": "validation_warning",
            "item_id": item.id,
            "error_message": str(e)
        }
    )
    # Continue processing other items

Complete Example: Streaming Workflow

# Workflow publishes events throughout execution.
# workflow_context is the WorkflowExecutionContext of the running workflow.

# 1. Workflow start
await self.publish_workflow_event(
    workflow_context,
    WorkflowExecutionStartData(
        type="workflow_execution_start",
        workflow_name="DataProcessing",
        execution_id="exec_123"
    )
)

# 2. Node execution start
await self.publish_workflow_event(
    workflow_context,
    WorkflowNodeExecutionStartData(
        type="workflow_node_execution_start",
        node_id="validate_data",
        node_type="agent",
        agent_name="DataValidator"
    )
)

# 3. Node execution complete
await self.publish_workflow_event(
    workflow_context,
    WorkflowNodeExecutionResultData(
        type="workflow_node_execution_result",
        node_id="validate_data",
        status="success"
    )
)

# 4. Map progress updates
await self.publish_workflow_event(
    workflow_context,
    WorkflowMapProgressData(
        type="workflow_map_progress",
        node_id="process_records",
        total_items=1000,
        completed_items=250,
        status="in-progress"
    )
)

# 5. Workflow completion
await self.publish_workflow_event(
    workflow_context,
    WorkflowExecutionResultData(
        type="workflow_execution_result",
        status="success",
        workflow_output={"processed": 1000}
    )
)
The client receives each of these events in real time and can use them for visualization.

Next Steps

Task Invocation

Learn about task invocation patterns

Workflow Execution

Understand workflow execution lifecycle
