
Overview

AgenticPal supports real-time streaming via Server-Sent Events (SSE), allowing you to receive agent responses as they’re generated. This provides a better user experience for long-running operations.

Two-Step Streaming Pattern

The streaming API uses a two-step process for security and efficiency:

1. Store the message: POST /chat/message stores the user message server-side and returns a thread ID.
2. Open SSE stream: GET /chat/stream opens an SSE connection over which the agent processes the message and streams the response.

This pattern avoids sending sensitive data via URL parameters and allows proper authentication via cookies.

Step 1: Send Message

First, send your message to be queued for processing.

Endpoint

POST /chat/message

Request

curl -X POST http://localhost:8000/chat/message \
  -H "Content-Type: application/json" \
  -b "session_id=your-session-id" \
  -d '{
    "user_message": "Summarize my emails from this week",
    "thread_id": "optional-thread-id"
  }'

Response

{
  "thread_id": "thread_abc123",
  "status": "queued"
}

Step 2: Stream Response

Open an SSE connection to receive the streamed response.

Endpoint

GET /chat/stream?thread_id={thread_id}

Implementation Examples

import json

import requests
from sseclient import SSEClient

# Step 1: Send message
response = requests.post(
    "http://localhost:8000/chat/message",
    cookies={"session_id": "your-session-id"},
    json={"user_message": "What's on my calendar today?"}
)
thread_id = response.json()["thread_id"]

# Step 2: Stream response
url = f"http://localhost:8000/chat/stream?thread_id={thread_id}"
response = requests.get(
    url,
    cookies={"session_id": "your-session-id"},
    stream=True
)

client = SSEClient(response)
for event in client.events():
    data = json.loads(event.data)
    event_type = data["event_type"]
    
    if event_type == "token":
        print(data["data"]["token"], end="", flush=True)
    elif event_type == "complete":
        print("\n\nResponse complete!")
        print(f"Full response: {data['data']['response']}")
    elif event_type == "error":
        print(f"Error: {data['data']['message']}")

Event Types

The SSE stream emits different event types during processing:

Token Events

Streams individual tokens as they’re generated by the LLM.
event: token
data: {"event_type": "token", "data": {"token": "Hello"}}

event: token
data: {"event_type": "token", "data": {"token": " there"}}

event: token
data: {"event_type": "token", "data": {"token": "!"}}
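A client reassembles the streamed text by concatenating token payloads in arrival order. A minimal sketch, assuming only the payload shape shown above:

```python
import json

def assemble_tokens(raw_events):
    """Concatenate the token payloads from a sequence of SSE data strings."""
    text = ""
    for raw in raw_events:
        event = json.loads(raw)
        if event["event_type"] == "token":
            text += event["data"]["token"]
    return text

events = [
    '{"event_type": "token", "data": {"token": "Hello"}}',
    '{"event_type": "token", "data": {"token": " there"}}',
    '{"event_type": "token", "data": {"token": "!"}}',
]
print(assemble_tokens(events))  # Hello there!
```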

Node Events

Indicates which graph node is currently executing.
event: node_start
data: {"event_type": "node_start", "data": {"node": "classify_intent"}}

event: node_end
data: {"event_type": "node_end", "data": {"node": "classify_intent"}}

event: node_start
data: {"event_type": "node_start", "data": {"node": "execute_parallel"}}

Completion Event

Sent when processing is complete.
event: complete
data: {
  "event_type": "complete",
  "data": {
    "thread_id": "thread_abc123",
    "response": "You have 2 meetings today: Team Standup at 9 AM and Client Call at 2 PM.",
    "results": {
      "action_1": {
        "tool": "list_calendar_events",
        "success": true,
        "result": {"events": [...]}
      }
    }
  }
}
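The results map in the complete event can be summarized client-side, for example to report partial failures. A small sketch using the field names from the payload above (the send_email entry is a hypothetical second action):

```python
def summarize_results(results):
    """Split a complete-event results map into succeeded and failed tool names."""
    succeeded = [r["tool"] for r in results.values() if r.get("success")]
    failed = [r["tool"] for r in results.values() if not r.get("success")]
    return {"succeeded": succeeded, "failed": failed}

results = {
    "action_1": {"tool": "list_calendar_events", "success": True, "result": {"events": []}},
    "action_2": {"tool": "send_email", "success": False, "result": None},
}
print(summarize_results(results))
# {'succeeded': ['list_calendar_events'], 'failed': ['send_email']}
```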

Confirmation Required Event

Sent when the agent needs user confirmation.
event: confirmation_required
data: {
  "event_type": "confirmation_required",
  "data": {
    "thread_id": "thread_abc123",
    "message": "This will delete 15 emails. Confirm?",
    "pending_actions": [
      {"id": "action_1", "tool": "delete_emails", "count": 15}
    ]
  }
}
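When this event arrives, the client should surface the pending actions to the user before anything destructive runs; the confirmation round-trip itself is covered in the Confirmation Flow docs. A minimal sketch of turning the payload above into a user-facing prompt:

```python
def build_confirmation_prompt(event_data):
    """Format a confirmation_required payload into a user-facing prompt."""
    lines = [event_data["message"]]
    for action in event_data["pending_actions"]:
        lines.append(f"  - {action['id']}: {action['tool']}")
    return "\n".join(lines)

payload = {
    "thread_id": "thread_abc123",
    "message": "This will delete 15 emails. Confirm?",
    "pending_actions": [{"id": "action_1", "tool": "delete_emails", "count": 15}],
}
print(build_confirmation_prompt(payload))
```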

Error Event

Sent when an error occurs during processing.
event: error
data: {
  "event_type": "error",
  "data": {
    "error": "agent_error",
    "message": "Failed to connect to Gmail API"
  }
}
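A dispatch table keeps client code tidy as event types grow, and ignoring unknown types leaves room for the API to add new ones. A sketch covering the event types above (the handler bodies are illustrative):

```python
import json

def handle_event(raw, handlers):
    """Route one SSE data payload to the handler registered for its event_type."""
    event = json.loads(raw)
    handler = handlers.get(event["event_type"])
    if handler is None:
        return None  # Ignore unknown event types for forward compatibility
    return handler(event["data"])

handlers = {
    "token": lambda d: ("token", d["token"]),
    "node_start": lambda d: ("start", d["node"]),
    "node_end": lambda d: ("end", d["node"]),
    "complete": lambda d: ("done", d["response"]),
    "confirmation_required": lambda d: ("confirm", d["message"]),
    "error": lambda d: ("error", d["message"]),
}

print(handle_event('{"event_type": "token", "data": {"token": "Hi"}}', handlers))
# ('token', 'Hi')
```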

Complete Example: React Component

Here’s a full React example integrating streaming:
import React, { useState } from 'react';

function ChatInterface() {
  const [message, setMessage] = useState('');
  const [response, setResponse] = useState('');
  const [isStreaming, setIsStreaming] = useState(false);

  const sendMessage = async () => {
    setIsStreaming(true);
    setResponse('');

    // Step 1: Send message
    const messageRes = await fetch('http://localhost:8000/chat/message', {
      method: 'POST',
      credentials: 'include',
      headers: {'Content-Type': 'application/json'},
      body: JSON.stringify({ user_message: message })
    });

    const { thread_id } = await messageRes.json();

    // Step 2: Stream response
    const eventSource = new EventSource(
      `http://localhost:8000/chat/stream?thread_id=${thread_id}`,
      { withCredentials: true }
    );

    eventSource.addEventListener('token', (event) => {
      const data = JSON.parse(event.data);
      setResponse(prev => prev + data.data.token);
    });

    eventSource.addEventListener('complete', (event) => {
      setIsStreaming(false);
      eventSource.close();
    });

    eventSource.addEventListener('error', (event) => {
      // Server-sent "error" events carry JSON; native connection
      // errors also fire here but have no data payload.
      if (event.data) {
        const data = JSON.parse(event.data);
        console.error('Stream error:', data.data.message);
      } else {
        console.error('Stream connection error');
      }
      setIsStreaming(false);
      eventSource.close();
    });
  };

  return (
    <div>
      <input
        value={message}
        onChange={(e) => setMessage(e.target.value)}
        placeholder="Ask me anything..."
        disabled={isStreaming}
      />
      <button onClick={sendMessage} disabled={isStreaming}>
        {isStreaming ? 'Streaming...' : 'Send'}
      </button>
      <div className="response">{response}</div>
    </div>
  );
}

export default ChatInterface;

Advanced: Streaming with Progress Updates

You can track which nodes are executing to show detailed progress:
import requests
import json
from sseclient import SSEClient

# Send message
response = requests.post(
    "http://localhost:8000/chat/message",
    cookies={"session_id": "your-session-id"},
    json={"user_message": "Create a meeting and send invites"}
)
thread_id = response.json()["thread_id"]

# Stream with progress tracking
url = f"http://localhost:8000/chat/stream?thread_id={thread_id}"
response = requests.get(url, cookies={"session_id": "your-session-id"}, stream=True)

client = SSEClient(response)
current_response = ""

for event in client.events():
    data = json.loads(event.data)
    event_type = data["event_type"]
    
    if event_type == "node_start":
        node = data["data"]["node"]
        print(f"\n[Processing: {node}]")
    
    elif event_type == "node_end":
        node = data["data"]["node"]
        print(f"[Completed: {node}]")
    
    elif event_type == "token":
        token = data["data"]["token"]
        current_response += token
        print(token, end="", flush=True)
    
    elif event_type == "complete":
        print("\n\n✓ Complete!")
        results = data["data"]["results"]
        print(f"Executed {len(results)} actions successfully.")
        break
    
    elif event_type == "error":
        print(f"\n✗ Error: {data['data']['message']}")
        break

Example Output

[Processing: classify_intent]
[Completed: classify_intent]
[Processing: plan_actions]
[Completed: plan_actions]
[Processing: execute_parallel]
I've created a meeting titled "Project Sync" for tomorrow at 2 PM and sent invitations to the team.
[Completed: execute_parallel]
[Processing: synthesize_response]
[Completed: synthesize_response]

✓ Complete!
Executed 2 actions successfully.

Error Handling

Always handle connection errors and timeouts:
try:
    response = requests.get(
        url,
        cookies={"session_id": "your-session-id"},
        stream=True,
        timeout=60
    )
    
    for event in SSEClient(response).events():
        # Process events
        pass
        
except requests.exceptions.Timeout:
    print("Stream timeout - connection took too long")
except requests.exceptions.ConnectionError:
    print("Connection error - server may be down")
except Exception as e:
    print(f"Unexpected error: {e}")
SSE connections remain open until complete. Always close the connection when done to free up resources.
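If the stream drops before a complete event, a client may reconnect and retry. The retry policy is a client-side choice, not something the API prescribes; a minimal exponential-backoff schedule might look like:

```python
def backoff_delays(retries, base=1.0, cap=30.0):
    """Exponential backoff delays in seconds: base * 2**attempt, capped at cap."""
    return [min(base * (2 ** attempt), cap) for attempt in range(retries)]

print(backoff_delays(6))  # [1.0, 2.0, 4.0, 8.0, 16.0, 30.0]
```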

Performance Considerations

Connection Pooling

Limit concurrent SSE connections per client to avoid overwhelming the server
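Client-side, a bounded semaphore is one simple way to enforce such a cap; a sketch (the limit of 3 is an arbitrary example, not a server-imposed value):

```python
import threading

MAX_CONCURRENT_STREAMS = 3  # Arbitrary example limit
stream_slots = threading.BoundedSemaphore(MAX_CONCURRENT_STREAMS)

def with_stream_slot(stream_fn):
    """Run stream_fn only while holding one of the limited stream slots."""
    with stream_slots:
        return stream_fn()

print(with_stream_slot(lambda: "streamed"))  # streamed
```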

Timeouts

Set reasonable timeouts (30-60 seconds) to prevent hanging connections

Buffering

The backend disables buffering with X-Accel-Buffering: no for immediate streaming

Rate Limiting

Both message sending and streaming count toward your rate limit

Next Steps

API Reference

Detailed streaming endpoint documentation

Confirmation Flow

Handle actions requiring user confirmation
