Endpoint
Server-Sent Events (SSE) stream for live updates on tool calls, approvals, and audit events.
The route is /v1/events/stream (not /v1/events).
Authentication
Accepts either:
Query parameter authentication (for clients that cannot set headers) curl -N "http://127.0.0.1:9090/v1/events/stream?token= $TOKEN "
Bearer token authentication (preferred) curl -N -H "Authorization: Bearer $TOKEN " \
"http://127.0.0.1:9090/v1/events/stream"
Prefer header-based authentication when possible. Use ?token= only where header injection is not possible (e.g., EventSource in browsers).
Content-Type : text/event-stream
Cache-Control : no-cache
X-Accel-Buffering : no
Each message is sent as SSE data: JSON.
Connection Event
Sent immediately upon successful connection:
data: {"type":"connected"}
Event Types
Event type:
connected — Initial connection established
approvals — Approval queue changed (new/resolved/expired)
audit — Single audit event (tool call evaluated)
audit_batch — Multiple audit events (bulk operation)
Approvals Event
Broadcast when:
New approval created
Approval resolved (approved/denied)
Approval expired
Clients should re-query GET /v1/approvals to get current state.
Audit Event
{
"type" : "audit" ,
"event" : {
"id" : "01J9K8L7M6N5" ,
"timestamp" : "2026-03-03T10:15:30Z" ,
"agent" : "claude-code" ,
"session" : "myapp/main" ,
"tool" : "exec" ,
"request" : {
"command" : "git status"
},
"decision" : {
"action" : "allow" ,
"matched_policies" : [ "allow-git" ],
"evaluation_time_us" : 8 ,
"message" : "git command allowed"
},
"prev_hash" : "abc123..." ,
"hash" : "def456..."
}
}
Decision: allow, deny, watch, ask, etc.
event.decision.matched_policies
Array of matched policy names
event.decision.evaluation_time_us
Evaluation time in microseconds
event.decision.suggestions
Suggestions (on deny)
SHA-256 hash of previous event (hash chain)
SHA-256 hash of this event
Audit Batch Event
{
"type" : "audit_batch" ,
"run_id" : "run_01J..."
}
Broadcast after bulk approval resolve to indicate multiple audit events were written. Clients should re-query GET /v1/audit/events instead of processing individual events.
Status Codes
Status Meaning 200 OKStream established 401 UnauthorizedMissing or invalid token 500 Internal Server ErrorStreaming unsupported by writer
Examples
cURL (Long-lived Connection)
TOKEN = "$( cat ~/.rampart/token)"
curl -N -H "Authorization: Bearer $TOKEN " \
"http://127.0.0.1:9090/v1/events/stream"
Output:
data: {"type":"connected"}
data: {"type":"audit","event":{"id":"01J...","timestamp":"2026-03-03T10:15:30Z","agent":"claude-code","session":"myapp/main","tool":"exec","request":{"command":"git status"},"decision":{"action":"allow","matched_policies":["allow-git"],"evaluation_time_us":8,"message":"git command allowed"},"prev_hash":"abc","hash":"def"}}
data: {"type":"approvals"}
The -N flag disables buffering, allowing real-time event delivery.
Query Parameter Auth
TOKEN = "$( cat ~/.rampart/token)"
curl -N "http://127.0.0.1:9090/v1/events/stream?token= $TOKEN "
JavaScript (Browser)
const token = 'your-token-here' ;
const eventSource = new EventSource (
`http://localhost:9090/v1/events/stream?token= ${ token } `
);
eventSource . onmessage = ( event ) => {
const data = JSON . parse ( event . data );
switch ( data . type ) {
case 'connected' :
console . log ( 'Connected to Rampart event stream' );
break ;
case 'audit' :
console . log ( 'Audit event:' , data . event . tool , data . event . decision . action );
// Update UI with new audit event
break ;
case 'approvals' :
console . log ( 'Approval queue changed — refreshing...' );
// Re-fetch GET /v1/approvals
break ;
case 'audit_batch' :
console . log ( 'Bulk audit event for run:' , data . run_id );
// Re-fetch audit history
break ;
}
};
eventSource . onerror = ( error ) => {
console . error ( 'SSE error:' , error );
// EventSource will automatically reconnect
};
Python (Streaming)
import requests
import json
import os
token = open (os.path.expanduser( "~/.rampart/token" )).read().strip()
headers = { "Authorization" : f "Bearer { token } " }
url = "http://localhost:9090/v1/events/stream"
response = requests.get(url, headers = headers, stream = True )
for line in response.iter_lines():
if line:
line_str = line.decode( 'utf-8' )
if line_str.startswith( 'data: ' ):
data = json.loads(line_str[ 6 :]) # Strip 'data: '
if data[ 'type' ] == 'connected' :
print ( "Connected to Rampart" )
elif data[ 'type' ] == 'audit' :
event = data[ 'event' ]
print ( f " { event[ 'tool' ] } : { event[ 'decision' ][ 'action' ] } " )
elif data[ 'type' ] == 'approvals' :
print ( "Approval queue changed" )
Go (SSE Client)
package main
import (
" bufio "
" encoding/json "
" fmt "
" log "
" net/http "
" os "
" strings "
)
type SSEEvent struct {
Type string `json:"type"`
Event json . RawMessage `json:"event,omitempty"`
RunID string `json:"run_id,omitempty"`
}
func main () {
token , _ := os . ReadFile ( os . ExpandEnv ( "$HOME/.rampart/token" ))
req , _ := http . NewRequest ( "GET" , "http://localhost:9090/v1/events/stream" , nil )
req . Header . Set ( "Authorization" , "Bearer " + strings . TrimSpace ( string ( token )))
resp , err := http . DefaultClient . Do ( req )
if err != nil {
log . Fatal ( err )
}
defer resp . Body . Close ()
scanner := bufio . NewScanner ( resp . Body )
for scanner . Scan () {
line := scanner . Text ()
if ! strings . HasPrefix ( line , "data: " ) {
continue
}
var event SSEEvent
if err := json . Unmarshal ([] byte ( line [ 6 :]), & event ); err != nil {
continue
}
fmt . Printf ( "Event: %s \n " , event . Type )
}
}
Operational Notes
Long-lived Connections
Keep SSE clients long-lived (curl -N, EventSource, or equivalent). The stream remains open until:
Client disconnects
Server shuts down
Network error
Reconnection
Clients should reconnect on error. EventSource handles this automatically in browsers.
Notifications vs. Authoritative Data
Treat events as notifications . Re-query authoritative endpoints when needed:
On approvals event → GET /v1/approvals
On audit_batch event → GET /v1/audit/events
Do not rely on receiving every audit event over SSE. Network issues or client buffering can cause missed events. Use audit API endpoints for authoritative history.
Use Cases
Real-time Dashboard
Display live tool calls and approvals:
const eventSource = new EventSource ( `/v1/events/stream?token= ${ token } ` );
eventSource . onmessage = ( event ) => {
const data = JSON . parse ( event . data );
if ( data . type === 'audit' ) {
addToActivityFeed ( data . event );
}
if ( data . type === 'approvals' ) {
refreshApprovalQueue ();
}
};
Monitoring Integration
Forward events to external monitoring:
for line in response.iter_lines():
if line and line.startswith( b 'data: ' ):
data = json.loads(line[ 6 :])
if data[ 'type' ] == 'audit' :
event = data[ 'event' ]
if event[ 'decision' ][ 'action' ] == 'deny' :
# Alert on blocked operations
send_to_monitoring(event)
Approval Notifications
Trigger alerts when approvals are pending:
curl -N -H "Authorization: Bearer $TOKEN " \
"http://127.0.0.1:9090/v1/events/stream" | \
while read -r line ; do
if echo " $line " | grep -q '"type":"approvals"' ; then
# Send notification (Slack, email, etc.)
echo "Approval queue changed — check dashboard"
fi
done
Next Steps
Approvals Manage approval workflows
Status Server status and health