Overview

A Graph connects nodes together to form a computation pipeline. Edges are inferred automatically: if node A produces output “x” and node B has input “x”, they’re connected.
from hypergraph import node, Graph

@node(output_name="result")
def add(a: int, b: int) -> int:
    return a + b

@node(output_name="final")
def double(result: int) -> int:
    return result * 2

g = Graph([add, double])
print(g.outputs)         # ('result', 'final')
print(g.inputs.required) # ('a', 'b')
The graph automatically connects add → double because add produces “result” and double consumes “result”.

Graph Construction

Basic Constructor

Graph(
    nodes: list[HyperNode],
    *,
    edges: list[tuple] | None = None,
    name: str | None = None,
    strict_types: bool = False,
)
Parameters:
  • nodes (list[HyperNode], required): List of nodes to include in the graph.
  • edges (list[tuple] | None, default: None): Explicit edge declarations. Each edge is a tuple of either:
      • (source, target) - values inferred from output/input overlap
      • (source, target, values) - explicit value names
    When provided, automatic edge inference is disabled.
  • name (str | None, default: None): Optional graph name (required for nesting via as_node()).
  • strict_types (bool, default: False): If True, validate type compatibility between connected nodes at construction time.

Automatic Edge Inference

By default, edges are inferred from matching names:
@node(output_name="embedding")
def embed(text: str) -> list[float]:
    return [0.1, 0.2, 0.3]

@node(output_name="docs")
def retrieve(embedding: list[float]) -> list[str]:
    return ["doc1", "doc2"]

# Automatic connection: embed → retrieve (via "embedding")
graph = Graph([embed, retrieve])
Name your outputs and inputs consistently - edges are inferred automatically!
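
To make the name-matching rule concrete, here is a minimal stdlib-only sketch of the idea. It is not hypergraph's implementation: infer_edges is a hypothetical helper, and the dict of output name to function stands in for what @node(output_name=...) records.

```python
import inspect

def infer_edges(nodes):
    """nodes: dict mapping an output name to the function that produces it.

    Returns sorted (producer, consumer, value) triples, one for each
    parameter whose name matches another node's output name.
    """
    producers = {out: fn.__name__ for out, fn in nodes.items()}
    edges = []
    for fn in nodes.values():
        for param in inspect.signature(fn).parameters:
            if param in producers:
                edges.append((producers[param], fn.__name__, param))
    return sorted(edges)

def embed(text: str) -> list[float]:
    return [0.1, 0.2, 0.3]

def retrieve(embedding: list[float]) -> list[str]:
    return ["doc1", "doc2"]

edges = infer_edges({"embedding": embed, "docs": retrieve})
print(edges)  # [('embed', 'retrieve', 'embedding')]
```

The parameter "text" matches no output name, so it produces no edge and is left as a graph input.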

Explicit Edges

For cyclic graphs or precise control, declare edges explicitly:
@node(output_name="state")
def process(state: dict) -> dict:
    return {"count": state.get("count", 0) + 1}

# Explicit cycle: process outputs "state", which feeds back as input
graph = Graph(
    [process],
    edges=[(process, process, "state")],
)
When edges is provided, automatic inference is disabled. You must declare all edges.
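
As a rough illustration of what a self-edge on "state" means, the sketch below feeds a function's output back in as its next input. How a runner decides when to stop a cycle is not covered in this section, so the fixed iteration count and the run_cycle helper are assumptions made for the example.

```python
def process(state: dict) -> dict:
    return {"count": state.get("count", 0) + 1}

def run_cycle(step, state, max_iterations):
    """Feed the output of `step` back in as its input a fixed number of times.

    max_iterations is a stand-in stopping rule chosen for this sketch.
    """
    for _ in range(max_iterations):
        state = step(state)
    return state

final_state = run_cycle(process, {}, 3)
print(final_state)  # {'count': 3}
```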

Graph Properties

Core Properties

graph = Graph([add, double])

# Required inputs (no defaults, not bound)
graph.inputs.required    # ('a', 'b')

# Optional inputs (have defaults)
graph.inputs.optional    # ()

# All inputs (required + optional)
graph.inputs.all         # ('a', 'b')

# Bound values (pre-filled via bind())
graph.inputs.bound       # {}

# All outputs produced by nodes
graph.outputs            # ('result', 'final')

# Outputs from leaf nodes (no downstream consumers)
graph.leaf_outputs       # ('final',)
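
The values shown above can be derived from function signatures alone. The following is a stdlib-only sketch under that assumption, not the library's code; the nodes dict is a hypothetical stand-in for a graph.

```python
import inspect

def add(a: int, b: int) -> int:
    return a + b

def double(result: int) -> int:
    return result * 2

# Hypothetical registry standing in for a graph: output name -> producing function.
nodes = {"result": add, "final": double}

required, optional, consumed = [], [], set()
for fn in nodes.values():
    for name, param in inspect.signature(fn).parameters.items():
        consumed.add(name)
        if name in nodes:
            continue  # produced by another node, so not a graph input
        # Parameters without a default are required; the rest are optional.
        bucket = required if param.default is inspect.Parameter.empty else optional
        if name not in bucket:
            bucket.append(name)

# Leaf outputs are outputs that no node consumes.
leaf_outputs = tuple(out for out in nodes if out not in consumed)

print(tuple(required))  # ('a', 'b')
print(tuple(optional))  # ()
print(leaf_outputs)     # ('final',)
```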

Graph Methods

bind() - Pre-fill Values

Bound values act as pre-filled run() inputs and can still be overridden at runtime:
graph = Graph([add, double])
bound = graph.bind(a=10)

print(bound.inputs.required)  # ('b',) - only 'b' is needed
print(bound.inputs.bound)     # {'a': 10}

# At runtime, bound values can be overridden
# (`runner` is assumed to be an existing runner instance, e.g. a SyncRunner)
result = runner.run(bound, {"b": 5, "a": 20})  # a=20 overrides bind
bind() does not modify the original graph; it returns a new graph.
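
The two properties described here (a new object on every bind, and runtime values taking precedence) can be modeled in a few lines. This is a sketch of the semantics only; the Inputs class and its resolve method are hypothetical names, not part of hypergraph.

```python
from dataclasses import dataclass, field, replace

@dataclass(frozen=True)
class Inputs:
    required: tuple
    bound: dict = field(default_factory=dict)

    def bind(self, **values):
        # Return a new object; the original is left untouched.
        remaining = tuple(n for n in self.required if n not in values)
        return replace(self, required=remaining, bound={**self.bound, **values})

    def resolve(self, runtime: dict) -> dict:
        # Runtime values win over bound ones.
        return {**self.bound, **runtime}

base = Inputs(required=("a", "b"))
bound = base.bind(a=10)

print(base.required)   # ('a', 'b')
print(bound.required)  # ('b',)
print(bound.resolve({"b": 5, "a": 20}))  # {'a': 20, 'b': 5}
```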

select() - Filter Outputs

Control which outputs are returned:
graph = Graph([embed, retrieve, generate])
filtered = graph.select("answer")

result = runner.run(filtered, inputs)
print(list(result.keys()))  # ["answer"] - only selected output
select() also narrows graph.inputs to only parameters needed for selected outputs.
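
One way to picture the narrowing is as an upstream walk from the selected outputs. The sketch below assumes that model; the consumes table, the extra "audit" output, and needed_inputs are all hypothetical and exist only to show an input ("user_id") dropping out when it is no longer needed.

```python
# Hypothetical wiring: output name -> parameter names its producer consumes.
consumes = {
    "embedding": ("query",),
    "docs": ("embedding",),
    "answer": ("query", "docs"),
    "audit": ("query", "user_id"),
}

def needed_inputs(selected):
    """Walk upstream from the selected outputs and collect unproduced names."""
    seen, stack, inputs = set(), list(selected), set()
    while stack:
        name = stack.pop()
        if name in seen:
            continue
        seen.add(name)
        if name in consumes:
            stack.extend(consumes[name])  # produced by a node: keep walking upstream
        else:
            inputs.add(name)              # not produced anywhere: a graph input
    return tuple(sorted(inputs))

print(needed_inputs(["answer"]))           # ('query',)
print(needed_inputs(["answer", "audit"]))  # ('query', 'user_id')
```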

with_entrypoint() - Skip Upstream

Define where execution enters the graph:
@node(output_name="raw")
def fetch() -> str:
    return "data"

@node(output_name="processed")
def process(raw: str) -> str:
    return raw.upper()

@node(output_name="final")
def output(processed: str) -> str:
    return processed + "!"

graph = Graph([fetch, process, output])

# Skip fetch, provide processed value directly
partial = graph.with_entrypoint("process")
print(partial.inputs.required)  # ('processed',) - fetch is skipped
Useful for testing specific parts of a pipeline or resuming from checkpoints.

add_nodes() - Extend Graph

Add nodes to an existing graph:
graph = Graph([embed, retrieve])
extended = graph.add_nodes(generate, summarize)

print(len(extended.nodes))  # 4 nodes total
Only works with auto-inferred edges. Raises GraphConfigError if graph was constructed with explicit edges.

unbind() - Remove Bindings

Remove specific bindings:
bound = graph.bind(a=10, b=20)
partial = bound.unbind("b")

print(partial.inputs.bound)  # {'a': 10}

Type Validation

Enable strict_types

Catch type errors at graph construction time:
@node(output_name="value")
def producer() -> int:
    return 42

@node(output_name="result")
def consumer(value: int) -> int:
    return value * 2

# Types match - construction succeeds
graph = Graph([producer, consumer], strict_types=True)

Type Mismatch Errors

@node(output_name="value")
def producer() -> int:
    return 42

@node(output_name="result")
def consumer(value: str) -> str:  # Expects str, but producer gives int
    return value.upper()

# Raises GraphConfigError immediately
Graph([producer, consumer], strict_types=True)
# GraphConfigError: Type mismatch on edge 'producer' → 'consumer' (value='value')
#   Output type: int
#   Input type:  str
Use strict_types=True during development to catch wiring mistakes early!

Missing Annotations

With strict_types=True, all connected nodes must have type annotations:
@node(output_name="value")
def producer():  # Missing return type!
    return 42

@node(output_name="result")
def consumer(value: int) -> int:
    return value * 2

# Raises GraphConfigError
Graph([producer, consumer], strict_types=True)
# GraphConfigError: Missing type annotation in strict_types mode
#   -> Node 'producer' output 'value' has no type annotation
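
For intuition, the two failure modes above (mismatch and missing annotation) can be reproduced with typing.get_type_hints. This is a simplified sketch, not the library's checker: check_edge is a hypothetical helper, and it uses exact equality, whereas a real checker would presumably also handle subtypes, unions, and generics.

```python
from typing import get_type_hints

def check_edge(producer, consumer, value):
    """Return None if the edge type-checks, else a short problem description."""
    out_type = get_type_hints(producer).get("return")
    in_type = get_type_hints(consumer).get(value)
    if out_type is None or in_type is None:
        return "missing annotation"
    if out_type != in_type:  # exact match only, a simplification for this sketch
        return f"type mismatch: {out_type} vs {in_type}"
    return None

def producer() -> int:
    return 42

def untyped_producer():
    return 42

def int_consumer(value: int) -> int:
    return value * 2

def str_consumer(value: str) -> str:
    return value.upper()

ok = check_edge(producer, int_consumer, "value")
mismatch = check_edge(producer, str_consumer, "value")
missing = check_edge(untyped_producer, int_consumer, "value")
print(ok)       # None
print(missing)  # missing annotation
```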

Hierarchical Composition

as_node() - Nest Graphs

Wrap a graph as a node for composition:
@node(output_name="doubled")
def double(x: int) -> int:
    return x * 2

inner = Graph([double], name="inner")
outer = Graph([inner.as_node(), other_node])

print(inner.as_node().name)     # "inner"
print(inner.as_node().inputs)   # ('x',)
print(inner.as_node().outputs)  # ('doubled',)
The wrapped graph’s inputs become the node’s inputs, and its outputs become the node’s outputs.

Map Over Inputs

Execute a nested graph multiple times:
inner = Graph([double], name="inner")
mapped = inner.as_node().map_over("x")

# In outer graph, provide list of values
outer = Graph([mapped])
result = runner.run(outer, {"x": [1, 2, 3, 4, 5]})
print(result["doubled"])  # [2, 4, 6, 8, 10]
Think singular, scale with map: write logic for one item, scale to many with .map_over().
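
The "write for one, map for many" idea can be sketched in plain Python as a wrapper that iterates one named argument and passes the others through unchanged. The map_over function below is a hypothetical standalone helper, not the library's method, and scale is an extra example function introduced for illustration.

```python
def map_over(fn, name):
    """Wrap fn so the argument called `name` is a list; other arguments are shared."""
    def mapped(**inputs):
        items = inputs.pop(name)
        return [fn(**{name: item}, **inputs) for item in items]
    return mapped

def double(x: int) -> int:
    return x * 2

def scale(x: int, factor: int) -> int:
    return x * factor

doubled = map_over(double, "x")(x=[1, 2, 3, 4, 5])
scaled = map_over(scale, "x")(x=[1, 2, 3], factor=10)
print(doubled)  # [2, 4, 6, 8, 10]
print(scaled)   # [10, 20, 30]
```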

Validation

Graph validates structure at construction time:

Duplicate Node Names

@node(output_name="x")
def process(a: int) -> int:
    return a * 2

Graph([process, process])  # Raises GraphConfigError
# GraphConfigError: Duplicate node name: 'process'

Missing Inputs

@node(output_name="result")
def process(missing: int) -> int:
    return missing * 2

graph = Graph([process])  # Valid - 'missing' becomes a required input
print(graph.inputs.required)  # ('missing',)
Unproduced inputs become graph inputs - this enables graph composition and entrypoints.

Conflicting Outputs

Multiple nodes can produce the same output only if mutually exclusive (behind gates):
@node(output_name="result")
def path_a(x: int) -> int:
    return x

@node(output_name="result")
def path_b(x: int) -> int:
    return x * 2

Graph([path_a, path_b])  # Raises GraphConfigError
# GraphConfigError: Multiple nodes produce output 'result': ['path_a', 'path_b']
#   -> Outputs must be unique unless nodes are mutually exclusive (behind gates)
Use @route or @ifelse to create mutually exclusive branches.
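
The uniqueness check itself amounts to grouping nodes by output name. The sketch below shows only that grouping step; find_conflicts is a hypothetical helper, and it deliberately ignores the gate exemption, which would require knowing which nodes are mutually exclusive.

```python
from collections import defaultdict

def find_conflicts(node_outputs):
    """node_outputs: iterable of (node_name, output_name) pairs.

    Returns {output_name: [node names]} for outputs with more than one producer.
    """
    producers = defaultdict(list)
    for node_name, output in node_outputs:
        producers[output].append(node_name)
    return {out: names for out, names in producers.items() if len(names) > 1}

conflicts = find_conflicts([("path_a", "result"), ("path_b", "result")])
print(conflicts)  # {'result': ['path_a', 'path_b']}
```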

Visualization

Interactive HTML Visualization

graph.visualize()
# Opens interactive React Flow visualization in notebook

graph.visualize(
    depth=1,              # Expand nested graphs
    theme="dark",         # "light", "dark", or "auto"
    show_types=True,      # Show type annotations
    filepath="graph.html" # Save to file instead
)

Mermaid Diagrams

diagram = graph.to_mermaid()
print(diagram)  # Renders in notebook, prints raw source

# Customize
diagram = graph.to_mermaid(
    direction="LR",       # Left-to-right layout
    show_types=True,      # Show type annotations
    separate_outputs=True # Render outputs as separate nodes
)

Debug Visualization Issues

debugger = graph.debug_viz()
info = debugger.trace_node("my_node")
print(f"Incoming: {info.incoming_edges}")
print(f"Outgoing: {info.outgoing_edges}")

issues = debugger.find_issues()
for issue in issues:
    print(issue)

Advanced Features

NetworkX Integration

Access the underlying graph structure:
import networkx as nx

# NetworkX DiGraph with node/edge attributes
G = graph.nx_graph

# Analyze structure
print(nx.is_directed_acyclic_graph(G))  # Check for cycles
print(list(nx.topological_sort(G)))     # Execution order (DAGs only)

# Flattened graph (includes nested nodes)
flat = graph.to_flat_graph()
for node_id, attrs in flat.nodes(data=True):
    print(f"{node_id}: parent={attrs['parent']}")

Producer Mapping

# Map output → producing nodes (handles duplicate outputs)
print(graph.self_producers)
# {'result': {'add'}, 'final': {'double'}}

# Outputs with single producer
print(graph.sole_producers)
# {'result': 'add', 'final': 'double'}

Controlled Execution

# Which nodes are controlled by gates?
print(graph.controlled_by)
# {'process_a': ['router'], 'process_b': ['router']}

Common Patterns

Conditional Pipeline

from hypergraph import route, END

@node(output_name="is_valid")
def validate(data: dict) -> bool:
    return "id" in data

@route(targets=["process", END])
def check(is_valid: bool) -> str:
    return "process" if is_valid else END

@node(output_name="result")
def process(data: dict) -> dict:
    return {"processed": True}

graph = Graph([validate, check, process])
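
Stripped of the decorators, the control flow in this pattern is a dispatch on the value returned by check. The following plain-Python sketch assumes that reading; END here is a local sentinel standing in for the library's END, and run_conditional is a hypothetical helper.

```python
END = object()  # local stand-in sentinel, not hypergraph's END

def validate(data: dict) -> bool:
    return "id" in data

def check(is_valid: bool):
    return "process" if is_valid else END

def process(data: dict) -> dict:
    return {"processed": True}

def run_conditional(data: dict):
    """Run validate -> check, then either stop or dispatch to the named target."""
    target = check(validate(data))
    if target is END:
        return None  # route ended the run; nothing downstream executes
    targets = {"process": process}
    return targets[target](data)

print(run_conditional({"id": 1}))  # {'processed': True}
print(run_conditional({}))         # None
```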

Multi-Stage Pipeline

# Stage 1: Fetch
@node(output_name="raw_data")
def fetch(url: str) -> str:
    return "data"

# Stage 2: Process
@node(output_name="processed")
def process(raw_data: str) -> dict:
    return {"result": raw_data}

# Stage 3: Store
@node(output_name="saved")
def store(processed: dict) -> bool:
    return True

pipeline = Graph([fetch, process, store])

RAG Pipeline

@node(output_name="embedding")
def embed(query: str) -> list[float]:
    return [0.1, 0.2, 0.3]

@node(output_name="docs")
def retrieve(embedding: list[float]) -> list[str]:
    return ["doc1", "doc2"]

@node(output_name="answer")
def generate(query: str, docs: list[str]) -> str:
    return f"Answer based on {len(docs)} docs"

rag = Graph([embed, retrieve, generate])
Notice how generate receives both query (from graph input) and docs (from retrieve) automatically!
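
To see why generate can receive values from two different sources, consider a toy executor that keeps one pool of named values and runs any node whose parameters are all available. This is a stdlib-only sketch of the idea, not how hypergraph's runners work; the run function and the nodes dict are hypothetical.

```python
import inspect

def embed(query: str) -> list[float]:
    return [0.1, 0.2, 0.3]

def retrieve(embedding: list[float]) -> list[str]:
    return ["doc1", "doc2"]

def generate(query: str, docs: list[str]) -> str:
    return f"Answer based on {len(docs)} docs"

def run(nodes, inputs):
    """nodes: dict of output name -> function; inputs: dict of graph inputs."""
    values = dict(inputs)   # graph inputs and node outputs share one namespace
    pending = dict(nodes)
    while pending:
        ready = [
            out for out, fn in pending.items()
            if all(p in values for p in inspect.signature(fn).parameters)
        ]
        if not ready:
            raise ValueError(f"unsatisfied inputs for: {sorted(pending)}")
        for out in ready:
            fn = pending.pop(out)
            kwargs = {p: values[p] for p in inspect.signature(fn).parameters}
            values[out] = fn(**kwargs)
    return values

result = run(
    {"embedding": embed, "docs": retrieve, "answer": generate},
    {"query": "what is a graph?"},
)
print(result["answer"])  # Answer based on 2 docs
```

Because "query" and "docs" live in the same pool of values, generate does not need to know which one came from the caller and which one came from another node.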

Next Steps

  • Runners: Execute graphs with SyncRunner and AsyncRunner
  • Execution Modes: Understand sync, async, and generator execution
  • Routing: Control flow with @route and @ifelse
