## Overview

Hypergraph supports four execution modes, auto-detected from function signatures:

1. **Synchronous function** - regular Python functions
2. **Asynchronous function** - `async def` functions
3. **Synchronous generator** - functions that `yield` values
4. **Asynchronous generator** - `async def` functions that `yield` values

The mode is detected automatically - you just write the function, and Hypergraph handles execution.
## Execution Mode Detection

Hypergraph inspects the function signature at node creation:

```python
import inspect

from hypergraph import node

@node(output_name="result")
def sync_func(x: int) -> int:
    return x * 2

print(sync_func.is_async)      # False
print(sync_func.is_generator)  # False
print(inspect.iscoroutinefunction(sync_func.func))  # False

@node(output_name="result")
async def async_func(x: int) -> int:
    return x * 2

print(async_func.is_async)      # True
print(async_func.is_generator)  # False
print(inspect.iscoroutinefunction(async_func.func))  # True
```

Detection happens at node creation time, not execution time.
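This kind of classification can be reproduced with the standard-library `inspect` module. A sketch of how all four modes are distinguishable in plain Python (a simplified model, not Hypergraph's actual implementation):

```python
import inspect

def detect_mode(fn) -> str:
    """Classify a callable into one of the four execution modes."""
    # Check async generator first: it is NOT a coroutine function.
    if inspect.isasyncgenfunction(fn):
        return "async generator"
    if inspect.iscoroutinefunction(fn):
        return "async function"
    if inspect.isgeneratorfunction(fn):
        return "sync generator"
    return "sync function"

def sync_fn(x):
    return x * 2

async def async_fn(x):
    return x * 2

def gen_fn(n):
    yield from range(n)

async def agen_fn(n):
    for i in range(n):
        yield i

print(detect_mode(sync_fn))   # sync function
print(detect_mode(async_fn))  # async function
print(detect_mode(gen_fn))    # sync generator
print(detect_mode(agen_fn))   # async generator
```

Note that the checks must test for an async generator before a coroutine, since `inspect.iscoroutinefunction` is `False` for async generator functions.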
## Mode 1: Synchronous Functions

The default mode - regular Python functions:

```python
from hypergraph import node, Graph, SyncRunner

@node(output_name="doubled")
def double(x: int) -> int:
    return x * 2

@node(output_name="result")
def add_one(doubled: int) -> int:
    return doubled + 1

graph = Graph([double, add_one])
runner = SyncRunner()
result = runner.run(graph, {"x": 5})
print(result["result"])  # 11
```
### Characteristics

- Executes synchronously (blocks until complete)
- Returns a single value
- Compatible with both `SyncRunner` and `AsyncRunner`
- No special async syntax required

### When to Use

- CPU-bound computations
- Simple transformations
- Synchronous I/O operations
- Default choice for most nodes
## Mode 2: Asynchronous Functions

Functions defined with `async def`:

```python
import asyncio
import httpx
from hypergraph import node, Graph, AsyncRunner

@node(output_name="response")
async def call_api(url: str) -> dict:
    async with httpx.AsyncClient() as client:
        resp = await client.get(url)
        return resp.json()

@node(output_name="processed")
async def process(response: dict) -> dict:
    await asyncio.sleep(0.1)  # Simulate async work
    return {"processed": True, **response}

graph = Graph([call_api, process])
runner = AsyncRunner()
result = await runner.run(graph, {"url": "https://api.example.com/data"})
print(result["processed"])
```
### Characteristics

- Must be executed with `AsyncRunner`
- Returns a single value (via coroutine)
- Can use `await` inside the function
- Enables concurrent execution with other async nodes

Async nodes cannot be executed with `SyncRunner`. Attempting to do so raises `IncompatibleRunnerError`.
### Concurrency

Independent async nodes run concurrently within each superstep:

```python
import asyncio
import time

@node(output_name="result_a")
async def task_a(x: int) -> int:
    await asyncio.sleep(1)  # 1 second
    return x * 2

@node(output_name="result_b")
async def task_b(y: int) -> int:
    await asyncio.sleep(1)  # 1 second
    return y * 3

# No data dependency between task_a and task_b
graph = Graph([task_a, task_b])
runner = AsyncRunner()

start = time.time()
result = await runner.run(graph, {"x": 5, "y": 10})
print(f"Took {time.time() - start:.1f}s")  # ~1s (concurrent), not 2s!
```

`AsyncRunner` executes independent nodes concurrently, dramatically reducing total execution time for I/O-bound workflows.
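The same effect can be seen with plain asyncio: the concurrency within a superstep is analogous to awaiting independent coroutines with `asyncio.gather`. A self-contained illustration (no Hypergraph involved):

```python
import asyncio
import time

async def task_a(x: int) -> int:
    await asyncio.sleep(0.1)
    return x * 2

async def task_b(y: int) -> int:
    await asyncio.sleep(0.1)
    return y * 3

async def main() -> list[int]:
    # gather schedules both coroutines on the event loop at once;
    # total wall time is the longest single sleep, not the sum.
    return await asyncio.gather(task_a(5), task_b(10))

start = time.time()
results = asyncio.run(main())
elapsed = time.time() - start
print(results)  # [10, 30]
print(f"{elapsed:.2f}s")  # ~0.10s, not 0.20s
```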
### When to Use

- API calls (OpenAI, external services)
- Database queries
- File I/O with async libraries
- Any I/O-bound operation
- Workflows that benefit from concurrency
## Mode 3: Synchronous Generators

Functions that `yield` values:

```python
from typing import Iterator
from hypergraph import node, Graph, SyncRunner

@node(output_name="chunks")
def chunk_text(text: str, size: int = 100) -> Iterator[str]:
    """Split text into chunks."""
    for i in range(0, len(text), size):
        yield text[i:i + size]

@node(output_name="processed_chunks")
def process_chunks(chunks: Iterator[str]) -> list[str]:
    """Process each chunk."""
    return [chunk.upper() for chunk in chunks]

graph = Graph([chunk_text, process_chunks])
runner = SyncRunner()
result = runner.run(graph, {"text": "a" * 350, "size": 100})
print(len(result["processed_chunks"]))  # 4 chunks
```
### Characteristics

- Returns an iterator, not a single value
- Values are produced lazily (one at a time)
- Downstream nodes receive the iterator
- Memory efficient for large datasets
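The laziness is easy to observe in plain Python: a generator computes values only when the consumer pulls them. A minimal demonstration (independent of Hypergraph):

```python
from typing import Iterator

produced: list[int] = []

def numbers(n: int) -> Iterator[int]:
    for i in range(n):
        produced.append(i)  # side effect: records when a value is actually computed
        yield i

gen = numbers(5)
print(produced)  # [] - creating the generator computes nothing

first = next(gen)
print(first, produced)  # 0 [0] - exactly one value computed, on demand
```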
### Generator Output

The generator is passed as-is to downstream nodes:

```python
@node(output_name="numbers")
def generate_numbers(n: int) -> Iterator[int]:
    for i in range(n):
        yield i

@node(output_name="sum")
def sum_numbers(numbers: Iterator[int]) -> int:
    # numbers is the generator object
    return sum(numbers)  # Consumes the generator
```

Generators are consumed during iteration - they can only be iterated once.
### When to Use

- Processing large files line-by-line
- Streaming data transformations
- Memory-efficient batch processing
- Producing intermediate results incrementally
## Mode 4: Asynchronous Generators

Async functions that `yield` values:

```python
import asyncio
from typing import AsyncIterator
from hypergraph import node, Graph, AsyncRunner

@node(output_name="tokens")
async def stream_llm(prompt: str) -> AsyncIterator[str]:
    """Stream LLM response tokens."""
    # Simulate streaming API
    words = ["Hello", "world", "from", "LLM"]
    for word in words:
        await asyncio.sleep(0.1)  # Simulate network delay
        yield word

@node(output_name="response")
async def collect_tokens(tokens: AsyncIterator[str]) -> str:
    """Collect all tokens into a response."""
    parts = []
    async for token in tokens:
        parts.append(token)
    return " ".join(parts)

graph = Graph([stream_llm, collect_tokens])
runner = AsyncRunner()
result = await runner.run(graph, {"prompt": "Say hello"})
print(result["response"])  # "Hello world from LLM"
```
### Characteristics

- Must be executed with `AsyncRunner`
- Returns an async iterator
- Can use `await` inside the function
- Values are produced asynchronously
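Stripped of the Hypergraph wrapper, this mode is just Python's native async-generator protocol: each value arrives after an `await`, and the consumer drives iteration with `async for`. A minimal plain-asyncio sketch:

```python
import asyncio
from typing import AsyncIterator

async def ticker(n: int) -> AsyncIterator[int]:
    for i in range(n):
        await asyncio.sleep(0.01)  # other tasks may run during this wait
        yield i

async def consume() -> list[int]:
    # async for resumes the generator each time, awaiting between values
    return [value async for value in ticker(3)]

values = asyncio.run(consume())
print(values)  # [0, 1, 2]
```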
### Real-World Example: OpenAI Streaming

```python
from typing import AsyncIterator
import openai

@node(output_name="tokens")
async def stream_openai(prompt: str) -> AsyncIterator[str]:
    """Stream OpenAI completions."""
    client = openai.AsyncOpenAI()
    stream = await client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )
    async for chunk in stream:
        if chunk.choices[0].delta.content:
            yield chunk.choices[0].delta.content

@node(output_name="full_response")
async def collect_response(tokens: AsyncIterator[str]) -> str:
    """Collect streaming tokens into the full response."""
    parts = []
    async for token in tokens:
        parts.append(token)
        print(token, end="", flush=True)  # Stream to console
    return "".join(parts)
```

Async generators are perfect for streaming API responses (OpenAI, Anthropic) where you want to display tokens as they arrive.
### When to Use

- Streaming LLM responses (OpenAI, Anthropic)
- Async file reading/processing
- Real-time data pipelines
- Websocket message streams
- Database cursor iteration
## Mixing Execution Modes

You can mix all four modes in a single graph:

```python
import asyncio
from typing import Iterator

@node(output_name="data")
def load_data(path: str) -> dict:
    # Mode 1: Sync function
    return {"items": [1, 2, 3, 4, 5]}

@node(output_name="chunks")
def chunk_items(data: dict) -> Iterator[list[int]]:
    # Mode 3: Sync generator
    items = data["items"]
    for i in range(0, len(items), 2):
        yield items[i:i + 2]

@node(output_name="processed")
async def process_chunk(chunks: Iterator[list[int]]) -> list[int]:
    # Mode 2: Async function consuming a generator
    results = []
    for chunk in chunks:
        await asyncio.sleep(0.1)  # Simulate async work
        results.extend([x * 2 for x in chunk])
    return results

graph = Graph([load_data, chunk_items, process_chunk])
runner = AsyncRunner()  # Required for async nodes
result = await runner.run(graph, {"path": "data.json"})
print(result["processed"])  # [2, 4, 6, 8, 10]
```

If ANY node in the graph is async, you must use `AsyncRunner`.
## Runner Compatibility

| Execution Mode  | SyncRunner | AsyncRunner |
|-----------------|------------|-------------|
| Sync function   | ✅ Yes     | ✅ Yes      |
| Async function  | ❌ No      | ✅ Yes      |
| Sync generator  | ✅ Yes     | ✅ Yes      |
| Async generator | ❌ No      | ✅ Yes      |
## Sequential vs Concurrent Execution

### Sequential (SyncRunner)

```python
import time

@node(output_name="result_a")
def task_a(x: int) -> int:
    time.sleep(1)  # 1 second
    return x * 2

@node(output_name="result_b")
def task_b(y: int) -> int:
    time.sleep(1)  # 1 second
    return y * 3

graph = Graph([task_a, task_b])
runner = SyncRunner()

start = time.time()
result = runner.run(graph, {"x": 5, "y": 10})
print(f"{time.time() - start:.1f}s")  # ~2s (sequential)
```

### Concurrent (AsyncRunner)

With `async def` versions of the same tasks and `AsyncRunner`, the two one-second waits overlap and the graph completes in ~1s, as shown in the Concurrency section above.
## Superstep Execution

Both runners execute in supersteps:

1. **Identify ready nodes** - nodes with all dependencies satisfied
2. **Execute all ready nodes** - sequentially (Sync) or concurrently (Async)
3. **Update state** - mark nodes as complete, values as produced
4. **Repeat** - until no more ready nodes
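The loop above can be sketched in plain Python. This is a simplified model of superstep scheduling, not Hypergraph's implementation; nodes are `(dependencies, function)` pairs keyed by the name of the value they produce:

```python
from typing import Callable

def run_supersteps(
    nodes: dict[str, tuple[list[str], Callable]],
    inputs: dict,
) -> tuple[dict, list[list[str]]]:
    """Run nodes in supersteps; return produced values and the schedule."""
    values = dict(inputs)
    pending = dict(nodes)
    schedule: list[list[str]] = []
    while pending:
        # 1. Identify ready nodes: all dependencies already produced.
        ready = [name for name, (deps, _) in pending.items()
                 if all(d in values for d in deps)]
        if not ready:
            raise RuntimeError("cycle or unsatisfiable dependency")
        # 2. Execute all ready nodes (sequentially here; an async
        #    runner would await them concurrently instead).
        for name in ready:
            deps, fn = pending.pop(name)
            values[name] = fn(*(values[d] for d in deps))
        # 3. Update state and record the superstep, then repeat.
        schedule.append(ready)
    return values, schedule

# A 3-stage pipeline: a -> b -> c
nodes = {
    "a": ([], lambda: 1),
    "b": (["a"], lambda a: a * 2),
    "c": (["b"], lambda b: b * 3),
}
values, schedule = run_supersteps(nodes, {})
print(values["c"])  # 6
print(schedule)     # [['a'], ['b'], ['c']]
```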
### Example: 3-Stage Pipeline

```python
@node(output_name="a")
def step1() -> int:
    return 1

@node(output_name="b")
def step2(a: int) -> int:
    return a * 2

@node(output_name="c")
def step3(b: int) -> int:
    return b * 3

graph = Graph([step1, step2, step3])
```

Execution:

- **Superstep 1**: Execute `step1` (no dependencies)
- **Superstep 2**: Execute `step2` (depends on `a`)
- **Superstep 3**: Execute `step3` (depends on `b`)

Total: 3 supersteps, sequential
A graph with independent branches still takes 3 supersteps, but the middle step runs concurrently under `AsyncRunner`:

```python
import asyncio

@node(output_name="data")
def fetch() -> dict:
    return {"value": 10}

@node(output_name="result_a")
async def process_a(data: dict) -> int:
    await asyncio.sleep(1)
    return data["value"] * 2

@node(output_name="result_b")
async def process_b(data: dict) -> int:
    await asyncio.sleep(1)
    return data["value"] * 3

@node(output_name="final")
def combine(result_a: int, result_b: int) -> int:
    return result_a + result_b

graph = Graph([fetch, process_a, process_b, combine])
```

Execution:

- **Superstep 1**: Execute `fetch`
- **Superstep 2**: Execute `process_a` and `process_b` concurrently (both depend only on `data`)
- **Superstep 3**: Execute `combine` (depends on both results)

Total: 3 supersteps, 2nd step is concurrent
## Best Practices

### Choose the Right Mode

**CPU-bound work** - use sync functions; async brings no benefit here:

```python
import hashlib

@node(output_name="result")
def compute_hash(data: bytes) -> str:
    return hashlib.sha256(data).hexdigest()
```

**I/O-bound work** - use async functions for concurrency:

```python
import httpx

@node(output_name="data")
async def fetch_api(url: str) -> dict:
    async with httpx.AsyncClient() as client:
        return (await client.get(url)).json()
```

**Large datasets** - use generators for memory efficiency:

```python
from typing import Iterator

@node(output_name="lines")
def read_large_file(path: str) -> Iterator[str]:
    with open(path) as f:
        for line in f:
            yield line.strip()
```

**Streaming responses** - use async generators for real-time output:

```python
from typing import AsyncIterator

@node(output_name="tokens")
async def stream_llm(prompt: str) -> AsyncIterator[str]:
    async for token in openai_stream(prompt):
        yield token
```
Avoid Common Pitfalls
# Bad: blocks event loop
@node ( output_name = "result" )
async def bad ( x : int ) -> int :
time.sleep( 1 ) # Blocks!
return x * 2
# Good: use async sleep
@node ( output_name = "result" )
async def good ( x : int ) -> int :
await asyncio.sleep( 1 )
return x * 2
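When the blocking call cannot be rewritten (a sync-only client library, say), the standard escape hatch is `asyncio.to_thread` (Python 3.9+), which runs it in a worker thread so the event loop stays responsive. A self-contained sketch:

```python
import asyncio
import time

def blocking_io() -> str:
    time.sleep(0.1)  # stands in for a sync-only call we cannot rewrite
    return "done"

async def main() -> list[str]:
    # Both blocking calls run in worker threads; the event loop is
    # free while they sleep, so the two 0.1s waits overlap.
    return await asyncio.gather(
        asyncio.to_thread(blocking_io),
        asyncio.to_thread(blocking_io),
    )

start = time.time()
results = asyncio.run(main())
elapsed = time.time() - start
print(results)            # ['done', 'done']
print(f"{elapsed:.2f}s")  # ~0.10s, not 0.20s
```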
**Exhausting a generator** - generators can only be iterated once:

```python
from typing import Iterator

@node(output_name="numbers")
def generate() -> Iterator[int]:
    for i in range(5):
        yield i

@node(output_name="result")
def use_twice(numbers: Iterator[int]) -> tuple[int, int]:
    sum1 = sum(numbers)  # Consumes the generator
    sum2 = sum(numbers)  # Empty! Generator exhausted
    return (sum1, sum2)  # (10, 0) - unexpected!

# Fix: convert to a list if the values are needed more than once
@node(output_name="result")
def use_twice_fixed(numbers: Iterator[int]) -> tuple[int, int]:
    nums = list(numbers)  # Materialize once
    return (sum(nums), sum(nums))  # (10, 10) - correct!
```
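If materializing the whole stream up front is too costly, the standard library's `itertools.tee` is another option: it yields independent iterators over the same source (buffering only however far one consumer runs ahead of the other). A plain-Python sketch:

```python
import itertools
from typing import Iterator

def generate(n: int) -> Iterator[int]:
    yield from range(n)

# tee splits one generator into two independent iterators
first, second = itertools.tee(generate(5))
print(sum(first), sum(second))  # 10 10 - each iterator sees all the values
```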
## Next Steps

- **Runners** - learn more about `SyncRunner` and `AsyncRunner`
- **Patterns** - common patterns for real-world workflows
- **API Reference** - complete API documentation