Skip to main content

Endpoint

GET /v1/events/stream
Server-Sent Events (SSE) stream for live updates on tool calls, approvals, and audit events.
The route is /v1/events/stream (not /v1/events).

Authentication

Accepts either:
token
string
Query parameter authentication (for clients that cannot set headers)
curl -N "http://127.0.0.1:9090/v1/events/stream?token=$TOKEN"
Authorization
string
Bearer token authentication (preferred)
curl -N -H "Authorization: Bearer $TOKEN" \
  "http://127.0.0.1:9090/v1/events/stream"
Prefer header-based authentication when possible. Use ?token= only where header injection is not possible (e.g., EventSource in browsers).

Response Headers

Content-Type: text/event-stream
Cache-Control: no-cache
X-Accel-Buffering: no

Event Format

Each message is sent as SSE data: JSON.

Connection Event

Sent immediately upon successful connection:
data: {"type":"connected"}

Event Types

type
string
required
Event type:
  • connected — Initial connection established
  • approvals — Approval queue changed (new/resolved/expired)
  • audit — Single audit event (tool call evaluated)
  • audit_batch — Multiple audit events (bulk operation)

Approvals Event

{"type": "approvals"}
Broadcast when:
  • New approval created
  • Approval resolved (approved/denied)
  • Approval expired
Clients should re-query GET /v1/approvals to get current state.

Audit Event

{
  "type": "audit",
  "event": {
    "id": "01J9K8L7M6N5",
    "timestamp": "2026-03-03T10:15:30Z",
    "agent": "claude-code",
    "session": "myapp/main",
    "tool": "exec",
    "request": {
      "command": "git status"
    },
    "decision": {
      "action": "allow",
      "matched_policies": ["allow-git"],
      "evaluation_time_us": 8,
      "message": "git command allowed"
    },
    "prev_hash": "abc123...",
    "hash": "def456..."
  }
}

Audit Batch Event

{
  "type": "audit_batch",
  "run_id": "run_01J..."
}
Broadcast after bulk approval resolve to indicate multiple audit events were written. Clients should re-query GET /v1/audit/events instead of processing individual events.

Status Codes

StatusMeaning
200 OKStream established
401 UnauthorizedMissing or invalid token
500 Internal Server ErrorStreaming unsupported by writer

Examples

cURL (Long-lived Connection)

TOKEN="$(cat ~/.rampart/token)"
curl -N -H "Authorization: Bearer $TOKEN" \
  "http://127.0.0.1:9090/v1/events/stream"
Output:
data: {"type":"connected"}

data: {"type":"audit","event":{"id":"01J...","timestamp":"2026-03-03T10:15:30Z","agent":"claude-code","session":"myapp/main","tool":"exec","request":{"command":"git status"},"decision":{"action":"allow","matched_policies":["allow-git"],"evaluation_time_us":8,"message":"git command allowed"},"prev_hash":"abc","hash":"def"}}

data: {"type":"approvals"}

The -N flag disables buffering, allowing real-time event delivery.

Query Parameter Auth

TOKEN="$(cat ~/.rampart/token)"
curl -N "http://127.0.0.1:9090/v1/events/stream?token=$TOKEN"

JavaScript (Browser)

const token = 'your-token-here';
const eventSource = new EventSource(
  `http://localhost:9090/v1/events/stream?token=${token}`
);

eventSource.onmessage = (event) => {
  const data = JSON.parse(event.data);
  
  switch (data.type) {
    case 'connected':
      console.log('Connected to Rampart event stream');
      break;
    
    case 'audit':
      console.log('Audit event:', data.event.tool, data.event.decision.action);
      // Update UI with new audit event
      break;
    
    case 'approvals':
      console.log('Approval queue changed — refreshing...');
      // Re-fetch GET /v1/approvals
      break;
    
    case 'audit_batch':
      console.log('Bulk audit event for run:', data.run_id);
      // Re-fetch audit history
      break;
  }
};

eventSource.onerror = (error) => {
  console.error('SSE error:', error);
  // EventSource will automatically reconnect
};

Python (Streaming)

import requests
import json
import os

token = open(os.path.expanduser("~/.rampart/token")).read().strip()
headers = {"Authorization": f"Bearer {token}"}

url = "http://localhost:9090/v1/events/stream"
response = requests.get(url, headers=headers, stream=True)

for line in response.iter_lines():
    if line:
        line_str = line.decode('utf-8')
        if line_str.startswith('data: '):
            data = json.loads(line_str[6:])  # Strip 'data: '
            
            if data['type'] == 'connected':
                print("Connected to Rampart")
            
            elif data['type'] == 'audit':
                event = data['event']
                print(f"{event['tool']}: {event['decision']['action']}")
            
            elif data['type'] == 'approvals':
                print("Approval queue changed")

Go (SSE Client)

package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"os"
	"strings"
)

type SSEEvent struct {
	Type  string          `json:"type"`
	Event json.RawMessage `json:"event,omitempty"`
	RunID string          `json:"run_id,omitempty"`
}

func main() {
	token, _ := os.ReadFile(os.ExpandEnv("$HOME/.rampart/token"))
	
	req, _ := http.NewRequest("GET", "http://localhost:9090/v1/events/stream", nil)
	req.Header.Set("Authorization", "Bearer "+strings.TrimSpace(string(token)))
	
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()
	
	scanner := bufio.NewScanner(resp.Body)
	for scanner.Scan() {
		line := scanner.Text()
		if !strings.HasPrefix(line, "data: ") {
			continue
		}
		
		var event SSEEvent
		if err := json.Unmarshal([]byte(line[6:]), &event); err != nil {
			continue
		}
		
		fmt.Printf("Event: %s\n", event.Type)
	}
}

Operational Notes

Long-lived Connections

Keep SSE clients long-lived (curl -N, EventSource, or equivalent). The stream remains open until:
  • Client disconnects
  • Server shuts down
  • Network error

Reconnection

Clients should reconnect on error. EventSource handles this automatically in browsers.

Notifications vs. Authoritative Data

Treat events as notifications. Re-query authoritative endpoints when needed:
  • On approvals event → GET /v1/approvals
  • On audit_batch event → GET /v1/audit/events
Do not rely on receiving every audit event over SSE. Network issues or client buffering can cause missed events. Use audit API endpoints for authoritative history.

Use Cases

Real-time Dashboard

Display live tool calls and approvals:
const eventSource = new EventSource(`/v1/events/stream?token=${token}`);

eventSource.onmessage = (event) => {
  const data = JSON.parse(event.data);
  
  if (data.type === 'audit') {
    addToActivityFeed(data.event);
  }
  
  if (data.type === 'approvals') {
    refreshApprovalQueue();
  }
};

Monitoring Integration

Forward events to external monitoring:
for line in response.iter_lines():
    if line and line.startswith(b'data: '):
        data = json.loads(line[6:])
        
        if data['type'] == 'audit':
            event = data['event']
            if event['decision']['action'] == 'deny':
                # Alert on blocked operations
                send_to_monitoring(event)

Approval Notifications

Trigger alerts when approvals are pending:
curl -N -H "Authorization: Bearer $TOKEN" \
  "http://127.0.0.1:9090/v1/events/stream" | \
while read -r line; do
  if echo "$line" | grep -q '"type":"approvals"'; then
    # Send notification (Slack, email, etc.)
    echo "Approval queue changed — check dashboard"
  fi
done

Next Steps

Approvals

Manage approval workflows

Status

Server status and health

Build docs developers (and LLMs) love