
Overview

Runners execute graphs and return results. Hypergraph provides two runners:
  • SyncRunner - Sequential execution for synchronous nodes
  • AsyncRunner - Concurrent execution with async support
Both implement the same core interface, making it easy to switch between sync and async execution.

Quick Start

from hypergraph import Graph, node, SyncRunner

@node(output_name="doubled")
def double(x: int) -> int:
    return x * 2

graph = Graph([double])
runner = SyncRunner()

result = runner.run(graph, {"x": 5})
print(result["doubled"])  # 10
print(result.status)      # RunStatus.COMPLETED

SyncRunner

Constructor

SyncRunner(cache: CacheBackend | None = None)
  • cache (CacheBackend): Optional cache backend for node result caching. Nodes opt in with cache=True.

Capabilities

runner = SyncRunner()
print(runner.capabilities)
# RunnerCapabilities(
#     supports_cycles=True,
#     supports_async_nodes=False,
#     supports_streaming=False,
#     returns_coroutine=False,
# )
SyncRunner cannot execute async nodes. Use AsyncRunner for graphs with async functions.

run() Method

runner.run(
    graph: Graph,
    values: dict[str, Any],
    *,
    max_iterations: int | None = None,
    select: list[str] | None = None,
    event_processors: list[EventProcessor] | None = None,
    parent_span_id: str | None = None,
) -> RunResult
  • graph (Graph, required): Graph to execute
  • values (dict[str, Any], required): Input values for the graph
  • max_iterations (int): Maximum iterations for cyclic graphs (default: 1000)
  • select (list[str]): Output names to include in the result (default: all outputs)
  • event_processors (list[EventProcessor]): Event processors for observability
  • parent_span_id (str): Parent span ID for nested execution tracing

Example

from hypergraph import Graph, node, SyncRunner

@node(output_name="embedding")
def embed(query: str) -> list[float]:
    return [0.1, 0.2, 0.3]

@node(output_name="docs")
def retrieve(embedding: list[float]) -> list[str]:
    return ["doc1", "doc2"]

graph = Graph([embed, retrieve])
runner = SyncRunner()

result = runner.run(graph, {"query": "What is RAG?"})

print(result["docs"])         # ["doc1", "doc2"]
print(result["embedding"])    # [0.1, 0.2, 0.3]
print(result.status)          # RunStatus.COMPLETED

AsyncRunner

Constructor

AsyncRunner(cache: CacheBackend | None = None)
  • cache (CacheBackend): Optional cache backend for node result caching. Nodes opt in with cache=True.

Capabilities

runner = AsyncRunner()
print(runner.capabilities)
# RunnerCapabilities(
#     supports_cycles=True,
#     supports_async_nodes=True,
#     supports_streaming=False,
#     returns_coroutine=True,
#     supports_interrupts=True,
# )
AsyncRunner supports both sync and async nodes, with concurrent execution of independent nodes.

run() Method

await runner.run(
    graph: Graph,
    values: dict[str, Any],
    *,
    max_iterations: int | None = None,
    max_concurrency: int | None = None,
    select: list[str] | None = None,
    event_processors: list[EventProcessor] | None = None,
    parent_span_id: str | None = None,
) -> RunResult
  • graph (Graph, required): Graph to execute
  • values (dict[str, Any], required): Input values for the graph
  • max_iterations (int): Maximum iterations for cyclic graphs (default: 1000)
  • max_concurrency (int): Maximum number of concurrent tasks (default: unlimited)
  • select (list[str]): Output names to include in the result (default: all outputs)
  • event_processors (list[EventProcessor]): Event processors for observability
  • parent_span_id (str): Parent span ID for nested execution tracing

Example - Concurrent Execution

import asyncio
from hypergraph import Graph, node, AsyncRunner

@node(output_name="response")
async def call_llm(prompt: str) -> str:
    await asyncio.sleep(0.1)  # Simulate API call
    return f"Response to: {prompt}"

@node(output_name="embedding")
async def embed(text: str) -> list[float]:
    await asyncio.sleep(0.1)  # Simulate API call
    return [0.1, 0.2, 0.3]

# Independent nodes run concurrently
graph = Graph([call_llm, embed])
runner = AsyncRunner()

result = await runner.run(
    graph,
    {"prompt": "Hello", "text": "World"},
    max_concurrency=10,  # Limit parallel operations
)

print(result["response"])    # "Response to: Hello"
print(result["embedding"])   # [0.1, 0.2, 0.3]
Independent nodes (no data dependencies between them) execute concurrently within each superstep.
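
The timing benefit is easy to verify with plain asyncio (no hypergraph required): two independent 0.1-second awaits scheduled together finish in roughly 0.1 seconds total, not 0.2.

```python
import asyncio
import time

async def call_llm(prompt: str) -> str:
    await asyncio.sleep(0.1)  # simulate API latency
    return f"Response to: {prompt}"

async def embed(text: str) -> list[float]:
    await asyncio.sleep(0.1)  # simulate API latency
    return [0.1, 0.2, 0.3]

async def main() -> float:
    start = time.perf_counter()
    # Independent coroutines run concurrently, like nodes in one superstep
    await asyncio.gather(call_llm("Hello"), embed("World"))
    return time.perf_counter() - start

elapsed = asyncio.run(main())
print(f"{elapsed:.2f}s")  # ~0.10s, not 0.20s
```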

Concurrency Control

The max_concurrency parameter limits parallel task execution:
# Unlimited concurrency (default)
result = await runner.run(graph, values)

# Limit to 5 concurrent tasks
result = await runner.run(graph, values, max_concurrency=5)

# Sequential execution (no concurrency)
result = await runner.run(graph, values, max_concurrency=1)
Use max_concurrency to:
  • Prevent rate limit errors with external APIs
  • Control memory usage when processing large datasets
  • Debug race conditions
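
Conceptually, a concurrency cap behaves like an asyncio.Semaphore wrapped around each task. The sketch below illustrates the idea in plain asyncio; it is not AsyncRunner's actual implementation.

```python
import asyncio

async def run_with_limit(coros, max_concurrency: int):
    # Each task must acquire the semaphore before running,
    # so at most max_concurrency tasks are in flight at once.
    sem = asyncio.Semaphore(max_concurrency)

    async def guarded(coro):
        async with sem:
            return await coro

    return await asyncio.gather(*(guarded(c) for c in coros))

stats = {"in_flight": 0, "peak": 0}

async def work(i: int) -> int:
    stats["in_flight"] += 1
    stats["peak"] = max(stats["peak"], stats["in_flight"])
    await asyncio.sleep(0.01)
    stats["in_flight"] -= 1
    return i * 2

results = asyncio.run(run_with_limit([work(i) for i in range(10)], max_concurrency=3))
print(results)        # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18] (gather preserves order)
print(stats["peak"])  # at most 3 (the cap)
```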

RunResult

Both runners return a RunResult object:
result = runner.run(graph, values)

# Access outputs by name
print(result["output_name"])

# Get all outputs as dict
print(result.values)

# Check execution status
from hypergraph.runners import RunStatus

if result.status == RunStatus.COMPLETED:
    print("Success!")
else:
    print(f"Failed: {result.error}")

# Access metadata
print(result.run_id)    # Unique execution ID
print(result.graph_name) # Name of executed graph

Status Values

class RunStatus(Enum):
    COMPLETED = "completed"
    FAILED = "failed"
    PAUSED = "paused"  # AsyncRunner with InterruptNode

Batch Processing

runner.map() - Process Multiple Inputs

Execute a graph multiple times with different inputs:
from hypergraph import Graph, node, SyncRunner

@node(output_name="doubled")
def double(x: int) -> int:
    return x * 2

graph = Graph([double])
runner = SyncRunner()

# Process multiple values
results = runner.map(
    graph,
    {"x": [1, 2, 3, 4, 5]},  # x is a list
    map_over="x",            # Iterate over x
)

print([r["doubled"] for r in results])  # [2, 4, 6, 8, 10]

Map Over Multiple Parameters

1. Zip Mode (Default)

Parallel iteration - lists must be of equal length:
@node(output_name="result")
def add(a: int, b: int) -> int:
    return a + b

graph = Graph([add])
runner = SyncRunner()

results = runner.map(
    graph,
    {"a": [1, 2, 3], "b": [10, 20, 30]},
    map_over=["a", "b"],
    map_mode="zip",  # (1,10), (2,20), (3,30)
)

print([r["result"] for r in results])  # [11, 22, 33]
2. Product Mode

Cartesian product - all combinations:
results = runner.map(
    graph,
    {"a": [1, 2], "b": [10, 20]},
    map_over=["a", "b"],
    map_mode="product",  # (1,10), (1,20), (2,10), (2,20)
)

print([r["result"] for r in results])  # [11, 21, 12, 22]
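
The two pairing rules mirror Python's built-in zip and itertools.product, which makes the expected combinations easy to reason about:

```python
from itertools import product

# map_mode="zip": parallel iteration over equal-length lists
zip_pairs = list(zip([1, 2, 3], [10, 20, 30]))
print(zip_pairs)      # [(1, 10), (2, 20), (3, 30)]

# map_mode="product": every combination of the two lists
product_pairs = list(product([1, 2], [10, 20]))
print(product_pairs)  # [(1, 10), (1, 20), (2, 10), (2, 20)]

print([a + b for a, b in zip_pairs])      # [11, 22, 33]
print([a + b for a, b in product_pairs])  # [11, 21, 12, 22]
```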

Error Handling in Maps

results = runner.map(
    graph,
    {"x": [1, 2, 3, 4, 5]},
    map_over="x",
    error_handling="continue",  # Continue on failure
)

# Failed items are None
for i, result in enumerate(results):
    if result is None:
        print(f"Item {i} failed")
    else:
        print(f"Item {i}: {result['doubled']}")
Default behavior is error_handling="raise", which stops on the first failure.
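
The "continue" semantics can be pictured as a plain loop that catches per-item failures and records None in their place (a sketch of the behavior, not the library's internals):

```python
def map_with_continue(fn, items):
    # On failure, record None instead of aborting the whole batch.
    results = []
    for item in items:
        try:
            results.append(fn(item))
        except Exception:
            results.append(None)
    return results

def double_or_fail(x: int) -> int:
    if x == 3:
        raise ValueError("bad item")
    return x * 2

batch = map_with_continue(double_or_fail, [1, 2, 3, 4, 5])
print(batch)  # [2, 4, None, 8, 10]
```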

Execution Control

Selecting Outputs

Return only specific outputs:
result = runner.run(
    graph,
    values,
    select=["final_answer"],
)

print(result.values.keys())  # Only "final_answer"

Max Iterations

Control cyclic graph execution:
# Default: 1000 iterations
result = runner.run(graph, values)

# Custom limit
result = runner.run(graph, values, max_iterations=100)

# Unlimited (dangerous!)
result = runner.run(graph, values, max_iterations=float('inf'))
If max_iterations is reached while nodes are still ready to run, an InfiniteLoopError is raised.
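
The guard is easy to picture as a plain loop with an iteration budget. The sketch below (including the InfiniteLoopError class) is illustrative, not hypergraph's implementation:

```python
class InfiniteLoopError(RuntimeError):
    """Stand-in for the runner's error, for illustration only."""

def run_cycle(state: dict, done, step, max_iterations: int = 1000) -> dict:
    # Re-run the cycle until the termination condition fires,
    # bailing out if the iteration budget is exhausted first.
    for _ in range(max_iterations):
        if done(state):
            return state
        state = step(state)
    raise InfiniteLoopError(f"Reached max_iterations ({max_iterations})")

# Terminates: the counter reaches 10 well inside the budget
final = run_cycle({"count": 0}, lambda s: s["count"] >= 10,
                  lambda s: {"count": s["count"] + 1})
print(final)  # {'count': 10}

# Never terminates: the guard raises instead of spinning forever
try:
    run_cycle({"count": 0}, lambda s: False,
              lambda s: {"count": s["count"] + 1}, max_iterations=100)
except InfiniteLoopError as e:
    print(e)  # Reached max_iterations (100)
```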

Caching

Enable Caching

Provide a cache backend to the runner:
from hypergraph import InMemoryCache, DiskCache

# In-memory cache
cache = InMemoryCache()
runner = SyncRunner(cache=cache)

# Persistent disk cache
cache = DiskCache(".cache")
runner = SyncRunner(cache=cache)

Cache-Enabled Nodes

Nodes opt in to caching:
@node(output_name="embedding", cache=True)
def embed(text: str) -> list[float]:
    # Expensive computation
    return model.embed(text)

# First call computes and caches
result1 = runner.run(graph, {"text": "hello"})

# Second call returns cached result
result2 = runner.run(graph, {"text": "hello"})  # Instant!
Cache keys include:
  1. Node’s definition_hash (function source code)
  2. Input values (hashed)
Changing the function invalidates the cache automatically.
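
A comparable key scheme can be sketched in plain Python. Here the compiled code object stands in for the definition_hash (the exact hashing scheme is an implementation detail of the library):

```python
import hashlib
import json

def cache_key(fn, inputs: dict) -> str:
    # Stand-in for definition_hash: hash the compiled code object
    # (the library hashes the function source) plus the input values.
    definition = repr((fn.__code__.co_code, fn.__code__.co_consts))
    payload = json.dumps(inputs, sort_keys=True)
    return hashlib.sha256((definition + payload).encode()).hexdigest()

def embed_v1(text: str) -> list[float]:
    return [0.1, 0.2, 0.3]

def embed_v2(text: str) -> list[float]:
    return [0.1, 0.2, 0.3, 0.4]  # changed body, so a new key

key_a = cache_key(embed_v1, {"text": "hello"})
key_b = cache_key(embed_v1, {"text": "hello"})
key_c = cache_key(embed_v1, {"text": "world"})  # different inputs
key_d = cache_key(embed_v2, {"text": "hello"})  # different function

print(key_a == key_b)  # True: same function, same inputs
print(key_a == key_c)  # False: inputs changed
print(key_a == key_d)  # False: function changed
```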

Observability

Event Processors

Monitor execution with event processors:
from hypergraph.events import EventProcessor, NodeStartEvent, NodeEndEvent

class LoggingProcessor(EventProcessor):
    def on_node_start(self, event: NodeStartEvent):
        print(f"Starting {event.node_name}")
    
    def on_node_end(self, event: NodeEndEvent):
        print(f"Finished {event.node_name} in {event.duration_ms}ms")

processor = LoggingProcessor()
result = runner.run(
    graph,
    values,
    event_processors=[processor],
)

Rich Progress

Built-in progress bars:
from hypergraph.events import RichProgressProcessor

processor = RichProgressProcessor()
result = runner.run(
    graph,
    values,
    event_processors=[processor],
)

Available Events

  • RunStartEvent - Graph execution begins
  • RunEndEvent - Graph execution completes
  • NodeStartEvent - Node execution begins
  • NodeEndEvent - Node execution completes
  • RouteDecisionEvent - Gate node makes routing decision
  • CacheHitEvent - Cached result found
  • CacheMissEvent - Cache lookup failed

Error Handling

Execution Errors

Errors during execution are wrapped in ExecutionError:
from hypergraph.exceptions import ExecutionError

try:
    result = runner.run(graph, values)
except ExecutionError as e:
    print(f"Execution failed: {e}")
    print(f"Partial state: {e.state}")
    print(f"Original error: {e.__cause__}")

Common Errors

Graph contains async nodes but SyncRunner was used:
@node(output_name="result")
async def async_node(x: int) -> int:
    return x * 2

graph = Graph([async_node])
runner = SyncRunner()

runner.run(graph, {"x": 5})  # IncompatibleRunnerError
Fix: Use AsyncRunner instead.
Cyclic graph exceeded max_iterations:
@node(output_name="state")
def loop(state: dict) -> dict:
    return {"count": state["count"] + 1}  # Infinite loop!

graph = Graph([loop], edges=[(loop, loop, "state")])
runner.run(graph, {"state": {"count": 0}}, max_iterations=100)
# InfiniteLoopError: Reached max_iterations (100)
Fix: Add a termination condition with @route or increase max_iterations.
Required input not provided:
graph = Graph([add])  # Requires 'a' and 'b'
runner.run(graph, {"a": 5})  # MissingInputError: Missing input 'b'
Fix: Provide all required inputs or use graph.bind() for defaults.

Advanced Features

Nested Execution

Runners automatically handle nested graphs:
inner = Graph([double], name="inner")
outer = Graph([inner.as_node(), add], name="outer")

# Events propagate through nested execution
result = runner.run(outer, values, event_processors=[processor])

Execution Tracing

Trace execution hierarchy:
result = runner.run(
    graph,
    values,
    parent_span_id="parent-123",  # Link to parent execution
)

print(result.run_id)    # Unique ID for this run
print(result.span_id)   # Span ID for tracing

Comparison: Sync vs Async

| Feature         | SyncRunner               | AsyncRunner                  |
| --------------- | ------------------------ | ---------------------------- |
| Async nodes     | ❌ No                    | ✅ Yes                       |
| Concurrency     | Sequential               | Parallel (within supersteps) |
| Interrupt nodes | ❌ No                    | ✅ Yes                       |
| Streaming       | ❌ No                    | ❌ No (future)               |
| Returns         | RunResult                | RunResult (awaitable)        |
| Best for        | Simple pipelines, scripts | API calls, I/O-bound tasks  |

Best Practices

1. Choose the Right Runner

  • Use SyncRunner for simple, sequential workflows
  • Use AsyncRunner for I/O-bound tasks (API calls, database queries)
  • Use AsyncRunner for concurrent execution of independent nodes
2. Control Concurrency

# Good: Limit concurrent API calls
result = await runner.run(graph, values, max_concurrency=5)

# Bad: Unlimited concurrency may hit rate limits
result = await runner.run(graph, values)  # No limit!
3. Handle Errors Gracefully

try:
    result = runner.run(graph, values)
    if result.status == RunStatus.COMPLETED:
        process_results(result.values)
    else:
        log_error(result.error)
except ExecutionError as e:
    handle_partial_state(e.state)
4. Use Caching for Expensive Operations

# Cache embeddings, API calls, heavy computations
@node(output_name="embedding", cache=True)
def embed(text: str) -> list[float]:
    return expensive_model.embed(text)

Next Steps

Execution Modes

Deep dive into sync, async, and generator execution

Caching

Optimize performance with result caching

Observability

Monitor execution with event processors
