Overview
A Graph connects nodes together to form a computation pipeline. Edges are inferred automatically: if node A produces output “x” and node B has input “x”, they’re connected.
from hypergraph import node, Graph

@node(output_name="result")
def add(a: int, b: int) -> int:
    return a + b

@node(output_name="final")
def double(result: int) -> int:
    return result * 2

g = Graph([add, double])
print(g.outputs)          # ('result', 'final')
print(g.inputs.required)  # ('a', 'b')
The graph automatically connects add → double because add produces “result” and double consumes “result”.
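The name-matching rule can be illustrated without the library. The following is a minimal sketch in plain Python, not hypergraph's implementation; `infer_edges` and the `(function, output_name)` pair format are hypothetical names introduced only for this illustration.

```python
import inspect

def add(a: int, b: int) -> int:
    return a + b

def double(result: int) -> int:
    return result * 2

# Hypothetical stand-in for decorated nodes: (function, output_name) pairs.
nodes = [(add, "result"), (double, "final")]

def infer_edges(nodes):
    """Connect each producer to every node with a parameter named like its output."""
    edges = []
    for src, out_name in nodes:
        for dst, _ in nodes:
            if out_name in inspect.signature(dst).parameters:
                edges.append((src.__name__, dst.__name__, out_name))
    return edges

edges = infer_edges(nodes)
print(edges)  # [('add', 'double', 'result')]
```

Matching on parameter names is why consistent naming matters: renaming the parameter of double would silently remove the edge in this sketch.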
Graph Construction
Basic Constructor
Graph(
    nodes: list[HyperNode],
    *,
    edges: list[tuple] | None = None,
    name: str | None = None,
    strict_types: bool = False,
)
nodes: List of nodes to include in the graph.
edges: Explicit edge declarations. Each edge is a tuple of either:
    (source, target) - values inferred from output/input overlap
    (source, target, values) - explicit value names
  When provided, automatic edge inference is disabled.
name: Optional graph name (required for nesting via as_node()).
strict_types: If True, validate type compatibility between connected nodes at construction time.
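To make the two edge tuple shapes concrete, here is a rough sketch of how they could be normalized to one form. It is plain Python and not the library's code: the `NODES` table, `normalize_edge`, and the choice to accept either a single name or a sequence of names for `values` are all assumptions made for illustration.

```python
# Hypothetical node descriptions: name -> (input names, output names).
NODES = {
    "embed": (("text",), ("embedding",)),
    "retrieve": (("embedding",), ("docs",)),
}

def normalize_edge(edge):
    """Return (source, target, values) for either accepted tuple shape."""
    if len(edge) == 3:
        source, target, values = edge
        # Assumption: a single name or a sequence of names is allowed.
        values = (values,) if isinstance(values, str) else tuple(values)
        return source, target, values
    if len(edge) == 2:
        source, target = edge
        produced = NODES[source][1]
        consumed = NODES[target][0]
        # Infer the values from the overlap between outputs and inputs.
        values = tuple(v for v in produced if v in consumed)
        return source, target, values
    raise ValueError(f"edge must have 2 or 3 items, got {len(edge)}")

short_form = normalize_edge(("embed", "retrieve"))
long_form = normalize_edge(("embed", "retrieve", "embedding"))
print(short_form == long_form)  # True
```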
Automatic Edge Inference
By default, edges are inferred from matching names:
@node(output_name="embedding")
def embed(text: str) -> list[float]:
    return [0.1, 0.2, 0.3]

@node(output_name="docs")
def retrieve(embedding: list[float]) -> list[str]:
    return ["doc1", "doc2"]

# Automatic connection: embed → retrieve (via "embedding")
graph = Graph([embed, retrieve])
Name your outputs and inputs consistently - edges are inferred automatically!
Explicit Edges
For cyclic graphs or precise control, declare edges explicitly:
@node(output_name="state")
def process(state: dict) -> dict:
    return {"count": state.get("count", 0) + 1}

# Explicit cycle: process outputs "state", which feeds back as input
graph = Graph(
    [process],
    edges=[(process, process, "state")],
)
When edges is provided, automatic inference is disabled. You must declare all edges.
Graph Properties
Core Properties
Inputs and Outputs
graph = Graph([add, double])

# Required inputs (no defaults, not bound)
graph.inputs.required  # ('a', 'b')

# Optional inputs (have defaults)
graph.inputs.optional  # ()

# All inputs (required + optional)
graph.inputs.all  # ('a', 'b')

# Bound values (pre-filled via bind())
graph.inputs.bound  # {}

# All outputs produced by nodes
graph.outputs  # ('result', 'final')

# Outputs from leaf nodes (no downstream consumers)
graph.leaf_outputs  # ('final',)
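How these properties relate to the node signatures can be sketched in plain Python. This is an illustrative approximation under stated assumptions, not hypergraph's implementation: `classify` and the `(function, output_name)` pairs are hypothetical, and bound values are ignored.

```python
import inspect

def add(a: int, b: int) -> int:
    return a + b

def double(result: int) -> int:
    return result * 2

# Hypothetical stand-in for decorated nodes: (function, output_name) pairs.
nodes = [(add, "result"), (double, "final")]

def classify(nodes):
    """Split parameter names into required/optional inputs and find leaf outputs."""
    produced = [out for _, out in nodes]
    required, optional, consumed = [], [], set()
    for fn, _ in nodes:
        for name, param in inspect.signature(fn).parameters.items():
            consumed.add(name)
            if name in produced:
                continue  # fed by another node, so not a graph input
            has_default = param.default is not inspect.Parameter.empty
            bucket = optional if has_default else required
            if name not in required and name not in optional:
                bucket.append(name)
    # A leaf output is produced by some node but consumed by none.
    leaf = tuple(out for out in produced if out not in consumed)
    return tuple(required), tuple(optional), tuple(produced), leaf

required, optional, outputs, leaf_outputs = classify(nodes)
print(required, optional, outputs, leaf_outputs)
```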
Graph Methods
bind() - Pre-fill Values
Bound values act as pre-filled run() inputs and can be overridden at runtime:
graph = Graph([add, double])
bound = graph.bind(a=10)
print(bound.inputs.required)  # ('b',) - only 'b' is needed
print(bound.inputs.bound)     # {'a': 10}

# At runtime, bound values can be overridden
# (runner is assumed to be an existing runner instance; see Runners under Next Steps)
result = runner.run(bound, {"b": 5, "a": 20})  # a=20 overrides bind
bind() does not mutate the graph - it returns a new graph and leaves the original unchanged.
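The two behaviours described here, returning a new object and letting runtime inputs win, can be sketched with a frozen dataclass. This is only a model of the semantics; `SketchGraph` and its `resolve` method are hypothetical and do not exist in the library.

```python
from dataclasses import dataclass, field

@dataclass(frozen=True)
class SketchGraph:
    """Hypothetical model of a graph's required and bound inputs."""
    required: tuple
    bound: dict = field(default_factory=dict)

    def bind(self, **values):
        # Build a new object; the original is left untouched.
        return SketchGraph(
            required=tuple(n for n in self.required if n not in values),
            bound={**self.bound, **values},
        )

    def resolve(self, runtime_inputs):
        # Later keys win in a dict merge, so runtime inputs override bound values.
        return {**self.bound, **runtime_inputs}

graph = SketchGraph(required=("a", "b"))
bound = graph.bind(a=10)

print(graph.required)                    # ('a', 'b') - original unchanged
print(bound.required)                    # ('b',)
print(bound.resolve({"b": 5, "a": 20}))  # {'a': 20, 'b': 5}
```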
select() - Filter Outputs
Control which outputs are returned:
graph = Graph([embed, retrieve, generate])
filtered = graph.select("answer")
result = runner.run(filtered, inputs)
print(list(result.keys()))  # ["answer"] - only selected output
select() also narrows graph.inputs to only the parameters needed to compute the selected outputs.
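Input narrowing amounts to walking backwards from the selected outputs and collecting the values that no node produces. The sketch below shows that idea in plain Python; the `NODES` table, the extra `log` node, and `inputs_needed` are hypothetical and are not part of hypergraph.

```python
# Hypothetical node table: name -> (input names, output name).
NODES = {
    "embed": (("query",), "embedding"),
    "retrieve": (("embedding",), "docs"),
    "generate": (("query", "docs"), "answer"),
    "log": (("audit_tag",), "log_line"),
}

def inputs_needed(selected):
    """Return the graph inputs required to compute the selected outputs."""
    producer = {out: name for name, (_, out) in NODES.items()}
    needed, seen, stack = set(), set(), list(selected)
    while stack:
        value = stack.pop()
        if value in seen:
            continue
        seen.add(value)
        if value in producer:
            # Produced by a node: recurse into that node's own inputs.
            stack.extend(NODES[producer[value]][0])
        else:
            # Produced by nothing: it must be supplied by the caller.
            needed.add(value)
    return tuple(sorted(needed))

print(inputs_needed(["answer"]))              # ('query',)
print(inputs_needed(["answer", "log_line"]))  # ('audit_tag', 'query')
```

Selecting only "answer" drops "audit_tag" because the hypothetical log node is no longer on any path to a selected output.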
with_entrypoint() - Skip Upstream
Define where execution enters the graph:
@node(output_name="raw")
def fetch() -> str:
    return "data"

@node(output_name="processed")
def process(raw: str) -> str:
    return raw.upper()

@node(output_name="final")
def output(processed: str) -> str:
    return processed + "!"

graph = Graph([fetch, process, output])

# Enter at process: fetch is skipped, so its output "raw" must be supplied directly
partial = graph.with_entrypoint("process")
print(partial.inputs.required)  # ('raw',) - fetch is skipped
Useful for testing specific parts of a pipeline or resuming from checkpoints.
add_nodes() - Extend Graph
Add nodes to an existing graph:
graph = Graph([embed, retrieve])
extended = graph.add_nodes(generate, summarize)
print(len(extended.nodes))  # 4 nodes total
add_nodes() only works with auto-inferred edges. It raises GraphConfigError if the graph was constructed with explicit edges.
unbind() - Remove Bindings
Remove specific bindings:
bound = graph.bind(a=10, b=20)
partial = bound.unbind("b")
print(partial.inputs.bound)  # {'a': 10}
Type Validation
Enable strict_types
Catch type errors at graph construction time:
@node(output_name="value")
def producer() -> int:
    return 42

@node(output_name="result")
def consumer(value: int) -> int:
    return value * 2

# Types match - construction succeeds
graph = Graph([producer, consumer], strict_types=True)
Type Mismatch Errors
@node(output_name="value")
def producer() -> int:
    return 42

@node(output_name="result")
def consumer(value: str) -> str:  # Expects str, but producer gives int
    return value.upper()

# Raises GraphConfigError immediately
Graph([producer, consumer], strict_types=True)
# GraphConfigError: Type mismatch on edge 'producer' → 'consumer' (value='value')
#   Output type: int
#   Input type: str
Use strict_types=True during development to catch wiring mistakes early!
Missing Annotations
With strict_types=True, all connected nodes must have type annotations:
@node(output_name="value")
def producer():  # Missing return type!
    return 42

@node(output_name="result")
def consumer(value: int) -> int:
    return value * 2

# Raises GraphConfigError
Graph([producer, consumer], strict_types=True)
# GraphConfigError: Missing type annotation in strict_types mode
#   -> Node 'producer' output 'value' has no type annotation
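The kind of check performed in strict mode can be approximated with the standard library's `typing.get_type_hints`. This is a simplified sketch and not how hypergraph validates types: `check_edge` is a hypothetical helper, it compares annotations for exact equality only, and it ignores subtypes, unions, and generics.

```python
from typing import get_type_hints

def producer() -> int:
    return 42

def untyped_producer():
    return 42

def int_consumer(value: int) -> int:
    return value * 2

def str_consumer(value: str) -> str:
    return value.upper()

def check_edge(src, dst, value_name):
    """Return an error message, or None if the edge's annotations agree."""
    out_type = get_type_hints(src).get("return")
    in_type = get_type_hints(dst).get(value_name)
    if out_type is None or in_type is None:
        return f"missing type annotation on edge {src.__name__} -> {dst.__name__}"
    if out_type != in_type:
        return (
            f"type mismatch on edge {src.__name__} -> {dst.__name__}: "
            f"{out_type!r} vs {in_type!r}"
        )
    return None

print(check_edge(producer, int_consumer, "value"))          # None
print(check_edge(producer, str_consumer, "value"))          # type mismatch ...
print(check_edge(untyped_producer, int_consumer, "value"))  # missing type annotation ...
```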
Hierarchical Composition
as_node() - Nest Graphs
Wrap a graph as a node for composition:
@node(output_name="doubled")
def double(x: int) -> int:
    return x * 2

inner = Graph([double], name="inner")
outer = Graph([inner.as_node(), other_node])

print(inner.as_node().name)     # "inner"
print(inner.as_node().inputs)   # ('x',)
print(inner.as_node().outputs)  # ('doubled',)
The wrapped graph’s inputs become the node’s inputs, and its outputs become the node’s outputs.
Use map_over() to execute a nested graph once per item of a list input:
inner = Graph([double], name="inner")
mapped = inner.as_node().map_over("x")

# In outer graph, provide list of values
outer = Graph([mapped])
result = runner.run(outer, {"x": [1, 2, 3, 4, 5]})
print(result["doubled"])  # [2, 4, 6, 8, 10]
Think singular, scale with map: write logic for one item, scale to many with .map_over().
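The "write for one, map over many" idea can be shown with an ordinary function wrapper. This is a conceptual sketch only; the standalone `map_over` helper below is hypothetical and is not the library's `.map_over()` method.

```python
def double(x: int) -> int:
    return x * 2

def map_over(fn, param):
    """Wrap fn so that `param` takes a list and fn runs once per item."""
    def mapped(**kwargs):
        items = kwargs.pop(param)
        # Remaining keyword arguments are passed unchanged to every call.
        return [fn(**{param: item}, **kwargs) for item in items]
    return mapped

mapped_double = map_over(double, "x")
result = mapped_double(x=[1, 2, 3, 4, 5])
print(result)  # [2, 4, 6, 8, 10]
```

The single-item function stays easy to test on its own, and the mapping concern lives entirely in the wrapper.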
Validation
Graph validates structure at construction time:
Duplicate Node Names
@node(output_name="x")
def process(a: int) -> int:
    return a * 2

Graph([process, process])  # Raises GraphConfigError
# GraphConfigError: Duplicate node name: 'process'
Unproduced Inputs
@node(output_name="result")
def process(missing: int) -> int:
    return missing * 2

graph = Graph([process])  # Valid - 'missing' becomes a required input
print(graph.inputs.required)  # ('missing',)
Unproduced inputs become graph inputs - this enables graph composition and entrypoints.
Conflicting Outputs
Multiple nodes can produce the same output only if mutually exclusive (behind gates):
@node(output_name="result")
def path_a(x: int) -> int:
    return x

@node(output_name="result")
def path_b(x: int) -> int:
    return x * 2

Graph([path_a, path_b])  # Raises GraphConfigError
# GraphConfigError: Multiple nodes produce output 'result': ['path_a', 'path_b']
#   -> Outputs must be unique unless nodes are mutually exclusive (behind gates)
Use @route or @ifelse to create mutually exclusive branches.
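The uniqueness rule can be sketched as a grouping of outputs by producer. The code below is an illustration under assumptions, not the library's validator: `find_conflicts` is hypothetical, and the `exclusive_groups` argument stands in for whatever information gates provide about mutually exclusive nodes.

```python
from collections import defaultdict

# Hypothetical (node name, output name) pairs.
nodes = [("path_a", "result"), ("path_b", "result"), ("double", "final")]

def find_conflicts(nodes, exclusive_groups=()):
    """Return outputs with several producers that are not mutually exclusive."""
    producers = defaultdict(list)
    for name, out in nodes:
        producers[out].append(name)
    conflicts = {}
    for out, names in producers.items():
        if len(names) < 2:
            continue
        # Allowed only if one declared exclusive group contains every producer.
        if any(set(names) <= set(group) for group in exclusive_groups):
            continue
        conflicts[out] = names
    return conflicts

print(find_conflicts(nodes))                          # {'result': ['path_a', 'path_b']}
print(find_conflicts(nodes, [("path_a", "path_b")]))  # {}
```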
Visualization
Interactive HTML Visualization
graph.visualize()
# Opens interactive React Flow visualization in notebook

graph.visualize(
    depth=1,                # Expand nested graphs
    theme="dark",           # "light", "dark", or "auto"
    show_types=True,        # Show type annotations
    filepath="graph.html",  # Save to file instead
)
Mermaid Diagrams
diagram = graph.to_mermaid()
print(diagram)  # The diagram renders in a notebook; print() shows the raw Mermaid source

# Customize
diagram = graph.to_mermaid(
    direction="LR",         # Left-to-right layout
    show_types=True,        # Show type annotations
    separate_outputs=True,  # Render outputs as separate nodes
)
Debug Visualization Issues
debugger = graph.debug_viz()

info = debugger.trace_node("my_node")
print(f"Incoming: {info.incoming_edges}")
print(f"Outgoing: {info.outgoing_edges}")

issues = debugger.find_issues()
for issue in issues:
    print(issue)
Advanced Features
NetworkX Integration
Access the underlying graph structure:
import networkx as nx

# NetworkX DiGraph with node/edge attributes
G = graph.nx_graph

# Analyze structure
print(nx.is_directed_acyclic_graph(G))  # Check for cycles
print(list(nx.topological_sort(G)))     # Execution order (DAGs only)

# Flattened graph (includes nested nodes)
flat = graph.to_flat_graph()
for node_id, attrs in flat.nodes(data=True):
    print(f"{node_id}: parent={attrs['parent']}")
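The two structural questions asked above, "is it acyclic?" and "what is a valid execution order?", can also be tried out with the standard library's `graphlib` module. This sketch does not use hypergraph or NetworkX; the dependency dictionaries are hand-written stand-ins for a graph's edges.

```python
from graphlib import CycleError, TopologicalSorter

# Each key maps a node to the set of nodes it depends on.
deps = {"double": {"add"}, "add": set()}
order = list(TopologicalSorter(deps).static_order())
print(order)  # ['add', 'double']

# A two-node cycle has no topological order, so static_order() raises.
cyclic = {"a": {"b"}, "b": {"a"}}
try:
    list(TopologicalSorter(cyclic).static_order())
    has_cycle = False
except CycleError:
    has_cycle = True
print(has_cycle)  # True
```

This mirrors the "DAGs only" caveat: a topological order is defined only when the graph has no cycles.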
Producer Mapping
# Map output → producing nodes (handles duplicate outputs)
print(graph.self_producers)
# {'result': {'add'}, 'final': {'double'}}

# Outputs with single producer
print(graph.sole_producers)
# {'result': 'add', 'final': 'double'}
Controlled Execution
# Which nodes are controlled by gates?
print(graph.controlled_by)
# {'process_a': ['router'], 'process_b': ['router']}
Common Patterns
Conditional Pipeline
from hypergraph import route, END

@node(output_name="is_valid")
def validate(data: dict) -> bool:
    return "id" in data

@route(targets=["process", END])
def check(is_valid: bool) -> str:
    return "process" if is_valid else END

@node(output_name="result")
def process(data: dict) -> dict:
    return {"processed": True}

graph = Graph([validate, check, process])
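The control flow this pattern expresses can be traced by hand in plain Python. The following is a sketch of the intended behaviour only, not of how a runner executes routes: the local `END` sentinel and the `run` function are hypothetical stand-ins and do not come from hypergraph.

```python
END = object()  # hypothetical sentinel meaning "stop here"

def validate(data: dict) -> bool:
    return "id" in data

def check(is_valid: bool):
    # Return the name of the next node, or END to stop.
    return "process" if is_valid else END

def process(data: dict) -> dict:
    return {"processed": True}

def run(data: dict) -> dict:
    """Hand-written trace of validate -> check -> (process | END)."""
    values = {"data": data, "is_valid": validate(data)}
    target = check(values["is_valid"])
    if target is END:
        return values  # process never runs, so there is no "result"
    values["result"] = process(values["data"])
    return values

valid_run = run({"id": 1})
invalid_run = run({})
print("result" in valid_run)    # True
print("result" in invalid_run)  # False
```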
Multi-Stage Pipeline
# Stage 1: Fetch
@node(output_name="raw_data")
def fetch(url: str) -> str:
    return "data"

# Stage 2: Process
@node(output_name="processed")
def process(raw_data: str) -> dict:
    return {"result": raw_data}

# Stage 3: Store
@node(output_name="saved")
def store(processed: dict) -> bool:
    return True

pipeline = Graph([fetch, process, store])
RAG Pipeline
@node(output_name="embedding")
def embed(query: str) -> list[float]:
    return [0.1, 0.2, 0.3]

@node(output_name="docs")
def retrieve(embedding: list[float]) -> list[str]:
    return ["doc1", "doc2"]

@node(output_name="answer")
def generate(query: str, docs: list[str]) -> str:
    return f"Answer based on {len(docs)} docs"

rag = Graph([embed, retrieve, generate])
Notice how generate receives both query (from graph input) and docs (from retrieve) automatically!
Next Steps
Runners: Execute graphs with SyncRunner and AsyncRunner
Execution Modes: Understand sync, async, and generator execution
Routing: Control flow with @route and @ifelse