Skip to main content

Events

Hypergraph emits structured events during graph execution for observability, logging, and monitoring.

Event Types

All events inherit from BaseEvent with common fields:
run_id
str
Unique identifier for the run that produced this event
span_id
str
Unique identifier for this event’s scope (auto-generated)
parent_span_id
str | None
Span ID of the parent scope, or None for root runs
timestamp
float
Unix timestamp when the event was created (auto-generated)

RunStartEvent

Emitted when a graph run begins.
from hypergraph.events import RunStartEvent
graph_name
str
Name of the graph being executed
workflow_id
str | None
Optional workflow identifier for tracking related runs
is_map
bool
Whether this run is part of a map operation
map_size
int | None
Number of items in the map operation, if applicable

RunEndEvent

Emitted when a graph run completes.
from hypergraph.events import RunEndEvent
graph_name
str
Name of the graph that was executed
status
RunStatus
Outcome of the run (RunStatus.COMPLETED or RunStatus.FAILED)
error
str | None
Error message if status is FAILED
duration_ms
float
Wall-clock duration in milliseconds

NodeStartEvent

Emitted when a node begins execution.
from hypergraph.events import NodeStartEvent
node_name
str
Name of the node
graph_name
str
Name of the graph containing the node

NodeEndEvent

Emitted when a node completes successfully.
from hypergraph.events import NodeEndEvent
node_name
str
Name of the node
graph_name
str
Name of the graph containing the node
duration_ms
float
Wall-clock duration in milliseconds
cached
bool
Whether the result was served from cache

CacheHitEvent

Emitted when a node result is served from cache.
from hypergraph.events import CacheHitEvent
node_name
str
Name of the cached node
graph_name
str
Name of the graph containing the node
cache_key
str
The cache key that was hit

NodeErrorEvent

Emitted when a node fails with an exception.
from hypergraph.events import NodeErrorEvent
node_name
str
Name of the node
graph_name
str
Name of the graph containing the node
error
str
Error message
error_type
str
Fully qualified exception type name

RouteDecisionEvent

Emitted when a routing node makes a decision.
from hypergraph.events import RouteDecisionEvent
node_name
str
Name of the routing node
graph_name
str
Name of the graph containing the node
decision
str | list[str]
The chosen target(s)

InterruptEvent

Emitted when execution is interrupted for human-in-the-loop.
from hypergraph.events import InterruptEvent
node_name
str
Name of the node that triggered the interrupt
graph_name
str
Name of the graph containing the node
workflow_id
str | None
Optional workflow identifier
value
object
The interrupt payload
response_param
str
Parameter name expected for the response

StopRequestedEvent

Emitted when a stop is requested on a workflow.
from hypergraph.events import StopRequestedEvent
workflow_id
str | None
Optional workflow identifier

Event Processors

Event processors receive and handle execution events.

EventProcessor

Base class for synchronous event processing.
from hypergraph.events import EventProcessor

class MyProcessor(EventProcessor):
    def process(self, event):
        print(f"Event: {event}")

runner = SyncRunner()
result = runner.run(
    graph,
    values,
    event_processors=[MyProcessor()]
)

AsyncEventProcessor

Base class for asynchronous event processing.
from hypergraph.events import AsyncEventProcessor

class MyAsyncProcessor(AsyncEventProcessor):
    async def process(self, event):
        await log_to_database(event)

runner = AsyncRunner()
result = await runner.run(
    graph,
    values,
    event_processors=[MyAsyncProcessor()]
)

TypedEventProcessor

Type-safe event processor with handler methods for specific event types.
from hypergraph.events import TypedEventProcessor, NodeStartEvent, NodeEndEvent

class MyTypedProcessor(TypedEventProcessor):
    def on_node_start(self, event: NodeStartEvent):
        print(f"Node {event.node_name} started")
    
    def on_node_end(self, event: NodeEndEvent):
        print(f"Node {event.node_name} completed in {event.duration_ms}ms")

runner = SyncRunner()
result = runner.run(
    graph,
    values,
    event_processors=[MyTypedProcessor()]
)
Handler methods:
  • on_run_start(event: RunStartEvent)
  • on_run_end(event: RunEndEvent)
  • on_node_start(event: NodeStartEvent)
  • on_node_end(event: NodeEndEvent)
  • on_cache_hit(event: CacheHitEvent)
  • on_node_error(event: NodeErrorEvent)
  • on_route_decision(event: RouteDecisionEvent)
  • on_interrupt(event: InterruptEvent)
  • on_stop_requested(event: StopRequestedEvent)

RichProgressProcessor

Built-in processor that displays a progress bar using Rich.
from hypergraph.events import RichProgressProcessor

processor = RichProgressProcessor(
    show_progress=True,
    show_tree=False,
    console=None  # auto-creates Rich console
)

runner = AsyncRunner()
result = await runner.run(
    graph,
    values,
    event_processors=[processor]
)
show_progress
bool
default:"True"
Whether to show progress bar
show_tree
bool
default:"False"
Whether to show tree view of nested graph execution
console
Console | None
Rich Console instance, or None to auto-create

RunStatus Enum

Status values for run completion.
from hypergraph.events import RunStatus

RunStatus.COMPLETED  # Run finished successfully
RunStatus.FAILED     # Run encountered an error

Usage Example

from hypergraph import Graph, node, AsyncRunner
from hypergraph.events import (
    TypedEventProcessor,
    NodeStartEvent,
    NodeEndEvent,
    RunEndEvent
)

class MetricsProcessor(TypedEventProcessor):
    def __init__(self):
        self.node_durations = {}
    
    def on_node_end(self, event: NodeEndEvent):
        self.node_durations[event.node_name] = event.duration_ms
    
    def on_run_end(self, event: RunEndEvent):
        print(f"Total run time: {event.duration_ms}ms")
        print(f"Node timings: {self.node_durations}")

@node(output_name="result")
async def process(x: int) -> int:
    return x * 2

graph = Graph([process])
runner = AsyncRunner()
metrics = MetricsProcessor()

result = await runner.run(
    graph,
    {"x": 5},
    event_processors=[metrics]
)

Build docs developers (and LLMs) love