Overview
Hive provides multi-layered observability for agent executions:
EventBus : Real-time pub/sub for execution events
Runtime Logs : Three-level structured logging (L1/L2/L3)
Trace Context : Automatic correlation across distributed operations
Health Monitoring : Autonomous worker health checks
Event-Based Monitoring
EventBus Architecture
The EventBus provides real-time pub/sub for all agent events:
from framework.runtime.event_bus import EventBus, EventType, AgentEvent
# Create event bus
bus = EventBus(max_history=1000, max_concurrent_handlers=10)
# Subscribe to execution events
async def on_execution_complete(event: AgentEvent):
    print(f"Execution {event.execution_id} completed")
    print(f"  Stream: {event.stream_id}")
    print(f"  Output: {event.data.get('output')}")

sub_id = bus.subscribe(
    event_types=[EventType.EXECUTION_COMPLETED],
    handler=on_execution_complete,
)
# Publish an event
await bus.publish(AgentEvent(
    type=EventType.EXECUTION_COMPLETED,
    stream_id="webhook",
    execution_id="exec_123",
    data={"result": "success"},
))
# Cleanup
bus.unsubscribe(sub_id)
Event Types
Execution Lifecycle
# Track execution flow
bus.subscribe(
    event_types=[
        EventType.EXECUTION_STARTED,
        EventType.EXECUTION_COMPLETED,
        EventType.EXECUTION_FAILED,
        EventType.EXECUTION_PAUSED,
        EventType.EXECUTION_RESUMED,
        EventType.EXECUTION_RESURRECTED,  # Auto-restart after failure
    ],
    handler=track_execution_lifecycle,
)
Node Execution
# Monitor node-level progress
bus.subscribe(
    event_types=[
        EventType.NODE_LOOP_STARTED,
        EventType.NODE_LOOP_ITERATION,
        EventType.NODE_LOOP_COMPLETED,
        EventType.NODE_ACTION_PLAN,  # Agent's plan for the node
    ],
    handler=track_node_progress,
)
LLM Streaming
# Real-time LLM output
async def stream_llm_output(event: AgentEvent):
    if event.type == EventType.LLM_TEXT_DELTA:
        print(event.data["content"], end="", flush=True)
    elif event.type == EventType.LLM_TURN_COMPLETE:
        print(f"\n[Model: {event.data['model']}, "
              f"Tokens: {event.data['input_tokens']} + {event.data['output_tokens']}]")

bus.subscribe(
    event_types=[
        EventType.LLM_TEXT_DELTA,
        EventType.LLM_REASONING_DELTA,  # Thinking process (o1 models)
        EventType.LLM_TURN_COMPLETE,
    ],
    handler=stream_llm_output,
)
# Track tool usage
async def monitor_tools(event: AgentEvent):
    if event.type == EventType.TOOL_CALL_STARTED:
        print(f"Tool: {event.data['tool_name']}")
        print(f"  Input: {event.data['tool_input']}")
    elif event.type == EventType.TOOL_CALL_COMPLETED:
        print(f"  Result: {event.data['result'][:100]}...")
        if event.data.get("is_error"):
            print("  ERROR: Tool failed")

bus.subscribe(
    event_types=[
        EventType.TOOL_CALL_STARTED,
        EventType.TOOL_CALL_COMPLETED,
    ],
    handler=monitor_tools,
)
Client I/O
# Monitor user-facing interactions
bus.subscribe(
    event_types=[
        EventType.CLIENT_OUTPUT_DELTA,    # Streaming output to user
        EventType.CLIENT_INPUT_REQUESTED, # Waiting for user input
    ],
    handler=track_client_interaction,
)
Health & Diagnostics
# Detect problems
bus.subscribe(
    event_types=[
        EventType.NODE_STALLED,          # No progress for N seconds
        EventType.NODE_TOOL_DOOM_LOOP,   # Repeated failed tool calls
        EventType.CONSTRAINT_VIOLATION,  # Goal constraint broken
        EventType.JUDGE_VERDICT,         # Quality judge feedback
    ],
    handler=handle_health_issues,
)
Event Filtering
Filter events by stream, node, execution, or graph:
# Only errors from specific stream
bus.subscribe(
    event_types=[EventType.EXECUTION_FAILED],
    filter_stream="webhook",
    handler=handle_webhook_failures,
)

# Only events from specific node
bus.subscribe(
    event_types=[EventType.NODE_LOOP_ITERATION],
    filter_node="critical-operation",
    handler=monitor_critical_node,
)

# Only events from specific execution
bus.subscribe(
    event_types=[EventType.LLM_TEXT_DELTA],
    filter_execution="exec_123",
    handler=stream_execution_output,
)

# Only events from specific graph (multi-graph sessions)
bus.subscribe(
    event_types=[EventType.EXECUTION_FAILED],
    filter_graph="worker",
    handler=handle_worker_failures,
)
Waiting for Events
Block until a specific event occurs:
# Wait for completion with timeout
event = await bus.wait_for(
    event_type=EventType.EXECUTION_COMPLETED,
    execution_id="exec_123",
    timeout=300.0,
)
if event:
    print(f"Completed: {event.data['output']}")
else:
    print("Timeout waiting for completion")
Event History
Query past events:
# Get recent failures
failures = bus.get_history(
    event_type=EventType.EXECUTION_FAILED,
    limit=10,
)
for event in failures:
    print(f"{event.timestamp}: {event.data['error']}")

# Get all events from an execution
exec_events = bus.get_history(
    execution_id="exec_123",
    limit=1000,
)
Event Debugging
Enable event logging to disk:
# Write all events to JSONL file
export HIVE_DEBUG_EVENTS=1  # ~/.hive/event_logs/<timestamp>.jsonl
# Custom log directory
export HIVE_DEBUG_EVENTS=/tmp/events
Each line is a full JSON event:
{ "type" : "llm_text_delta" , "stream_id" : "primary" , "node_id" : "plan" , "execution_id" : "exec_abc" , "data" : { "content" : "I will" , "snapshot" : "I will" }, "timestamp" : "2026-02-06T14:30:22.123456" , "graph_id" : "worker" }
{ "type" : "tool_call_started" , "stream_id" : "primary" , "node_id" : "plan" , "execution_id" : "exec_abc" , "data" : { "tool_use_id" : "toolu_01ABC" , "tool_name" : "read" , "tool_input" : { "filePath" : "/home/user/file.txt" }}, "timestamp" : "2026-02-06T14:30:25.456789" , "graph_id" : "worker" }
Runtime Logs (L1/L2/L3)
Hive provides three levels of execution logging:
L1: Summary Logs
High-level run summary written once at completion:
from framework.runtime.runtime_log_store import RuntimeLogStore
log_store = RuntimeLogStore(base_path=Path("./storage"))

# Read summary
summary = await log_store.load_summary("session_20260206_143022_abc12345")
print(f"Run: {summary.run_id}")
print(f"Status: {summary.status}")
print(f"Started: {summary.started_at}")
print(f"Duration: {summary.duration_seconds}s")
print(f"Nodes executed: {len(summary.nodes_executed)}")
print(f"Needs attention: {summary.needs_attention}")

# Query runs
runs = await log_store.list_runs(status="failed", limit=10)
L2: Node Details
Per-node execution details (JSONL, incremental):
# Read node-level details
details = await log_store.load_details("session_20260206_143022_abc12345")
for node in details.nodes:
    print(f"Node: {node.node_id}")
    print(f"  Iterations: {node.iterations}")
    print(f"  Duration: {node.duration_seconds}s")
    print(f"  Result: {node.result}")
    print(f"  LLM calls: {node.llm_call_count}")
    print(f"  Tool calls: {node.tool_call_count}")
L3: Tool Call Logs
Individual tool call logs (JSONL, incremental):
# Read detailed tool logs
tool_logs = await log_store.load_tool_logs("session_20260206_143022_abc12345")
for step in tool_logs.steps:
    print(f"Step {step.step_number} @ {step.timestamp}")
    print(f"  Node: {step.node_id}")
    print(f"  Tool: {step.tool_name}")
    print(f"  Input: {step.tool_input}")
    print(f"  Result: {step.tool_result[:200]}...")
    if step.tool_error:
        print(f"  ERROR: {step.tool_error}")
L2 and L3 logs use JSONL format (one JSON object per line) for crash resilience. Data is on disk immediately after each node/tool completes, not just at end of run.
Trace Context Propagation
Trace context automatically propagates through all operations:
from framework.observability import configure_logging, get_trace_context

# Configure structured logging
configure_logging(level="INFO", format="json")

# Framework sets trace context automatically:
#   Runtime.start_run()     → trace_id, execution_id, goal_id
#   GraphExecutor.execute() → agent_id
#   Node.execute()          → node_id

# All logs automatically include context:
import logging

logger = logging.getLogger(__name__)
logger.info("Processing request")  # Automatically includes trace_id, execution_id, etc.

# Access context programmatically
context = get_trace_context()
print(f"Trace ID: {context.get('trace_id')}")
print(f"Execution ID: {context.get('execution_id')}")
Example JSON log:
{
"timestamp" : "2026-02-06T14:30:22.123456Z" ,
"level" : "info" ,
"logger" : "myapp" ,
"message" : "Processing request" ,
"trace_id" : "4bf92f3577b34da6a3ce929d0e0e4736" ,
"execution_id" : "7a9b5c3d2e1f4a8b9c0d1e2f3a4b5c6d" ,
"run_id" : "run_primary_20260206_143022_abc12345" ,
"goal_id" : "support-goal" ,
"stream_id" : "webhook" ,
"agent_id" : "support_agent"
}
Worker Health Monitoring
Hive includes an autonomous health judge that monitors worker executions:
from framework.monitoring import judge_graph, judge_goal, HEALTH_JUDGE_ENTRY_POINT
# Load health judge as secondary graph
await worker_runtime.add_graph(
    graph_id="judge",
    graph=judge_graph,
    goal=judge_goal,
    entry_points={"health_check": HEALTH_JUDGE_ENTRY_POINT},
    storage_subpath="graphs/judge",
)
The judge:
Runs every 2 minutes via timer trigger
Reads worker session logs via get_worker_health_summary tool
Tracks trends in its continuous conversation memory
Emits EscalationTicket when degradation patterns detected
Health Patterns
The judge detects:
Stalls : No new iterations for 4+ minutes
# Judge detects via unchanged total_steps across checks
{
"cause" : "Worker stalled: no new iterations in 6.2 minutes" ,
"severity" : "critical" ,
"stall_minutes" : 6.2
}
Doom Loops : Repeated failures without progress
{
"cause" : "Doom loop: 18 consecutive RETRY verdicts, same error repeated" ,
"severity" : "high" ,
"steps_since_last_accept" : 18 ,
"evidence_snippet" : "Error: File not found /path/to/missing.txt"
}
Excessive Retries : Many iterations without success
{
"cause" : "High retry rate: 12 consecutive non-ACCEPT verdicts" ,
"severity" : "medium" ,
"steps_since_last_accept" : 12
}
Escalation Tickets
Listening for escalations:
async def handle_escalation(event: AgentEvent):
    ticket = event.data["ticket"]
    print(f"🚨 Worker escalation: {ticket['severity'].upper()}")
    print(f"Cause: {ticket['cause']}")
    print(f"Recommendation: {ticket['suggested_action']}")
    print(f"Evidence: {ticket['evidence_snippet'][:200]}...")
    if ticket["severity"] == "critical":
        # Auto-restart worker
        await restart_worker(ticket["worker_session_id"])

bus.subscribe(
    event_types=[EventType.WORKER_ESCALATION_TICKET],
    handler=handle_escalation,
)
Stream Health Metrics
Monitor execution stream health:
from framework.runtime.execution_stream import ExecutionStream
stream: ExecutionStream = runtime._streams["webhook"]

# Check if agent is idle
idle_seconds = stream.agent_idle_seconds
if idle_seconds > 300:
    print(f"Agent idle for {idle_seconds:.1f}s")

# Check if waiting for user input
if stream.is_awaiting_input:
    waiting = stream.get_waiting_nodes()
    print(f"Waiting for input at: {waiting}")

# Get active execution count
active_count = stream.get_active_count()
print(f"Active executions: {active_count}")

# Get detailed stats
stats = stream.get_stats()
print(f"Stream: {stats['stream_id']}")
print(f"  Running: {stats['running']}")
print(f"  Status: {stats['status_counts']}")
print(f"  Slots: {stats['available_slots']}/{stats['max_concurrent']}")
Best Practices
Event Subscriptions Unsubscribe from events when no longer needed to prevent memory leaks.
Log Levels Use L1 for dashboards, L2 for debugging, L3 for forensics. L3 can be large.
Async Handlers Keep event handlers fast. Long-running operations should spawn background tasks.
Error Resilience Event handler exceptions are logged but don’t break event delivery to other subscribers.