Overview
For long-running tasks, agents and workflows can publish streaming status updates to provide real-time progress feedback. This enables responsive UIs and better observability.
Streaming vs Non-Streaming
Non-Streaming (Default)
Client --> Agent: tasks/send
Agent --> Client: Task (submitted)
... (agent processing)
Agent --> Client: Task (completed)
The client receives only the initial acknowledgment and the final result.
Streaming
Client --> Agent: tasks/send-streaming
Agent --> Client: Task (submitted)
Agent --> Client: TaskStatusUpdateEvent (working) [progress: 10%]
Agent --> Client: TaskStatusUpdateEvent (working) [progress: 50%]
Agent --> Client: TaskStatusUpdateEvent (working) [progress: 90%]
Agent --> Client: Task (completed)
The client receives periodic status updates during execution.
Streaming Request
Method: tasks/send-streaming
{
  "jsonrpc": "2.0",
  "id": "streaming_req_123",
  "method": "tasks/send-streaming",
  "params": {
    "message": {
      "role": "user",
      "parts": [
        { "text": "Process this large dataset" },
        { "data": { "dataset_id": "DS-12345" } }
      ],
      "context_id": "session_789"
    }
  }
}
Topic: {namespace}/a2a/v1/agent/request/{agent_name}
User Properties:
{
  "replyToTopic": "{namespace}/a2a/v1/gateway/response/{gateway_id}/{task_id}",
  "statusTopic": "{namespace}/a2a/v1/gateway/status/{gateway_id}/{task_id}"
}
The statusTopic user property tells the agent where to publish intermediate status updates for this task.
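As an illustrative sketch of how these topic strings fit together (the build_topics helper and the sample values are assumptions for this example, not part of the library API):

```python
# Sketch: assembling the A2A topic strings used by a streaming request.
# Helper name and sample values are hypothetical; only the topic shapes
# come from the documentation above.
def build_topics(namespace: str, agent_name: str, gateway_id: str, task_id: str) -> dict:
    base = f"{namespace}/a2a/v1"
    return {
        "requestTopic": f"{base}/agent/request/{agent_name}",
        "replyToTopic": f"{base}/gateway/response/{gateway_id}/{task_id}",
        "statusTopic": f"{base}/gateway/status/{gateway_id}/{task_id}",
    }

topics = build_topics("production", "DataProcessor", "gw_123", "task_abc123")
# topics["statusTopic"] == "production/a2a/v1/gateway/status/gw_123/task_abc123"
```

The request is published to requestTopic, while replyToTopic and statusTopic travel as user properties so the agent knows where to send the final result and the intermediate updates.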
Status Update Events
TaskStatusUpdateEvent
{
  "task_id": "task_abc123",
  "context_id": "session_789",
  "status": {
    "state": "working",
    "message": {
      "role": "agent",
      "parts": [
        {
          "text": "Processing records 1000-2000 of 10000"
        },
        {
          "data": {
            "progress": 0.2,
            "records_processed": 2000,
            "total_records": 10000
          }
        }
      ]
    },
    "timestamp": "2024-03-04T12:00:15Z"
  },
  "final": false,
  "kind": "status-update",
  "metadata": {
    "agent_name": "DataProcessor"
  }
}
task_id: Task identifier being updated.
status: Current task status with an optional message.
final: false for an intermediate update; true for the final update (task complete/failed).
kind: Always "status-update" for status events.
metadata: Additional metadata (agent name, etc.).
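A minimal consumer-side sketch that pulls the progress figure out of an event shaped like the one above (plain dict access; no library types are assumed):

```python
def extract_progress(event: dict):
    """Return the progress value (0.0-1.0) from a status-update event, or None."""
    if event.get("kind") != "status-update":
        return None
    message = event.get("status", {}).get("message") or {}
    for part in message.get("parts", []):
        data = part.get("data")
        if isinstance(data, dict) and "progress" in data:
            return data["progress"]
    return None

event = {
    "kind": "status-update",
    "status": {"message": {"parts": [
        {"text": "Processing records 1000-2000 of 10000"},
        {"data": {"progress": 0.2, "records_processed": 2000}},
    ]}},
}
assert extract_progress(event) == 0.2
```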
Wrapped in JSON-RPC Response
Status updates are sent as JSON-RPC responses:
{
  "jsonrpc": "2.0",
  "id": "streaming_req_123",
  "result": {
    "task_id": "task_abc123",
    "context_id": "session_789",
    "status": {
      "state": "working",
      "message": {
        "role": "agent",
        "parts": [
          { "text": "Processing..." }
        ]
      }
    },
    "final": false,
    "kind": "status-update"
  }
}
Topic: Value from statusTopic user property
Workflow Streaming Events
Workflows publish structured events to track execution:
Workflow Execution Start
{
  "task_id": "wf_task_456",
  "status": {
    "state": "working",
    "message": {
      "role": "agent",
      "parts": [
        {
          "data": {
            "type": "workflow_execution_start",
            "workflow_name": "OrderProcessingWorkflow",
            "workflow_version": "2.1.0",
            "execution_id": "exec_abc123"
          }
        }
      ]
    }
  },
  "final": false,
  "kind": "status-update"
}
Source: src/solace_agent_mesh/common/data_parts.py
Node Execution Start
{
  "data": {
    "type": "workflow_node_execution_start",
    "node_id": "validate_order",
    "node_type": "agent",
    "agent_name": "OrderValidator",
    "sub_task_id": "wf_exec_abc123_validate_order_xyz789",
    "parallel_group_id": "implicit_parallel_xyz",
    "iteration_index": 0
  }
}
type: Always "workflow_node_execution_start".
node_type: One of "agent", "switch", "map", "loop", "workflow".
agent_name: Agent/workflow being invoked (for agent/workflow nodes).
sub_task_id: Unique sub-task ID for this node execution.
parallel_group_id: Group ID for parallel/map nodes (used for visualization).
iteration_index: Iteration index for map/loop nodes.
Node Execution Result
{
  "data": {
    "type": "workflow_node_execution_result",
    "node_id": "validate_order",
    "status": "success",
    "output_artifact_ref": {
      "name": "validation_result.json",
      "version": 1
    },
    "metadata": {
      "execution_time_ms": 1234
    }
  }
}
"success": Node completed successfully
"error": Node failed
"skipped": Node skipped (conditional execution)
Reference to output artifact (when status is "success").
Additional execution metadata.
Map Progress Update
{
  "data": {
    "type": "workflow_map_progress",
    "node_id": "process_items",
    "total_items": 100,
    "completed_items": 47,
    "status": "in-progress"
  }
}
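The completed/total counts translate directly into a percentage for progress bars; a tiny sketch (the function name is an assumption for this example):

```python
def map_progress_percent(event_data: dict) -> float:
    """Percentage complete for a workflow_map_progress data part."""
    total = event_data.get("total_items", 0)
    if total <= 0:
        return 0.0  # avoid division by zero before the first count arrives
    return 100.0 * event_data.get("completed_items", 0) / total

assert map_progress_percent({"total_items": 100, "completed_items": 47}) == 47.0
```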
Switch Node Decision
{
  "data": {
    "type": "workflow_node_execution_result",
    "node_id": "approval_decision",
    "status": "success",
    "metadata": {
      "selected_branch": "manual_approval",
      "selected_case_index": 0
    }
  }
}
Workflow Execution Result
{
  "data": {
    "type": "workflow_execution_result",
    "status": "success",
    "workflow_output": {
      "order_id": "ORD-123",
      "status": "completed",
      "total": 125.50
    }
  }
}
Publishing Status Updates
Agent Publishing
from solace_agent_mesh.common import a2a
from a2a.types import TaskState

# Create status update event
status_update = a2a.create_status_update(
    task_id=task_id,
    context_id=session_id,
    message=a2a.create_agent_parts_message(
        parts=[
            a2a.create_text_part("Processing records..."),
            a2a.create_data_part({
                "progress": 0.5,
                "records_processed": 5000
            })
        ]
    ),
    is_final=False,
    state=TaskState.working,
    metadata={"agent_name": agent_name}
)

# Wrap in JSON-RPC response
rpc_response = a2a.create_success_response(
    result=status_update,
    request_id=request_id
)

# Publish to status topic
self.publish_a2a_message(
    payload=rpc_response.model_dump(exclude_none=True),
    topic=status_topic
)
Source: src/solace_agent_mesh/common/a2a/events.py:22-96
Workflow Publishing
# Create workflow event
event_data = WorkflowNodeExecutionStartData(
    type="workflow_node_execution_start",
    node_id="validate_order",
    node_type="agent",
    agent_name="OrderValidator",
    sub_task_id=sub_task_id
)

# Publish workflow event
await self.publish_workflow_event(workflow_context, event_data)

async def publish_workflow_event(
    self,
    workflow_context: WorkflowExecutionContext,
    event_data: Any,
):
    """Publish a workflow status event."""
    status_update_event = a2a.create_data_signal_event(
        task_id=workflow_context.a2a_context["logical_task_id"],
        context_id=workflow_context.a2a_context["session_id"],
        signal_data=event_data,
        agent_name=self.workflow_name,
    )
    rpc_response = a2a.create_success_response(
        result=status_update_event,
        request_id=workflow_context.a2a_context["jsonrpc_request_id"],
    )
    target_topic = workflow_context.a2a_context.get(
        "statusTopic"
    ) or a2a.get_gateway_status_topic(
        self.namespace,
        "gateway",
        workflow_context.a2a_context["logical_task_id"],
    )
    self.publish_a2a_message(
        payload=rpc_response.model_dump(exclude_none=True),
        topic=target_topic,
        user_properties={
            "a2aUserConfig": workflow_context.a2a_context.get(
                "a2a_user_config", {}
            )
        },
    )
Source: src/solace_agent_mesh/workflow/component.py:414-451
Consuming Status Updates
Gateway Subscribing
# Subscribe to status updates for this gateway (wildcard over task IDs)
status_topic = a2a.get_gateway_status_subscription_topic(
    namespace=namespace,
    self_gateway_id=gateway_id
)
# => "{namespace}/a2a/v1/gateway/status/{gateway_id}/>"

# Handler receives all status updates for this gateway's tasks
async def handle_status_update(message: SolaceMessage, topic: str):
    payload = message.get_payload()
    response = JSONRPCResponse.model_validate(payload)

    if isinstance(response.result, TaskStatusUpdateEvent):
        event = response.result

        # Extract progress info
        data_parts = a2a.get_data_parts_from_status_update(event)
        for part in data_parts:
            data = a2a.get_data_from_data_part(part)
            if "progress" in data:
                print(f"Progress: {data['progress'] * 100}%")

            # Handle workflow events
            if data.get("type") == "workflow_node_execution_start":
                print(f"Node started: {data['node_id']}")

Source: src/solace_agent_mesh/common/a2a/events.py:134-168
Client-Side Handling
Clients can display real-time progress:
// Subscribe to status topic
const statusTopic = `production/a2a/v1/gateway/status/gw_123/${taskId}`;
messaging.subscribe(statusTopic, (message) => {
  const payload = JSON.parse(message.getBinaryAttachment());
  const event = payload.result;

  if (!event.final) {
    // Intermediate update
    const parts = event.status.message.parts;

    // Extract progress data
    const dataPart = parts.find(p => p.data);
    if (dataPart?.data?.progress) {
      updateProgressBar(dataPart.data.progress);
    }

    // Extract text message
    const textPart = parts.find(p => p.text);
    if (textPart) {
      updateStatusText(textPart.text);
    }
  } else {
    // Final update - task complete
    handleTaskComplete(event);
  }
});
Artifact Updates
For streaming large artifacts (chunked delivery):
TaskArtifactUpdateEvent
{
  "task_id": "task_abc123",
  "context_id": "session_789",
  "artifact": {
    "id": "artifact_xyz",
    "name": "large_report.pdf",
    "mime_type": "application/pdf",
    "inline_data": {
      "data": "<base64-chunk>"
    }
  },
  "append": true,
  "last_chunk": false,
  "kind": "artifact-update",
  "metadata": {
    "chunk_index": 1,
    "total_chunks": 10
  }
}
artifact: Artifact object carrying the chunk data.
append: If true, append this chunk to the previously delivered content.
last_chunk: If true, this is the final chunk.
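On the consuming side, chunks can be reassembled by honoring the append and last_chunk flags. A minimal sketch, using plain dicts rather than library types (the accumulate_chunk helper is an assumption for this example):

```python
import base64

def accumulate_chunk(buffer: bytearray, event: dict) -> bool:
    """Append one artifact-update chunk to buffer; return True when complete."""
    chunk = base64.b64decode(event["artifact"]["inline_data"]["data"])
    if not event.get("append", False):
        buffer.clear()  # a non-append chunk replaces any prior content
    buffer.extend(chunk)
    return event.get("last_chunk", False)

buf = bytearray()
events = [
    {"artifact": {"inline_data": {"data": base64.b64encode(b"hello ").decode()}},
     "append": False, "last_chunk": False},
    {"artifact": {"inline_data": {"data": base64.b64encode(b"world").decode()}},
     "append": True, "last_chunk": True},
]
done = [accumulate_chunk(buf, e) for e in events]
assert bytes(buf) == b"hello world" and done == [False, True]
```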
Publishing Artifact Updates
artifact_update = a2a.create_artifact_update(
    task_id=task_id,
    context_id=session_id,
    artifact=Artifact(
        name="report.pdf",
        mime_type="application/pdf",
        inline_data=InlineData(
            data=base64.b64encode(chunk_bytes).decode()
        )
    ),
    append=True,
    last_chunk=(chunk_index == total_chunks - 1),
    metadata={
        "chunk_index": chunk_index,
        "total_chunks": total_chunks
    }
)
Source: src/solace_agent_mesh/common/a2a/events.py:98-129
Best Practices
Update Frequency
Too frequent: network overhead and message congestion.
Too infrequent: poor UX, no perceived progress.
Recommended: every 1-5 seconds for long tasks, or at logical milestones.
import time

# Throttle updates
last_update = time.time()
update_interval = 2.0  # seconds

for i, item in enumerate(items):
    process_item(item)

    # Publish at most once per interval
    if time.time() - last_update > update_interval:
        await publish_status_update(
            progress=i / len(items),
            message=f"Processed {i}/{len(items)} items"
        )
        last_update = time.time()
Meaningful Progress
Provide actionable information:
# Good: specific and informative
await publish_status_update(
    message="Validating order items (2/5 complete)",
    data={
        "progress": 0.4,
        "current_step": "validation",
        "items_validated": 2,
        "total_items": 5
    }
)

# Bad: vague and unhelpful
await publish_status_update(
    message="Processing..."
)
Error Streaming
Stream errors for better debugging:
try:
    result = await process_item(item)
except ValidationError as e:
    # Stream the error as a status update
    await publish_status_update(
        state=TaskState.working,  # still working
        message=f"Warning: Item {item.id} validation failed: {e}",
        data={
            "error_type": "validation_warning",
            "item_id": item.id,
            "error_message": str(e)
        }
    )
    # Continue processing other items
Complete Example: Streaming Workflow
# Workflow publishes events throughout execution

# 1. Workflow start
await publish_workflow_event(
    WorkflowExecutionStartData(
        type="workflow_execution_start",
        workflow_name="DataProcessing",
        execution_id="exec_123"
    )
)

# 2. Node execution start
await publish_workflow_event(
    WorkflowNodeExecutionStartData(
        type="workflow_node_execution_start",
        node_id="validate_data",
        node_type="agent",
        agent_name="DataValidator"
    )
)

# 3. Node execution complete
await publish_workflow_event(
    WorkflowNodeExecutionResultData(
        type="workflow_node_execution_result",
        node_id="validate_data",
        status="success"
    )
)

# 4. Map progress updates
await publish_workflow_event(
    WorkflowMapProgressData(
        type="workflow_map_progress",
        node_id="process_records",
        total_items=1000,
        completed_items=250,
        status="in-progress"
    )
)

# 5. Workflow completion
await publish_workflow_event(
    WorkflowExecutionResultData(
        type="workflow_execution_result",
        status="success",
        workflow_output={"processed": 1000}
    )
)
Client receives all these events in real-time for visualization.
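One way a client can fan these events out to visualization handlers is a dispatch table keyed on the "type" field of each data part. A sketch (handler names and the dispatch function are assumptions for this example):

```python
def dispatch_workflow_event(event_data: dict, handlers: dict) -> str:
    """Route a workflow data part to its handler; returns the handled type."""
    event_type = event_data.get("type", "unknown")
    handler = handlers.get(event_type, lambda d: None)  # ignore unknown types
    handler(event_data)
    return event_type

log = []
handlers = {
    "workflow_execution_start": lambda d: log.append(f"started {d['workflow_name']}"),
    "workflow_map_progress": lambda d: log.append(
        f"map {d['completed_items']}/{d['total_items']}"),
}
dispatch_workflow_event(
    {"type": "workflow_execution_start", "workflow_name": "DataProcessing"}, handlers)
dispatch_workflow_event(
    {"type": "workflow_map_progress", "completed_items": 250, "total_items": 1000}, handlers)
assert log == ["started DataProcessing", "map 250/1000"]
```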
Next Steps
Task Invocation: Learn about task invocation patterns.
Workflow Execution: Understand the workflow execution lifecycle.