Overview
AgenticPal supports real-time streaming via Server-Sent Events (SSE), allowing you to receive agent responses as they’re generated. This provides a better user experience for long-running operations.
Two-Step Streaming Pattern
The streaming API uses a two-step process for security and efficiency:
1. Store the message: POST /chat/message stores the user message server-side and returns a thread ID.
2. Open the SSE stream: GET /chat/stream opens an SSE connection over which the agent processes the request and streams its response.
This pattern avoids sending sensitive data via URL parameters and allows proper authentication via cookies.
Step 1: Send Message
First, send your message to be queued for processing.
Endpoint
POST /chat/message
Request
curl -X POST http://localhost:8000/chat/message \
  -H "Content-Type: application/json" \
  -b "session_id=your-session-id" \
  -d '{
    "user_message": "Summarize my emails from this week",
    "thread_id": "optional-thread-id"
  }'
Response
{
  "thread_id": "thread_abc123",
  "status": "queued"
}
Step 2: Stream Response
Open an SSE connection to receive the streamed response.
Endpoint
GET /chat/stream?thread_id={thread_id}
Implementation Examples
Python with SSE Client
import json

import requests
from sseclient import SSEClient

# Step 1: Send message
response = requests.post(
    "http://localhost:8000/chat/message",
    cookies={"session_id": "your-session-id"},
    json={"user_message": "What's on my calendar today?"},
)
thread_id = response.json()["thread_id"]

# Step 2: Stream response
url = f"http://localhost:8000/chat/stream?thread_id={thread_id}"
response = requests.get(
    url,
    cookies={"session_id": "your-session-id"},
    stream=True,
)

client = SSEClient(response)
for event in client.events():
    data = json.loads(event.data)
    event_type = data["event_type"]

    if event_type == "token":
        print(data["data"]["token"], end="", flush=True)
    elif event_type == "complete":
        print("\n\nResponse complete!")
        print(f"Full response: {data['data']['response']}")
    elif event_type == "error":
        print(f"Error: {data['data']['message']}")
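If you would rather not depend on sseclient, the SSE wire format itself is simple enough to parse by hand. The sketch below handles only single-line `event:` and `data:` fields separated by blank lines, which matches the events this API emits (no comments or multi-line data); in a real client you would feed it the lines from `response.iter_lines(decode_unicode=True)` on the streaming request shown above.

```python
import json

def parse_sse(lines):
    """Parse an iterable of SSE-formatted lines into (event, payload) pairs.

    Minimal sketch: single-line event:/data: fields only; a blank line
    terminates each event. Comments and multi-line data are not handled.
    """
    event, data = None, None
    for line in lines:
        line = line.rstrip("\n")
        if line.startswith("event:"):
            event = line[len("event:"):].strip()
        elif line.startswith("data:"):
            data = line[len("data:"):].strip()
        elif line == "" and data is not None:
            # Blank line marks the end of one event
            yield event or "message", json.loads(data)
            event, data = None, None

# Example: parsing two events captured from a stream
raw = [
    "event: token\n",
    'data: {"event_type": "token", "data": {"token": "Hi"}}\n',
    "\n",
    "event: complete\n",
    'data: {"event_type": "complete", "data": {"response": "Hi"}}\n',
    "\n",
]
events = list(parse_sse(raw))
```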
Event Types
The SSE stream emits different event types during processing:
Token Events
Streams individual tokens as they’re generated by the LLM.
event: token
data: {"event_type": "token", "data": {"token": "Hello"}}

event: token
data: {"event_type": "token", "data": {"token": " there"}}

event: token
data: {"event_type": "token", "data": {"token": "!"}}
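Client code typically appends each token to a running buffer as it arrives. A minimal sketch, using the payload shape shown above:

```python
import json

def accumulate_tokens(payloads):
    """Concatenate the token text from a sequence of parsed SSE payloads,
    ignoring any non-token events."""
    buf = []
    for payload in payloads:
        if payload.get("event_type") == "token":
            buf.append(payload["data"]["token"])
    return "".join(buf)

# The three token events above reassemble into the full text
payloads = [
    json.loads('{"event_type": "token", "data": {"token": "Hello"}}'),
    json.loads('{"event_type": "token", "data": {"token": " there"}}'),
    json.loads('{"event_type": "token", "data": {"token": "!"}}'),
]
text = accumulate_tokens(payloads)  # "Hello there!"
```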
Node Events
Indicates which graph node is currently executing.
event: node_start
data: {"event_type": "node_start", "data": {"node": "classify_intent"}}

event: node_end
data: {"event_type": "node_end", "data": {"node": "classify_intent"}}

event: node_start
data: {"event_type": "node_start", "data": {"node": "execute_parallel"}}
Completion Event
Sent when processing is complete.
event: complete
data: {
  "event_type": "complete",
  "data": {
    "thread_id": "thread_abc123",
    "response": "You have 2 meetings today: Team Standup at 9 AM and Client Call at 2 PM.",
    "results": {
      "action_1": {
        "tool": "list_calendar_events",
        "success": true,
        "result": {"events": [...]}
      }
    }
  }
}
Confirmation Required Event
Sent when the agent needs user confirmation.
event: confirmation_required
data: {
  "event_type": "confirmation_required",
  "data": {
    "thread_id": "thread_abc123",
    "message": "This will delete 15 emails. Confirm?",
    "pending_actions": [
      {"id": "action_1", "tool": "delete_emails", "count": 15}
    ]
  }
}
Error Event
Sent when an error occurs during processing.
event: error
data: {
  "event_type": "error",
  "data": {
    "error": "agent_error",
    "message": "Failed to connect to Gmail API"
  }
}
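With five event types, a handler table often keeps client code tidier than a long if/elif chain. A sketch of the idea (the handler names are illustrative, not part of the API):

```python
def make_dispatcher(handlers, fallback=None):
    """Route a parsed SSE payload to the handler registered for its
    event_type; unknown types fall through to the fallback, if any."""
    def dispatch(payload):
        handler = handlers.get(payload["event_type"], fallback)
        if handler is not None:
            return handler(payload["data"])
    return dispatch

seen = []
dispatch = make_dispatcher({
    "token": lambda d: seen.append(d["token"]),
    "error": lambda d: seen.append(f"error: {d['message']}"),
})

# Payload shapes match the event examples above
dispatch({"event_type": "token", "data": {"token": "Hi"}})
dispatch({"event_type": "error", "data": {"error": "agent_error", "message": "boom"}})
```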
Complete Example: React Component
Here’s a full React example integrating streaming:
import React, { useState } from 'react';

function ChatInterface() {
  const [message, setMessage] = useState('');
  const [response, setResponse] = useState('');
  const [isStreaming, setIsStreaming] = useState(false);

  const sendMessage = async () => {
    setIsStreaming(true);
    setResponse('');

    // Step 1: Send message
    const messageRes = await fetch('http://localhost:8000/chat/message', {
      method: 'POST',
      credentials: 'include',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ user_message: message })
    });
    const { thread_id } = await messageRes.json();

    // Step 2: Stream response
    const eventSource = new EventSource(
      `http://localhost:8000/chat/stream?thread_id=${thread_id}`,
      { withCredentials: true }
    );

    eventSource.addEventListener('token', (event) => {
      const data = JSON.parse(event.data);
      setResponse(prev => prev + data.data.token);
    });

    eventSource.addEventListener('complete', () => {
      setIsStreaming(false);
      eventSource.close();
    });

    // EventSource also fires a native 'error' event (with no data) on
    // connection failures, so guard before parsing.
    eventSource.addEventListener('error', (event) => {
      if (event.data) {
        const data = JSON.parse(event.data);
        console.error('Stream error:', data.data.message);
      }
      setIsStreaming(false);
      eventSource.close();
    });
  };

  return (
    <div>
      <input
        value={message}
        onChange={(e) => setMessage(e.target.value)}
        placeholder="Ask me anything..."
        disabled={isStreaming}
      />
      <button onClick={sendMessage} disabled={isStreaming}>
        {isStreaming ? 'Streaming...' : 'Send'}
      </button>
      <div className="response">{response}</div>
    </div>
  );
}

export default ChatInterface;
Advanced: Streaming with Progress Updates
You can track which nodes are executing to show detailed progress:
import json

import requests
from sseclient import SSEClient

# Send message
response = requests.post(
    "http://localhost:8000/chat/message",
    cookies={"session_id": "your-session-id"},
    json={"user_message": "Create a meeting and send invites"},
)
thread_id = response.json()["thread_id"]

# Stream with progress tracking
url = f"http://localhost:8000/chat/stream?thread_id={thread_id}"
response = requests.get(url, cookies={"session_id": "your-session-id"}, stream=True)

client = SSEClient(response)
current_response = ""

for event in client.events():
    data = json.loads(event.data)
    event_type = data["event_type"]

    if event_type == "node_start":
        node = data["data"]["node"]
        print(f"\n[Processing: {node}]")
    elif event_type == "node_end":
        node = data["data"]["node"]
        print(f"[Completed: {node}]")
    elif event_type == "token":
        token = data["data"]["token"]
        current_response += token
        print(token, end="", flush=True)
    elif event_type == "complete":
        print("\n\n✓ Complete!")
        results = data["data"]["results"]
        print(f"Executed {len(results)} actions successfully.")
        break
    elif event_type == "error":
        print(f"\n✗ Error: {data['data']['message']}")
        break
Example Output
[Processing: classify_intent]
[Completed: classify_intent]
[Processing: plan_actions]
[Completed: plan_actions]
[Processing: execute_parallel]
I've created a meeting titled "Project Sync" for tomorrow at 2 PM and sent invitations to the team.
[Completed: execute_parallel]
[Processing: synthesize_response]
[Completed: synthesize_response]
✓ Complete!
Executed 2 actions successfully.
Error Handling
Always handle connection errors and timeouts:
try:
    response = requests.get(
        url,
        cookies={"session_id": "your-session-id"},
        stream=True,
        timeout=60,
    )
    for event in SSEClient(response).events():
        # Process events
        pass
except requests.exceptions.Timeout:
    print("Stream timeout - connection took too long")
except requests.exceptions.ConnectionError:
    print("Connection error - server may be down")
except Exception as e:
    print(f"Unexpected error: {e}")
SSE connections remain open until complete. Always close the connection when done to free up resources.
Connection Pooling: Limit concurrent SSE connections per client to avoid overwhelming the server.
Timeouts: Set reasonable timeouts (30-60 seconds) to prevent hanging connections.
Buffering: The backend disables proxy buffering with X-Accel-Buffering: no so tokens stream immediately.
Rate Limiting: Both message sending and streaming count toward your rate limit.
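Long-lived SSE connections do drop in practice, so clients commonly reconnect with capped exponential backoff rather than retrying immediately. A sketch of the delay schedule (the base, factor, and cap here are illustrative choices, not API requirements):

```python
def backoff_delays(base=1.0, factor=2.0, cap=30.0, retries=5):
    """Yield capped exponential-backoff delays (in seconds) for
    successive reconnect attempts: base, base*factor, ... up to cap."""
    delay = base
    for _ in range(retries):
        yield min(delay, cap)
        delay *= factor

delays = list(backoff_delays())  # [1.0, 2.0, 4.0, 8.0, 16.0]
```

In a real client you would wrap the streaming requests.get(..., stream=True) call in a loop, sleep for each delay after a ConnectionError or Timeout, and reset the schedule once events flow again.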
Next Steps
API Reference: Detailed streaming endpoint documentation.
Confirmation Flow: Handle actions requiring user confirmation.