
Overview

Hypergraph supports four execution modes, detected automatically from how each function is defined:
  1. Synchronous Function - Regular Python functions
  2. Asynchronous Function - async def functions
  3. Synchronous Generator - Functions that yield values
  4. Asynchronous Generator - async def functions that yield values
You write the function; Hypergraph selects the mode and handles execution.

Execution Mode Detection

Hypergraph inspects the function when the node is created and exposes the result as is_async and is_generator:
from hypergraph import node
import inspect

@node(output_name="result")
def sync_func(x: int) -> int:
    return x * 2

print(sync_func.is_async)      # False
print(sync_func.is_generator)  # False
print(inspect.iscoroutinefunction(sync_func.func))  # False

@node(output_name="result")
async def async_func(x: int) -> int:
    return x * 2

print(async_func.is_async)     # True
print(async_func.is_generator) # False
print(inspect.iscoroutinefunction(async_func.func))  # True
Detection happens at node creation time, not execution time.
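
The four modes can be told apart with the standard library alone. The sketch below is not Hypergraph's implementation; `detect_mode` and the mode labels are hypothetical names used only to illustrate the kind of check that `is_async` and `is_generator` could be built on.

```python
import inspect


def detect_mode(func) -> str:
    """Classify a callable into one of the four execution modes."""
    # Check async generators first: they use `async def`, but
    # inspect.iscoroutinefunction() returns False for them.
    if inspect.isasyncgenfunction(func):
        return "async_generator"
    if inspect.iscoroutinefunction(func):
        return "async_function"
    if inspect.isgeneratorfunction(func):
        return "sync_generator"
    return "sync_function"


def plain(x):
    return x * 2


async def coro(x):
    return x * 2


def gen(n):
    yield from range(n)


async def agen(n):
    for i in range(n):
        yield i


for f in (plain, coro, gen, agen):
    print(f.__name__, "->", detect_mode(f))
```

None of these checks call the function, which is why detection can happen at creation time without running any user code.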

Mode 1: Synchronous Functions

The default mode - regular Python functions:
from hypergraph import node, Graph, SyncRunner

@node(output_name="doubled")
def double(x: int) -> int:
    return x * 2

@node(output_name="result")
def add_one(doubled: int) -> int:
    return doubled + 1

graph = Graph([double, add_one])
runner = SyncRunner()

result = runner.run(graph, {"x": 5})
print(result["result"])  # 11

Characteristics

  • Executes synchronously (blocks until complete)
  • Returns a single value
  • Compatible with both SyncRunner and AsyncRunner
  • No special async syntax required

When to Use

  • CPU-bound computations
  • Simple transformations
  • Synchronous I/O operations
  • Default choice for most nodes

Mode 2: Asynchronous Functions

Functions defined with async def:
import asyncio
import httpx
from hypergraph import node, Graph, AsyncRunner

@node(output_name="response")
async def call_api(url: str) -> dict:
    async with httpx.AsyncClient() as client:
        resp = await client.get(url)
        return resp.json()

@node(output_name="processed")
async def process(response: dict) -> dict:
    await asyncio.sleep(0.1)  # Simulate async work
    return {"processed": True, **response}

graph = Graph([call_api, process])
runner = AsyncRunner()

result = await runner.run(graph, {"url": "https://api.example.com/data"})
print(result["processed"])

Characteristics

  • Must be executed with AsyncRunner
  • Returns a single value (via coroutine)
  • Can use await inside the function
  • Enables concurrent execution with other async nodes
Async nodes cannot be executed with SyncRunner. Attempting to do so raises IncompatibleRunnerError.
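
The restriction follows from how Python coroutines work. The plain-Python sketch below (no Hypergraph involved) shows that calling an async function without an event loop only creates a coroutine object, so a purely synchronous runner has nothing to drive it with.

```python
import asyncio
import inspect


async def double(x: int) -> int:
    return x * 2


# A plain call does not run the body; it only creates a coroutine object.
coro = double(5)
is_coroutine = inspect.iscoroutine(coro)
print(is_coroutine)  # True

# An event loop is needed to drive the coroutine to completion.
value = asyncio.run(coro)
print(value)  # 10
```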

Concurrency

Independent async nodes run concurrently within each superstep:
import asyncio

@node(output_name="result_a")
async def task_a(x: int) -> int:
    await asyncio.sleep(1)  # 1 second
    return x * 2

@node(output_name="result_b")
async def task_b(y: int) -> int:
    await asyncio.sleep(1)  # 1 second
    return y * 3

# No data dependency between task_a and task_b
graph = Graph([task_a, task_b])
runner = AsyncRunner()

import time
start = time.time()
result = await runner.run(graph, {"x": 5, "y": 10})
print(f"Took {time.time() - start:.1f}s")  # ~1s (concurrent), not 2s!
AsyncRunner executes independent nodes concurrently, so for I/O-bound workflows a superstep takes roughly as long as its slowest node rather than the sum of all its nodes.
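
For comparison, the same effect in plain asyncio comes from awaiting both coroutines together. This is a sketch of the general mechanism, assuming nothing about how AsyncRunner schedules nodes internally; `run_both` is a hypothetical helper and the sleeps are shortened to 0.2 seconds.

```python
import asyncio
import time


async def task_a(x: int) -> int:
    await asyncio.sleep(0.2)
    return x * 2


async def task_b(y: int) -> int:
    await asyncio.sleep(0.2)
    return y * 3


async def run_both() -> tuple[list[int], float]:
    start = time.perf_counter()
    # gather() schedules both coroutines on the loop and waits for both.
    results = await asyncio.gather(task_a(5), task_b(10))
    return results, time.perf_counter() - start


results, elapsed = asyncio.run(run_both())
print(results)  # [10, 30]
print(f"{elapsed:.2f}s")  # close to 0.2s rather than 0.4s
```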

When to Use

  • API calls (OpenAI, external services)
  • Database queries
  • File I/O with async libraries
  • Any I/O-bound operation
  • Workflows that benefit from concurrency

Mode 3: Synchronous Generators

Functions that yield values:
from typing import Iterator
from hypergraph import node, Graph, SyncRunner

@node(output_name="chunks")
def chunk_text(text: str, size: int = 100) -> Iterator[str]:
    """Split text into chunks."""
    for i in range(0, len(text), size):
        yield text[i:i+size]

@node(output_name="processed_chunks")
def process_chunks(chunks: Iterator[str]) -> list[str]:
    """Process each chunk."""
    return [chunk.upper() for chunk in chunks]

graph = Graph([chunk_text, process_chunks])
runner = SyncRunner()

result = runner.run(graph, {"text": "a" * 350, "size": 100})
print(len(result["processed_chunks"]))  # 4 chunks

Characteristics

  • Returns an iterator, not a single value
  • Values are produced lazily (one at a time)
  • Downstream nodes receive the iterator
  • Memory efficient for large datasets

Generator Output

The generator is passed as-is to downstream nodes:
@node(output_name="numbers")
def generate_numbers(n: int) -> Iterator[int]:
    for i in range(n):
        yield i

@node(output_name="sum")
def sum_numbers(numbers: Iterator[int]) -> int:
    # numbers is the generator object
    return sum(numbers)  # Consumes the generator
Generators are consumed during iteration - they can only be iterated once.
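
Laziness and single-use behaviour are properties of Python generators themselves. The following plain-Python sketch (the `log` list is only there for illustration) shows that the body does not run until the first value is requested, and that a second pass yields nothing.

```python
from typing import Iterator

log: list[str] = []


def generate_numbers(n: int) -> Iterator[int]:
    for i in range(n):
        log.append(f"produced {i}")
        yield i


numbers = generate_numbers(3)
log_before = list(log)        # [] : nothing has run yet

first = next(numbers)         # runs the body up to the first yield
log_after_first = list(log)   # ["produced 0"]

rest = list(numbers)          # [1, 2]
again = list(numbers)         # [] : the generator is exhausted
```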

When to Use

  • Processing large files line-by-line
  • Streaming data transformations
  • Memory-efficient batch processing
  • Producing intermediate results incrementally

Mode 4: Asynchronous Generators

Async functions that yield values:
from typing import AsyncIterator
import asyncio
from hypergraph import node, Graph, AsyncRunner

@node(output_name="tokens")
async def stream_llm(prompt: str) -> AsyncIterator[str]:
    """Stream LLM response tokens."""
    # Simulate streaming API
    words = ["Hello", "world", "from", "LLM"]
    for word in words:
        await asyncio.sleep(0.1)  # Simulate network delay
        yield word

@node(output_name="response")
async def collect_tokens(tokens: AsyncIterator[str]) -> str:
    """Collect all tokens into a response."""
    parts = []
    async for token in tokens:
        parts.append(token)
    return " ".join(parts)

graph = Graph([stream_llm, collect_tokens])
runner = AsyncRunner()

result = await runner.run(graph, {"prompt": "Say hello"})
print(result["response"])  # "Hello world from LLM"

Characteristics

  • Must be executed with AsyncRunner
  • Returns an async iterator
  • Can use await inside the function
  • Values are produced asynchronously

Real-World Example: OpenAI Streaming

import openai
from typing import AsyncIterator

@node(output_name="tokens")
async def stream_openai(prompt: str) -> AsyncIterator[str]:
    """Stream OpenAI completions."""
    client = openai.AsyncOpenAI()
    stream = await client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )
    async for chunk in stream:
        # Some chunks may carry no choices; skip them before indexing
        if chunk.choices and chunk.choices[0].delta.content:
            yield chunk.choices[0].delta.content

@node(output_name="full_response")
async def collect_response(tokens: AsyncIterator[str]) -> str:
    """Collect streaming tokens into full response."""
    parts = []
    async for token in tokens:
        parts.append(token)
        print(token, end="", flush=True)  # Stream to console
    return "".join(parts)
Async generators are perfect for streaming API responses (OpenAI, Anthropic) where you want to display tokens as they arrive.
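
To see why tokens can be displayed as they arrive, it helps to trace the order in which producer and consumer run. This is a minimal plain-asyncio sketch; `fake_stream` is a hypothetical stand-in for a real streaming client and `events` exists only to record the interleaving.

```python
import asyncio
from typing import AsyncIterator

events: list[str] = []


async def fake_stream() -> AsyncIterator[str]:
    for token in ["Hello", "world"]:
        await asyncio.sleep(0.01)  # simulated network delay
        events.append(f"yield {token}")
        yield token


async def consume() -> str:
    parts = []
    async for token in fake_stream():
        # Runs as soon as each token is yielded, before the next is produced.
        events.append(f"got {token}")
        parts.append(token)
    return " ".join(parts)


text = asyncio.run(consume())
print(text)    # Hello world
print(events)  # ['yield Hello', 'got Hello', 'yield world', 'got world']
```

Each token is handled before the next one is produced, so the consumer never waits for the full response.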

When to Use

  • Streaming LLM responses (OpenAI, Anthropic)
  • Async file reading/processing
  • Real-time data pipelines
  • Websocket message streams
  • Database cursor iteration

Mixing Execution Modes

You can mix execution modes in a single graph. This example combines a sync function, a sync generator, and an async function:
from typing import Iterator, AsyncIterator
import asyncio

@node(output_name="data")
def load_data(path: str) -> dict:
    # Mode 1: Sync function
    return {"items": [1, 2, 3, 4, 5]}

@node(output_name="chunks")
def chunk_items(data: dict) -> Iterator[list[int]]:
    # Mode 3: Sync generator
    items = data["items"]
    for i in range(0, len(items), 2):
        yield items[i:i+2]

@node(output_name="processed")
async def process_chunk(chunks: Iterator[list[int]]) -> list[int]:
    # Mode 2: Async function consuming generator
    results = []
    for chunk in chunks:
        await asyncio.sleep(0.1)  # Simulate async work
        results.extend([x * 2 for x in chunk])
    return results

graph = Graph([load_data, chunk_items, process_chunk])
runner = AsyncRunner()  # Required for async nodes

result = await runner.run(graph, {"path": "data.json"})
print(result["processed"])  # [2, 4, 6, 8, 10]
If ANY node in the graph is async, you must use AsyncRunner.

Runner Compatibility

| Execution Mode  | SyncRunner | AsyncRunner |
|-----------------|------------|-------------|
| Sync function   | ✅ Yes     | ✅ Yes      |
| Async function  | ❌ No      | ✅ Yes      |
| Sync generator  | ✅ Yes     | ✅ Yes      |
| Async generator | ❌ No      | ✅ Yes      |
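
The table reduces to one rule: any `async def` node forces AsyncRunner. The helper below sketches that rule with the standard library; `needs_async_runner` is a hypothetical name, not part of Hypergraph, and it works on plain functions rather than node objects.

```python
import inspect
from typing import Callable, Iterable


def needs_async_runner(funcs: Iterable[Callable]) -> bool:
    """Return True if any function is an async function or async generator."""
    return any(
        inspect.iscoroutinefunction(f) or inspect.isasyncgenfunction(f)
        for f in funcs
    )


def load(path):
    return {"items": [1, 2, 3]}


def chunks(data):
    yield from data["items"]


async def process(items):
    return [x * 2 for x in items]


print(needs_async_runner([load, chunks]))           # False
print(needs_async_runner([load, chunks, process]))  # True
```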

Performance Implications

Sequential vs Concurrent Execution

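With SyncRunner, two independent one-second tasks run one after the other (compare the ~1s result with AsyncRunner in the Concurrency section above):
import time
from hypergraph import node, Graph, SyncRunner

@node(output_name="result_a")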
def task_a(x: int) -> int:
    time.sleep(1)  # 1 second
    return x * 2

@node(output_name="result_b")
def task_b(y: int) -> int:
    time.sleep(1)  # 1 second
    return y * 3

graph = Graph([task_a, task_b])
runner = SyncRunner()

start = time.time()
result = runner.run(graph, {"x": 5, "y": 10})
print(f"{time.time() - start:.1f}s")  # ~2s (sequential)

Superstep Execution

Both runners execute in supersteps:
  1. Identify ready nodes - nodes with all dependencies satisfied
  2. Execute all ready nodes - sequentially (Sync) or concurrently (Async)
  3. Update state - mark nodes as complete, values as produced
  4. Repeat - until no more ready nodes

A linear chain runs one node per superstep:
@node(output_name="a")
def step1() -> int:
    return 1

@node(output_name="b")
def step2(a: int) -> int:
    return a * 2

@node(output_name="c")
def step3(b: int) -> int:
    return b * 3

graph = Graph([step1, step2, step3])
Execution:
  • Superstep 1: Execute step1 (no dependencies)
  • Superstep 2: Execute step2 (depends on a)
  • Superstep 3: Execute step3 (depends on b)
Total: 3 supersteps, sequential

A graph with independent branches runs them in the same superstep:
@node(output_name="data")
def fetch() -> dict:
    return {"value": 10}

@node(output_name="result_a")
async def process_a(data: dict) -> int:
    await asyncio.sleep(1)
    return data["value"] * 2

@node(output_name="result_b")
async def process_b(data: dict) -> int:
    await asyncio.sleep(1)
    return data["value"] * 3

@node(output_name="final")
def combine(result_a: int, result_b: int) -> int:
    return result_a + result_b

graph = Graph([fetch, process_a, process_b, combine])
Execution:
  • Superstep 1: Execute fetch
  • Superstep 2: Execute process_a and process_b concurrently (both depend only on data)
  • Superstep 3: Execute combine (depends on both results)
Total: 3 supersteps, 2nd step is concurrent
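
The four-step loop can be sketched in a few lines of plain Python. This is an illustration under stated assumptions, not Hypergraph's scheduler: `plan_supersteps` is a hypothetical function, dependencies are inferred from parameter names, and every parameter is treated as required (defaults are ignored).

```python
import inspect
from typing import Callable


def plan_supersteps(nodes: dict[str, Callable], inputs: set[str]) -> list[list[str]]:
    """Group nodes into supersteps. `nodes` maps output name -> function."""
    available = set(inputs)   # values produced so far
    pending = dict(nodes)     # nodes not yet executed
    steps: list[list[str]] = []
    while pending:
        # 1. Identify ready nodes: every parameter is already available.
        ready = [
            out for out, fn in pending.items()
            if set(inspect.signature(fn).parameters) <= available
        ]
        if not ready:
            raise ValueError(f"unsatisfied inputs for: {sorted(pending)}")
        # 2. "Execute" them (here we only record the function names).
        steps.append([pending[out].__name__ for out in ready])
        # 3. Update state, then 4. repeat.
        for out in ready:
            del pending[out]
            available.add(out)
    return steps


def fetch(): return {"value": 10}
async def process_a(data): return data["value"] * 2
async def process_b(data): return data["value"] * 3
def combine(result_a, result_b): return result_a + result_b


plan = plan_supersteps(
    {"data": fetch, "result_a": process_a, "result_b": process_b, "final": combine},
    inputs=set(),
)
print(plan)  # [['fetch'], ['process_a', 'process_b'], ['combine']]
```

A runner would execute each inner list sequentially or concurrently, depending on its type.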

Best Practices

Choose the Right Mode

1. CPU-bound work

Use sync functions - no benefit from async:
import hashlib

@node(output_name="result")
def compute_hash(data: bytes) -> str:
    return hashlib.sha256(data).hexdigest()

2. I/O-bound work

Use async functions for concurrency:
@node(output_name="data")
async def fetch_api(url: str) -> dict:
    async with httpx.AsyncClient() as client:
        return (await client.get(url)).json()

3. Large datasets

Use generators for memory efficiency:
@node(output_name="lines")
def read_large_file(path: str) -> Iterator[str]:
    with open(path) as f:
        for line in f:
            yield line.strip()

4. Streaming responses

Use async generators for real-time output:
@node(output_name="tokens")
async def stream_llm(prompt: str) -> AsyncIterator[str]:
    # openai_stream is a placeholder for your own streaming client call
    async for token in openai_stream(prompt):
        yield token

Avoid Common Pitfalls

# Bad: blocks event loop
@node(output_name="result")
async def bad(x: int) -> int:
    time.sleep(1)  # Blocks!
    return x * 2

# Good: use async sleep
@node(output_name="result")
async def good(x: int) -> int:
    await asyncio.sleep(1)
    return x * 2
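
When the blocking call has no async equivalent, one common option in plain asyncio is to move it to a worker thread with `asyncio.to_thread` (Python 3.9+). The sketch below assumes nothing about whether Hypergraph's runners already offload sync nodes; `blocking_double` and `double_off_loop` are hypothetical names.

```python
import asyncio
import time


def blocking_double(x: int) -> int:
    time.sleep(0.2)  # stands in for a blocking library call
    return x * 2


async def double_off_loop(x: int) -> int:
    # Run the blocking call in a worker thread so the event loop stays free.
    return await asyncio.to_thread(blocking_double, x)


async def main() -> tuple[list[int], float]:
    start = time.perf_counter()
    results = await asyncio.gather(double_off_loop(1), double_off_loop(2))
    return results, time.perf_counter() - start


results, elapsed = asyncio.run(main())
print(results)  # [2, 4]
print(f"{elapsed:.2f}s")  # close to 0.2s: the two calls overlap
```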

# Bad: iterates the same generator twice
@node(output_name="numbers")
def generate() -> Iterator[int]:
    for i in range(5):
        yield i

@node(output_name="result")
def use_twice(numbers: Iterator[int]) -> tuple[int, int]:
    sum1 = sum(numbers)  # Consumes generator
    sum2 = sum(numbers)  # Empty! Generator exhausted
    return (sum1, sum2)  # (10, 0) - unexpected!

# Fix: convert to list if needed multiple times
@node(output_name="result")
def use_twice_fixed(numbers: Iterator[int]) -> tuple[int, int]:
    nums = list(numbers)  # Materialize once
    return (sum(nums), sum(nums))  # (10, 10) - correct!

Next Steps

Runners

Learn more about SyncRunner and AsyncRunner

Patterns

Common patterns for real-world workflows

API Reference

Complete API documentation
