Skip to main content

Overview

Hive provides multi-layered observability for agent executions:
  • EventBus: Real-time pub/sub for execution events
  • Runtime Logs: Three-level structured logging (L1/L2/L3)
  • Trace Context: Automatic correlation across distributed operations
  • Health Monitoring: Autonomous worker health checks

Event-Based Monitoring

EventBus Architecture

The EventBus provides real-time pub/sub for all agent events:
from framework.runtime.event_bus import EventBus, EventType, AgentEvent

# Create event bus
bus = EventBus(max_history=1000, max_concurrent_handlers=10)

# Subscribe to execution events
async def on_execution_complete(event: AgentEvent):
    print(f"Execution {event.execution_id} completed")
    print(f"  Stream: {event.stream_id}")
    print(f"  Output: {event.data.get('output')}")

sub_id = bus.subscribe(
    event_types=[EventType.EXECUTION_COMPLETED],
    handler=on_execution_complete,
)

# Publish an event
await bus.publish(AgentEvent(
    type=EventType.EXECUTION_COMPLETED,
    stream_id="webhook",
    execution_id="exec_123",
    data={"result": "success"},
))

# Cleanup
bus.unsubscribe(sub_id)

Event Types

Execution Lifecycle

# Track execution flow
bus.subscribe(
    event_types=[
        EventType.EXECUTION_STARTED,
        EventType.EXECUTION_COMPLETED,
        EventType.EXECUTION_FAILED,
        EventType.EXECUTION_PAUSED,
        EventType.EXECUTION_RESUMED,
        EventType.EXECUTION_RESURRECTED,  # Auto-restart after failure
    ],
    handler=track_execution_lifecycle,
)

Node Execution

# Monitor node-level progress
bus.subscribe(
    event_types=[
        EventType.NODE_LOOP_STARTED,
        EventType.NODE_LOOP_ITERATION,
        EventType.NODE_LOOP_COMPLETED,
        EventType.NODE_ACTION_PLAN,  # Agent's plan for the node
    ],
    handler=track_node_progress,
)

LLM Streaming

# Real-time LLM output
async def stream_llm_output(event: AgentEvent):
    if event.type == EventType.LLM_TEXT_DELTA:
        print(event.data["content"], end="", flush=True)
    elif event.type == EventType.LLM_TURN_COMPLETE:
        print(f"\n[Model: {event.data['model']}, "
              f"Tokens: {event.data['input_tokens']}+{event.data['output_tokens']}]")

bus.subscribe(
    event_types=[
        EventType.LLM_TEXT_DELTA,
        EventType.LLM_REASONING_DELTA,  # Thinking process (o1 models)
        EventType.LLM_TURN_COMPLETE,
    ],
    handler=stream_llm_output,
)

Tool Execution

# Track tool usage
async def monitor_tools(event: AgentEvent):
    if event.type == EventType.TOOL_CALL_STARTED:
        print(f"Tool: {event.data['tool_name']}")
        print(f"  Input: {event.data['tool_input']}")
    elif event.type == EventType.TOOL_CALL_COMPLETED:
        print(f"  Result: {event.data['result'][:100]}...")
        if event.data.get("is_error"):
            print(f"  ERROR: Tool failed")

bus.subscribe(
    event_types=[
        EventType.TOOL_CALL_STARTED,
        EventType.TOOL_CALL_COMPLETED,
    ],
    handler=monitor_tools,
)

Client I/O

# Monitor user-facing interactions
bus.subscribe(
    event_types=[
        EventType.CLIENT_OUTPUT_DELTA,  # Streaming output to user
        EventType.CLIENT_INPUT_REQUESTED,  # Waiting for user input
    ],
    handler=track_client_interaction,
)

Health & Diagnostics

# Detect problems
bus.subscribe(
    event_types=[
        EventType.NODE_STALLED,  # No progress for N seconds
        EventType.NODE_TOOL_DOOM_LOOP,  # Repeated failed tool calls
        EventType.CONSTRAINT_VIOLATION,  # Goal constraint broken
        EventType.JUDGE_VERDICT,  # Quality judge feedback
    ],
    handler=handle_health_issues,
)

Event Filtering

Filter events by stream, node, execution, or graph:
# Only errors from specific stream
bus.subscribe(
    event_types=[EventType.EXECUTION_FAILED],
    filter_stream="webhook",
    handler=handle_webhook_failures,
)

# Only events from specific node
bus.subscribe(
    event_types=[EventType.NODE_LOOP_ITERATION],
    filter_node="critical-operation",
    handler=monitor_critical_node,
)

# Only events from specific execution
bus.subscribe(
    event_types=[EventType.LLM_TEXT_DELTA],
    filter_execution="exec_123",
    handler=stream_execution_output,
)

# Only events from specific graph (multi-graph sessions)
bus.subscribe(
    event_types=[EventType.EXECUTION_FAILED],
    filter_graph="worker",
    handler=handle_worker_failures,
)

Waiting for Events

Block until a specific event occurs:
# Wait for completion with timeout
event = await bus.wait_for(
    event_type=EventType.EXECUTION_COMPLETED,
    execution_id="exec_123",
    timeout=300.0,
)

if event:
    print(f"Completed: {event.data['output']}")
else:
    print("Timeout waiting for completion")

Event History

Query past events:
# Get recent failures
failures = bus.get_history(
    event_type=EventType.EXECUTION_FAILED,
    limit=10,
)

for event in failures:
    print(f"{event.timestamp}: {event.data['error']}")

# Get all events from an execution
exec_events = bus.get_history(
    execution_id="exec_123",
    limit=1000,
)

Event Debugging

Enable event logging to disk:
# Write all events to JSONL file
export HIVE_DEBUG_EVENTS=1  # ~/.hive/event_logs/<timestamp>.jsonl

# Custom log directory
export HIVE_DEBUG_EVENTS=/tmp/events
Each line is a full JSON event:
{"type": "llm_text_delta", "stream_id": "primary", "node_id": "plan", "execution_id": "exec_abc", "data": {"content": "I will", "snapshot": "I will"}, "timestamp": "2026-02-06T14:30:22.123456", "graph_id": "worker"}
{"type": "tool_call_started", "stream_id": "primary", "node_id": "plan", "execution_id": "exec_abc", "data": {"tool_use_id": "toolu_01ABC", "tool_name": "read", "tool_input": {"filePath": "/home/user/file.txt"}}, "timestamp": "2026-02-06T14:30:25.456789", "graph_id": "worker"}

Runtime Logs (L1/L2/L3)

Hive provides three levels of execution logging:

L1: Summary Logs

High-level run summary written once at completion:
from framework.runtime.runtime_log_store import RuntimeLogStore

log_store = RuntimeLogStore(base_path=Path("./storage"))

# Read summary
summary = await log_store.load_summary("session_20260206_143022_abc12345")

print(f"Run: {summary.run_id}")
print(f"Status: {summary.status}")
print(f"Started: {summary.started_at}")
print(f"Duration: {summary.duration_seconds}s")
print(f"Nodes executed: {len(summary.nodes_executed)}")
print(f"Needs attention: {summary.needs_attention}")

# Query runs
runs = await log_store.list_runs(status="failed", limit=10)

L2: Node Details

Per-node execution details (JSONL, incremental):
# Read node-level details
details = await log_store.load_details("session_20260206_143022_abc12345")

for node in details.nodes:
    print(f"Node: {node.node_id}")
    print(f"  Iterations: {node.iterations}")
    print(f"  Duration: {node.duration_seconds}s")
    print(f"  Result: {node.result}")
    print(f"  LLM calls: {node.llm_call_count}")
    print(f"  Tool calls: {node.tool_call_count}")

L3: Tool Logs

Individual tool call logs (JSONL, incremental):
# Read detailed tool logs
tool_logs = await log_store.load_tool_logs("session_20260206_143022_abc12345")

for step in tool_logs.steps:
    print(f"Step {step.step_number} @ {step.timestamp}")
    print(f"  Node: {step.node_id}")
    print(f"  Tool: {step.tool_name}")
    print(f"  Input: {step.tool_input}")
    print(f"  Result: {step.tool_result[:200]}...")
    if step.tool_error:
        print(f"  ERROR: {step.tool_error}")
L2 and L3 logs use JSONL format (one JSON object per line) for crash resilience. Data is on disk immediately after each node/tool completes, not just at end of run.

Trace Context Propagation

Trace context automatically propagates through all operations:
from framework.observability import configure_logging, get_trace_context

# Configure structured logging
configure_logging(level="INFO", format="json")

# Framework sets trace context automatically:
# Runtime.start_run() → trace_id, execution_id, goal_id
# GraphExecutor.execute() → agent_id
# Node.execute() → node_id

# All logs automatically include context:
import logging
logger = logging.getLogger(__name__)

logger.info("Processing request")  # Automatically includes trace_id, execution_id, etc.

# Access context programmatically
context = get_trace_context()
print(f"Trace ID: {context.get('trace_id')}")
print(f"Execution ID: {context.get('execution_id')}")
Example JSON log:
{
  "timestamp": "2026-02-06T14:30:22.123456Z",
  "level": "info",
  "logger": "myapp",
  "message": "Processing request",
  "trace_id": "4bf92f3577b34da6a3ce929d0e0e4736",
  "execution_id": "7a9b5c3d2e1f4a8b9c0d1e2f3a4b5c6d",
  "run_id": "run_primary_20260206_143022_abc12345",
  "goal_id": "support-goal",
  "stream_id": "webhook",
  "agent_id": "support_agent"
}

Worker Health Monitoring

Hive includes an autonomous health judge that monitors worker executions:
from framework.monitoring import judge_graph, judge_goal, HEALTH_JUDGE_ENTRY_POINT

# Load health judge as secondary graph
await worker_runtime.add_graph(
    graph_id="judge",
    graph=judge_graph,
    goal=judge_goal,
    entry_points={"health_check": HEALTH_JUDGE_ENTRY_POINT},
    storage_subpath="graphs/judge",
)
The judge:
  • Runs every 2 minutes via timer trigger
  • Reads worker session logs via get_worker_health_summary tool
  • Tracks trends in its continuous conversation memory
  • Emits EscalationTicket when degradation patterns detected

Health Patterns

The judge detects several degradation patterns. Stalls: no new iterations for 4+ minutes
# Judge detects via unchanged total_steps across checks
{
  "cause": "Worker stalled: no new iterations in 6.2 minutes",
  "severity": "critical",
  "stall_minutes": 6.2
}
Doom Loops: Repeated failures without progress
{
  "cause": "Doom loop: 18 consecutive RETRY verdicts, same error repeated",
  "severity": "high",
  "steps_since_last_accept": 18,
  "evidence_snippet": "Error: File not found /path/to/missing.txt"
}
Excessive Retries: Many iterations without success
{
  "cause": "High retry rate: 12 consecutive non-ACCEPT verdicts",
  "severity": "medium",
  "steps_since_last_accept": 12
}

Escalation Tickets

Listen for escalation tickets on the event bus:
async def handle_escalation(event: AgentEvent):
    ticket = event.data["ticket"]
    
    print(f"🚨 Worker escalation: {ticket['severity'].upper()}")
    print(f"Cause: {ticket['cause']}")
    print(f"Recommendation: {ticket['suggested_action']}")
    print(f"Evidence: {ticket['evidence_snippet'][:200]}...")
    
    if ticket["severity"] == "critical":
        # Auto-restart worker
        await restart_worker(ticket["worker_session_id"])

bus.subscribe(
    event_types=[EventType.WORKER_ESCALATION_TICKET],
    handler=handle_escalation,
)

Stream Health Metrics

Monitor execution stream health:
from framework.runtime.execution_stream import ExecutionStream

stream: ExecutionStream = runtime._streams["webhook"]

# Check if agent is idle
idle_seconds = stream.agent_idle_seconds
if idle_seconds > 300:
    print(f"Agent idle for {idle_seconds:.1f}s")

# Check if waiting for user input
if stream.is_awaiting_input:
    waiting = stream.get_waiting_nodes()
    print(f"Waiting for input at: {waiting}")

# Get active execution count
active_count = stream.get_active_count()
print(f"Active executions: {active_count}")

# Get detailed stats
stats = stream.get_stats()
print(f"Stream: {stats['stream_id']}")
print(f"  Running: {stats['running']}")
print(f"  Status: {stats['status_counts']}")
print(f"  Slots: {stats['available_slots']}/{stats['max_concurrent']}")

Best Practices

Event Subscriptions

Unsubscribe from events when no longer needed to prevent memory leaks.

Log Levels

Use L1 for dashboards, L2 for debugging, L3 for forensics. L3 can be large.

Async Handlers

Keep event handlers fast. Long-running operations should spawn background tasks.

Error Resilience

Event handler exceptions are logged but don’t break event delivery to other subscribers.

Build docs developers (and LLMs) love