
Runners

Runners are responsible for executing graphs. Different runners provide different execution strategies.

SyncRunner

Synchronous runner for graph execution. Executes graphs on the calling thread and does not support async nodes.
from hypergraph import Graph, node, SyncRunner

@node(output_name="doubled")
def double(x: int) -> int:
    return x * 2

graph = Graph([double])
runner = SyncRunner()
result = runner.run(graph, {"x": 5})
print(result["doubled"])  # 10

Constructor

cache
CacheBackend | None
Optional cache backend for node result caching. Nodes opt in with cache=True

Properties

capabilities
RunnerCapabilities
Capabilities object describing:
  • supports_cycles=True
  • supports_async_nodes=False
  • supports_streaming=False
  • returns_coroutine=False
default_max_iterations
int
Default iteration cap for cyclic graphs (1000)
supported_node_types
set[type[HyperNode]]
Node types this runner can execute: {FunctionNode, GraphNode, IfElseNode, RouteNode}

Methods

run

result = runner.run(graph, {"x": 5}, select="doubled")
Execute a graph synchronously.
graph
Graph
required
The graph to execute
values
dict[str, Any] | None
Optional input values dict
select
str | list[str]
default:"**"
Which outputs to return. "**" (default) = all outputs
on_missing
Literal['ignore', 'warn', 'error']
default:"ignore"
How to handle missing selected outputs
on_internal_override
Literal['ignore', 'warn', 'error']
default:"warn"
How to handle non-conflicting internal input overrides
entrypoint
str | None
Optional explicit cycle entry point node name
max_iterations
int | None
Max iterations for cyclic graphs (None = use default 1000)
error_handling
Literal['raise', 'continue']
default:"raise"
How to handle node execution errors. "raise" (default) re-raises the original exception. "continue" returns RunResult with status=FAILED and partial values
event_processors
list[EventProcessor] | None
Optional list of event processors to receive execution events
**input_values
Any
Input values shorthand (merged with values)
RunResult
Dict-like object with output values and execution metadata:
  • Access outputs: result["key"] or result.get("key")
  • Status: result.status ("completed" or "failed")
  • Error: result.error (exception if failed)

map

results = runner.map(
    graph,
    {"items": [1, 2, 3]},
    map_over="items"
)
Execute a graph multiple times with different inputs.
graph
Graph
required
The graph to execute
values
dict[str, Any] | None
Optional input values dict (some should be lists for map_over)
map_over
str | list[str]
required
Parameter name(s) to iterate over
map_mode
Literal['zip', 'product']
default:"zip"
"zip" for parallel iteration, "product" for cartesian product
clone
bool | list[str]
default: False
Deep-copy broadcast values per iteration. False (default) = share by reference, True = deep-copy all broadcast values, list[str] = deep-copy only named params
select
str | list[str]
default:"**"
Which outputs to return
on_missing
Literal['ignore', 'warn', 'error']
default:"ignore"
How to handle missing selected outputs
on_internal_override
Literal['ignore', 'warn', 'error']
default:"warn"
How to handle non-conflicting internal input overrides
event_processors
list[EventProcessor] | None
Optional list of event processors to receive execution events
**input_values
Any
Input values shorthand (merged with values)
list[RunResult]
List of RunResult, one per iteration
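
The difference between the two map modes can be sketched in plain Python (an illustration of the iteration semantics, not hypergraph internals):

```python
from itertools import product

xs = [1, 2, 3]
ys = [10, 20, 30]

# map_mode="zip": inputs are paired positionally, one run per index
zip_runs = list(zip(xs, ys))
# 3 runs: (1, 10), (2, 20), (3, 30)

# map_mode="product": cartesian product, one run per combination
product_runs = list(product(xs, ys))
# 9 runs: (1, 10), (1, 20), ... (3, 30)
```

With "zip", all mapped lists must have the same length; with "product", every combination of values becomes its own iteration.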

AsyncRunner

Asynchronous runner for graph execution. Supports both sync and async nodes with concurrent execution.
from hypergraph import Graph, node, AsyncRunner
import asyncio

@node(output_name="doubled")
async def double(x: int) -> int:
    await asyncio.sleep(0.1)
    return x * 2

graph = Graph([double])
runner = AsyncRunner()
result = await runner.run(graph, {"x": 5})
print(result["doubled"])  # 10

Constructor

cache
CacheBackend | None
Optional cache backend for node result caching. Nodes opt in with cache=True

Properties

capabilities
RunnerCapabilities
Capabilities object describing:
  • supports_cycles=True
  • supports_async_nodes=True
  • supports_streaming=False
  • returns_coroutine=True
  • supports_interrupts=True
default_max_iterations
int
Default iteration cap for cyclic graphs (1000)
supported_node_types
set[type[HyperNode]]
Node types this runner can execute: {FunctionNode, GraphNode, IfElseNode, RouteNode, InterruptNode}

Methods

run

result = await runner.run(
    graph,
    {"x": 5},
    max_concurrency=10
)
Execute a graph asynchronously.
graph
Graph
required
The graph to execute
values
dict[str, Any] | None
Optional input values dict
select
str | list[str]
default:"**"
Which outputs to return
on_missing
Literal['ignore', 'warn', 'error']
default:"ignore"
How to handle missing selected outputs
on_internal_override
Literal['ignore', 'warn', 'error']
default:"warn"
How to handle non-conflicting internal input overrides
entrypoint
str | None
Optional explicit cycle entry point node name
max_iterations
int | None
Max iterations for cyclic graphs (None = use default 1000)
max_concurrency
int | None
Maximum number of nodes to execute concurrently within each superstep. None = unlimited
error_handling
Literal['raise', 'continue']
default:"raise"
How to handle node execution errors
event_processors
list[EventProcessor] | None
Optional list of event processors to receive execution events
**input_values
Any
Input values shorthand (merged with values)
RunResult
Coroutine that returns RunResult with output values and execution metadata

arun

result = await runner.arun(graph, {"x": 5})
Alias for run(); executes a graph asynchronously with a signature identical to run().

map

results = await runner.map(
    graph,
    {"items": [1, 2, 3]},
    map_over="items"
)
Execute a graph multiple times with different inputs (async).
graph
Graph
required
The graph to execute
values
dict[str, Any] | None
Optional input values dict
map_over
str | list[str]
required
Parameter name(s) to iterate over
map_mode
Literal['zip', 'product']
default:"zip"
"zip" for parallel iteration, "product" for cartesian product
clone
bool | list[str]
default: False
Deep-copy broadcast values per iteration
select
str | list[str]
default:"**"
Which outputs to return
on_missing
Literal['ignore', 'warn', 'error']
default:"ignore"
How to handle missing selected outputs
on_internal_override
Literal['ignore', 'warn', 'error']
default:"warn"
How to handle non-conflicting internal input overrides
max_concurrency
int | None
Maximum concurrent executions within each superstep
event_processors
list[EventProcessor] | None
Optional list of event processors
**input_values
Any
Input values shorthand
list[RunResult]
Coroutine that returns list of RunResult, one per iteration
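
The clone option's share-versus-copy behavior can be illustrated in plain Python (a sketch of the semantics, not library code):

```python
from copy import deepcopy

config = {"threshold": 0.5}  # a broadcast (non-mapped) value
items = [1, 2, 3]            # the mapped value

# clone=False (default): every iteration sees the same object,
# so a mutation in one iteration is visible to all of them
shared = [config for _ in items]
shared[0]["threshold"] = 0.9  # all three entries now read 0.9

# clone=True: each iteration gets an independent deep copy,
# so mutations stay local to that iteration
copies = [deepcopy(config) for _ in items]
copies[0]["threshold"] = 0.1  # only the first copy changes
```

Use clone=True (or a list of parameter names) when nodes mutate broadcast values such as dicts or lists; the default shares them by reference for speed.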

amap

results = await runner.amap(graph, {"items": [1, 2, 3]}, map_over="items")
Alias for map(); executes a graph multiple times asynchronously with a signature identical to map().

RunResult

Dict-like object returned by runner execution methods.
result = runner.run(graph, {"x": 5})

# Access outputs
value = result["doubled"]
value = result.get("doubled", default=0)

# Check status
if result.status == "completed":
    print("Success!")

# Access error (if failed)
if result.error:
    print(f"Failed: {result.error}")

Properties

status
Literal['completed', 'failed']
Execution status
error
BaseException | None
Exception if execution failed, None otherwise

Methods

Supports standard dict operations:
  • result[key] - Get output value, raises KeyError if missing
  • result.get(key, default=None) - Get output value with default
  • key in result - Check if output exists
  • result.keys() - Get output names
  • result.values() - Get output values
  • result.items() - Get (name, value) pairs

PauseInfo

Information about a paused execution from an @interrupt node. Returned in RunResult.pause when execution pauses for human input.
from hypergraph import interrupt, AsyncRunner, Graph

@interrupt(output_name="decision")
def approval(draft: str) -> str | None:
    return None  # Pause for human review

graph = Graph([generate_draft, approval, finalize])
runner = AsyncRunner()

result = await runner.run(graph, {"prompt": "Write a blog post"})

if result.paused:
    pause_info = result.pause
    print(f"Paused at: {pause_info.node_name}")
    print(f"Value: {pause_info.value}")
    print(f"Resume key: {pause_info.response_key}")
    
    # Resume with human input
    result = await runner.run(graph, {
        "prompt": "Write a blog post",
        pause_info.response_key: "approved"
    })

Properties

node_name
str
Name of the InterruptNode that paused (uses "/" for nested graphs)
output_param
str
The first output parameter name (for single-output nodes)
value
Any
The first input value surfaced to the caller (for single-input interrupts)
output_params
tuple[str, ...] | None
All output parameter names if multi-output node, else None
values
dict[str, Any] | None
All input values as {name: value} if multi-input interrupt, else None
response_key
str
Key to use in values dict when resuming execution.
  • Top-level interrupt: returns output_param directly (e.g., "decision")
  • Nested interrupt: dot-separated path (e.g., "review.decision")
response_keys
dict[str, str]
Mapping of all output names to their resume keys (for multi-output interrupts)

Multi-Output Example

@interrupt(output_name=("approved", "feedback"))
def review(draft: str) -> tuple[bool | None, str | None]:
    return None, None  # Pause for both values

result = await runner.run(graph, values)

if result.paused:
    pause = result.pause
    print(pause.output_params)  # ('approved', 'feedback')
    print(pause.response_keys)  # {'approved': '...', 'feedback': '...'}
    
    # Resume with all values
    result = await runner.run(graph, {
        **values,
        pause.response_keys["approved"]: True,
        pause.response_keys["feedback"]: "Looks good!"
    })

ErrorHandling

Type for controlling error behavior during batch processing with runner.map().
from typing import Literal

ErrorHandling = Literal["raise", "continue"]

Values

raise
str
Stop immediately on first error and raise the exception (default)
continue
str
Continue processing remaining items even if some fail

Usage

# Stop on first error (default)
results = runner.map(
    graph,
    {"x": [1, 2, 3, 4]},
    map_over="x",
    error_handling="raise"  # Raises immediately if any item fails
)

# Continue despite errors
results = runner.map(
    graph,
    {"x": [1, 2, 3, 4]},
    map_over="x",
    error_handling="continue"  # Returns all results, check status
)

# Check individual results
for i, result in enumerate(results):
    if result.status == "completed":
        print(f"Item {i}: {result['output']}")
    else:
        print(f"Item {i} failed: {result.error}")
Use error_handling="continue" for batch processing where you want to collect all successful results even if some items fail. Check result.status to filter successes from failures.

RunStatus

Enum indicating the outcome of graph execution.
from enum import Enum

class RunStatus(str, Enum):
    COMPLETED = "completed"
    FAILED = "failed"
    PAUSED = "paused"

Values

COMPLETED
str
Graph executed successfully to completion
FAILED
str
Graph execution failed with an error
PAUSED
str
Graph execution paused at an @interrupt node waiting for input

Usage

result = runner.run(graph, values)

if result.status == RunStatus.COMPLETED:
    print("Success:", result["output"])
elif result.status == RunStatus.FAILED:
    print("Error:", result.error)
elif result.status == RunStatus.PAUSED:
    print("Paused at:", result.pause.node_name)

BaseRunner

Abstract base class for all runners. Use SyncRunner or AsyncRunner instead of subclassing directly.

Abstract Methods

capabilities
property
Must return RunnerCapabilities describing runner features
run
method
Must implement graph execution
map
method
Must implement batched graph execution
