## Overview

Runners execute graphs and return results. Hypergraph provides two runners:

- **SyncRunner** - Sequential execution for synchronous nodes
- **AsyncRunner** - Concurrent execution with async support

Both implement the same core interface, making it easy to switch between sync and async execution.
## Quick Start

```python
from hypergraph import Graph, node, SyncRunner

@node(output_name="doubled")
def double(x: int) -> int:
    return x * 2

graph = Graph([double])
runner = SyncRunner()
result = runner.run(graph, {"x": 5})

print(result["doubled"])  # 10
print(result.status)      # RunStatus.COMPLETED
```
## SyncRunner

### Constructor

```python
SyncRunner(cache: CacheBackend | None = None)
```

Optional cache backend for node result caching. Nodes opt in with `cache=True`.
### Capabilities

```python
runner = SyncRunner()
print(runner.capabilities)
# RunnerCapabilities(
#     supports_cycles=True,
#     supports_async_nodes=False,
#     supports_streaming=False,
#     returns_coroutine=False,
# )
```

SyncRunner cannot execute async nodes. Use AsyncRunner for graphs with async functions.
### run() Method

```python
runner.run(
    graph: Graph,
    values: dict[str, Any],
    *,
    max_iterations: int | None = None,
    select: list[str] | None = None,
    event_processors: list[EventProcessor] | None = None,
    parent_span_id: str | None = None,
) -> RunResult
```

- `values` - Input values for the graph
- `max_iterations` - Maximum iterations for cyclic graphs (default: 1000)
- `select` - Output names to include in the result (default: all outputs)
- `event_processors` - Event processors for observability
- `parent_span_id` - Parent span ID for nested execution tracing
### Example

```python
from hypergraph import Graph, node, SyncRunner

@node(output_name="embedding")
def embed(query: str) -> list[float]:
    return [0.1, 0.2, 0.3]

@node(output_name="docs")
def retrieve(embedding: list[float]) -> list[str]:
    return ["doc1", "doc2"]

graph = Graph([embed, retrieve])
runner = SyncRunner()
result = runner.run(graph, {"query": "What is RAG?"})

print(result["docs"])       # ["doc1", "doc2"]
print(result["embedding"])  # [0.1, 0.2, 0.3]
print(result.status)        # RunStatus.COMPLETED
```
## AsyncRunner

### Constructor

```python
AsyncRunner(cache: CacheBackend | None = None)
```

Optional cache backend for node result caching. Nodes opt in with `cache=True`.
### Capabilities

```python
runner = AsyncRunner()
print(runner.capabilities)
# RunnerCapabilities(
#     supports_cycles=True,
#     supports_async_nodes=True,
#     supports_streaming=False,
#     returns_coroutine=True,
#     supports_interrupts=True,
# )
```

AsyncRunner supports both sync and async nodes, with concurrent execution of independent nodes.
### run() Method

```python
await runner.run(
    graph: Graph,
    values: dict[str, Any],
    *,
    max_iterations: int | None = None,
    max_concurrency: int | None = None,
    select: list[str] | None = None,
    event_processors: list[EventProcessor] | None = None,
    parent_span_id: str | None = None,
) -> RunResult
```

- `values` - Input values for the graph
- `max_iterations` - Maximum iterations for cyclic graphs (default: 1000)
- `max_concurrency` - Maximum number of concurrent tasks (default: unlimited)
- `select` - Output names to include in the result (default: all outputs)
- `event_processors` - Event processors for observability
- `parent_span_id` - Parent span ID for nested execution tracing
### Example - Concurrent Execution

```python
import asyncio
from hypergraph import Graph, node, AsyncRunner

@node(output_name="response")
async def call_llm(prompt: str) -> str:
    await asyncio.sleep(0.1)  # Simulate API call
    return f"Response to: {prompt}"

@node(output_name="embedding")
async def embed(text: str) -> list[float]:
    await asyncio.sleep(0.1)  # Simulate API call
    return [0.1, 0.2, 0.3]

# Independent nodes run concurrently
graph = Graph([call_llm, embed])
runner = AsyncRunner()
result = await runner.run(
    graph,
    {"prompt": "Hello", "text": "World"},
    max_concurrency=10,  # Limit parallel operations
)

print(result["response"])   # "Response to: Hello"
print(result["embedding"])  # [0.1, 0.2, 0.3]
```

Independent nodes (no data dependencies between them) execute concurrently within each superstep.
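This superstep behavior has the same effect as plain `asyncio.gather`: independent coroutines overlap, so wall time approaches the longest single task rather than the sum. A standalone sketch of that effect (pure asyncio, no hypergraph required - the function names just mirror the example above):

```python
import asyncio
import time

async def call_llm(prompt: str) -> str:
    await asyncio.sleep(0.1)  # Simulate API latency
    return f"Response to: {prompt}"

async def embed(text: str) -> list[float]:
    await asyncio.sleep(0.1)  # Simulate API latency
    return [0.1, 0.2, 0.3]

async def main() -> float:
    start = time.perf_counter()
    # Neither coroutine depends on the other's output, so they can
    # run in the same "superstep" and their sleeps overlap
    await asyncio.gather(call_llm("Hello"), embed("World"))
    return time.perf_counter() - start

elapsed = asyncio.run(main())
print(f"{elapsed:.2f}s")  # Roughly 0.1s, not 0.2s: the two waits overlap
```

Run sequentially instead, the same pair would take about 0.2s - which is exactly the difference the runner's concurrency buys you for I/O-bound nodes.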
### Concurrency Control

The `max_concurrency` parameter limits parallel task execution:

```python
# Unlimited concurrency (default)
result = await runner.run(graph, values)

# Limit to 5 concurrent tasks
result = await runner.run(graph, values, max_concurrency=5)

# Sequential execution (no concurrency)
result = await runner.run(graph, values, max_concurrency=1)
```

Use `max_concurrency` to:

- Prevent rate limit errors with external APIs
- Control memory usage when processing large datasets
- Debug race conditions
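A bound like this is conventionally implemented with an `asyncio.Semaphore`. The sketch below shows that general pattern (it is illustrative, not hypergraph's actual internals - `run_bounded` and `fetch` are hypothetical names):

```python
import asyncio

async def run_bounded(coros, max_concurrency: int):
    """Run coroutines concurrently, at most max_concurrency at a time."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(coro):
        async with semaphore:  # Waits here once the limit is reached
            return await coro

    # gather preserves input order regardless of completion order
    return await asyncio.gather(*(guarded(c) for c in coros))

async def fetch(i: int) -> int:
    await asyncio.sleep(0.01)  # Simulate an API call
    return i * 2

results = asyncio.run(run_bounded((fetch(i) for i in range(10)), max_concurrency=3))
print(results)  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```

Setting `max_concurrency=1` in this pattern degenerates to sequential execution, matching the last example above.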
## RunResult

Both runners return a `RunResult` object:

```python
result = runner.run(graph, values)

# Access outputs by name
print(result["output_name"])

# Get all outputs as a dict
print(result.values)

# Check execution status
from hypergraph.runners import RunStatus

if result.status == RunStatus.COMPLETED:
    print("Success!")
else:
    print(f"Failed: {result.error}")

# Access metadata
print(result.run_id)      # Unique execution ID
print(result.graph_name)  # Name of the executed graph
```
### Status Values

```python
class RunStatus(Enum):
    COMPLETED = "completed"
    FAILED = "failed"
    PAUSED = "paused"  # AsyncRunner with InterruptNode
```
## Batch Processing

Execute a graph multiple times with different inputs using `SyncRunner.map()` or `AsyncRunner.map()`:

```python
from hypergraph import Graph, node, SyncRunner

@node(output_name="doubled")
def double(x: int) -> int:
    return x * 2

graph = Graph([double])
runner = SyncRunner()

# Process multiple values
results = runner.map(
    graph,
    {"x": [1, 2, 3, 4, 5]},  # x is a list
    map_over="x",            # Iterate over x
)

print([r["doubled"] for r in results])  # [2, 4, 6, 8, 10]
```
### Map Over Multiple Parameters

#### Zip Mode (Default)

Parallel iteration - the lists must be of equal length:

```python
@node(output_name="result")
def add(a: int, b: int) -> int:
    return a + b

graph = Graph([add])
runner = SyncRunner()

results = runner.map(
    graph,
    {"a": [1, 2, 3], "b": [10, 20, 30]},
    map_over=["a", "b"],
    map_mode="zip",  # (1,10), (2,20), (3,30)
)

print([r["result"] for r in results])  # [11, 22, 33]
```
#### Product Mode

Cartesian product - all combinations:

```python
results = runner.map(
    graph,
    {"a": [1, 2], "b": [10, 20]},
    map_over=["a", "b"],
    map_mode="product",  # (1,10), (1,20), (2,10), (2,20)
)

print([r["result"] for r in results])  # [11, 21, 12, 22]
```
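The two modes correspond directly to Python's built-in `zip` and `itertools.product`. A minimal sketch of how mapped parameters expand into one input dict per run (`expand_inputs` is an illustrative helper, not a hypergraph API):

```python
from itertools import product

def expand_inputs(values: dict, map_over: list[str], map_mode: str = "zip"):
    """Expand mapped parameters into one input dict per graph run."""
    lists = [values[name] for name in map_over]
    if map_mode == "zip":
        combos = zip(*lists)      # Pairwise: lists must be equal length
    else:
        combos = product(*lists)  # Cartesian: every combination
    for combo in combos:
        # Mapped names are overwritten with scalars; other inputs pass through
        yield {**values, **dict(zip(map_over, combo))}

runs = list(expand_inputs({"a": [1, 2], "b": [10, 20]}, ["a", "b"], "product"))
print(runs)
# [{'a': 1, 'b': 10}, {'a': 1, 'b': 20}, {'a': 2, 'b': 10}, {'a': 2, 'b': 20}]
```

Note the ordering product mode implies: the last mapped parameter varies fastest, which is why the results above come back as `[11, 21, 12, 22]` rather than sorted.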
### Error Handling in Maps

```python
results = runner.map(
    graph,
    {"x": [1, 2, 3, 4, 5]},
    map_over="x",
    error_handling="continue",  # Continue on failure
)

# Failed items are None
for i, result in enumerate(results):
    if result is None:
        print(f"Item {i} failed")
    else:
        print(f"Item {i}: {result['doubled']}")
```

The default is `error_handling="raise"`, which stops on the first failure.
## Execution Control

### Selecting Outputs

Return only specific outputs:

```python
result = runner.run(
    graph,
    values,
    select=["final_answer"],
)

print(result.values.keys())  # Only "final_answer"
```
### Max Iterations

Control cyclic graph execution:

```python
# Default: 1000 iterations
result = runner.run(graph, values)

# Custom limit
result = runner.run(graph, values, max_iterations=100)

# Unlimited (dangerous!)
result = runner.run(graph, values, max_iterations=float("inf"))
```

If `max_iterations` is reached while nodes are still ready to run, `InfiniteLoopError` is raised.
## Caching

### Enable Caching

Provide a cache backend to the runner:

```python
from hypergraph import InMemoryCache, DiskCache

# In-memory cache
cache = InMemoryCache()
runner = SyncRunner(cache=cache)

# Persistent disk cache
cache = DiskCache(".cache")
runner = SyncRunner(cache=cache)
```
### Cache-Enabled Nodes

Nodes opt in to caching:

```python
@node(output_name="embedding", cache=True)
def embed(text: str) -> list[float]:
    # Expensive computation
    return model.embed(text)

# First call computes and caches
result1 = runner.run(graph, {"text": "hello"})

# Second call returns the cached result
result2 = runner.run(graph, {"text": "hello"})  # Instant!
```

Cache keys include:

- The node's `definition_hash` (function source code)
- The input values (hashed)

Changing the function invalidates the cache automatically.
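Conceptually, such a key can be built by hashing the function's source alongside its inputs; the sketch below shows the general idea (illustrative only - `cache_key` is not hypergraph's actual key format, and real input hashing must handle non-JSON values):

```python
import hashlib
import inspect
import json

def cache_key(func, inputs: dict) -> str:
    """Combine a function's source and its inputs into a stable cache key."""
    # Hash the source: editing the function changes this, invalidating old entries
    definition_hash = hashlib.sha256(inspect.getsource(func).encode()).hexdigest()
    # Sort keys so {"a": 1, "b": 2} and {"b": 2, "a": 1} hash identically
    input_hash = hashlib.sha256(
        json.dumps(inputs, sort_keys=True).encode()
    ).hexdigest()
    return f"{definition_hash}:{input_hash}"

def embed(text: str) -> list[float]:
    return [0.1, 0.2, 0.3]

key1 = cache_key(embed, {"text": "hello"})
key2 = cache_key(embed, {"text": "hello"})
key3 = cache_key(embed, {"text": "world"})
print(key1 == key2, key1 == key3)  # True False
```

Because the definition hash is part of the key, stale results cannot be served after the function body changes - no manual cache invalidation required.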
## Observability

### Event Processors

Monitor execution with event processors:

```python
from hypergraph.events import EventProcessor, NodeStartEvent, NodeEndEvent

class LoggingProcessor(EventProcessor):
    def on_node_start(self, event: NodeStartEvent):
        print(f"Starting {event.node_name}")

    def on_node_end(self, event: NodeEndEvent):
        print(f"Finished {event.node_name} in {event.duration_ms}ms")

processor = LoggingProcessor()
result = runner.run(
    graph,
    values,
    event_processors=[processor],
)
```
### Rich Progress

Built-in progress bars:

```python
from hypergraph.events import RichProgressProcessor

processor = RichProgressProcessor()
result = runner.run(
    graph,
    values,
    event_processors=[processor],
)
```
### Available Events

- `RunStartEvent` - Graph execution begins
- `RunEndEvent` - Graph execution completes
- `NodeStartEvent` - Node execution begins
- `NodeEndEvent` - Node execution completes
- `RouteDecisionEvent` - Gate node makes a routing decision
- `CacheHitEvent` - Cached result found
- `CacheMissEvent` - Cache lookup failed
## Error Handling

### Execution Errors

Errors during execution are wrapped in `ExecutionError`:

```python
from hypergraph.exceptions import ExecutionError

try:
    result = runner.run(graph, values)
except ExecutionError as e:
    print(f"Execution failed: {e}")
    print(f"Partial state: {e.state}")
    print(f"Original error: {e.__cause__}")
```
### Common Errors

Graph contains async nodes but SyncRunner was used:

```python
@node(output_name="result")
async def async_node(x: int) -> int:
    return x * 2

graph = Graph([async_node])
runner = SyncRunner()
runner.run(graph, {"x": 5})  # IncompatibleRunnerError
```

**Fix**: Use AsyncRunner instead.

Cyclic graph exceeded `max_iterations`:

```python
@node(output_name="state")
def loop(state: dict) -> dict:
    return {"count": state["count"] + 1}  # Infinite loop!

graph = Graph([loop], edges=[(loop, loop, "state")])
runner.run(graph, {"state": {"count": 0}}, max_iterations=100)
# InfiniteLoopError: Reached max_iterations (100)
```

**Fix**: Add a termination condition with `@route` or increase `max_iterations`.
## Advanced Features

### Nested Execution

Runners automatically handle nested graphs:

```python
inner = Graph([double], name="inner")
outer = Graph([inner.as_node(), add], name="outer")

# Events propagate through nested execution
result = runner.run(outer, values, event_processors=[processor])
```
### Execution Tracing

Trace the execution hierarchy:

```python
result = runner.run(
    graph,
    values,
    parent_span_id="parent-123",  # Link to parent execution
)

print(result.run_id)   # Unique ID for this run
print(result.span_id)  # Span ID for tracing
```
## Comparison: Sync vs Async

| Feature | SyncRunner | AsyncRunner |
| --- | --- | --- |
| Async nodes | ❌ No | ✅ Yes |
| Concurrency | Sequential | Parallel (within supersteps) |
| Interrupt nodes | ❌ No | ✅ Yes |
| Streaming | ❌ No | ❌ No (future) |
| Returns | `RunResult` | `RunResult` (awaitable) |
| Best for | Simple pipelines, scripts | API calls, I/O-bound tasks |
## Best Practices

### Choose the Right Runner

- Use SyncRunner for simple, sequential workflows
- Use AsyncRunner for I/O-bound tasks (API calls, database queries)
- Use AsyncRunner for concurrent execution of independent nodes

### Control Concurrency

```python
# Good: Limit concurrent API calls
result = await runner.run(graph, values, max_concurrency=5)

# Bad: Unlimited concurrency may hit rate limits
result = await runner.run(graph, values)  # No limit!
```
### Handle Errors Gracefully

```python
try:
    result = runner.run(graph, values)
    if result.status == RunStatus.COMPLETED:
        process_results(result.values)
    else:
        log_error(result.error)
except ExecutionError as e:
    handle_partial_state(e.state)
```
### Use Caching for Expensive Operations

```python
# Cache embeddings, API calls, heavy computations
@node(output_name="embedding", cache=True)
def embed(text: str) -> list[float]:
    return expensive_model.embed(text)
```
## Next Steps

- **Execution Modes** - Deep dive into sync, async, and generator execution
- **Caching** - Optimize performance with result caching
- **Observability** - Monitor execution with event processors