The simplest hypergraph workflow: a linear chain of functions where data flows in one direction.

When to Use

  • ETL pipelines (extract → transform → load)
  • Single-pass ML inference (preprocess → predict → postprocess)
  • Data transformations (clean → enrich → validate → save)
Straightforward data flow — functions execute in a single pass.

Basic Pattern

from hypergraph import Graph, node, SyncRunner

@node(output_name="cleaned")
def clean(raw_data: str) -> str:
    """Remove whitespace and normalize."""
    return raw_data.strip().lower()

@node(output_name="features")
def extract_features(cleaned: str) -> dict:
    """Extract features from cleaned text."""
    return {
        "length": len(cleaned),
        "word_count": len(cleaned.split()),
        "has_numbers": any(c.isdigit() for c in cleaned),
    }

@node(output_name="result")
def classify(features: dict) -> str:
    """Classify based on features."""
    if features["word_count"] > 100:
        return "long_form"
    return "short_form"

# Build the pipeline
pipeline = Graph([clean, extract_features, classify])

# Run it
runner = SyncRunner()
result = runner.run(pipeline, {"raw_data": "  Hello World  "})

print(result["cleaned"])   # "hello world"
print(result["features"])  # {"length": 11, "word_count": 2, "has_numbers": False}
print(result["result"])    # "short_form"

How Edges Are Inferred

The key mechanism: each node's output name matches the next node's parameter name.

clean(raw_data) → "cleaned"
extract_features(cleaned) → "features"
classify(features) → "result"
1. clean produces 'cleaned'. The @node(output_name="cleaned") decorator declares the output.

2. extract_features takes 'cleaned' as a parameter. The parameter name matches the output, so an edge is created automatically.

3. classify takes 'features' as a parameter. The same pattern creates another edge.
Consistent naming is all it takes — edges are inferred automatically.
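The matching itself can be pictured with the standard library. The following is a minimal stand-alone sketch of how output-name-to-parameter matching could work, using inspect.signature; it is not hypergraph's actual internals, and the (function, output_name) pairs stand in for what the @node decorator records.

```python
import inspect

# Hypothetical stand-ins for the decorated nodes: (function, output_name) pairs.
def clean(raw_data): ...
def extract_features(cleaned): ...
def classify(features): ...

nodes = [(clean, "cleaned"), (extract_features, "features"), (classify, "result")]

# Map each output name back to the function that produces it.
producers = {output_name: fn.__name__ for fn, output_name in nodes}

# An edge exists wherever a parameter name matches another node's output name.
edges = []
for fn, _ in nodes:
    for param in inspect.signature(fn).parameters:
        if param in producers:
            edges.append((producers[param], fn.__name__))

print(edges)  # [('clean', 'extract_features'), ('extract_features', 'classify')]
```

Note that clean contributes no incoming edge: its parameter raw_data matches no output name, which is exactly why raw_data becomes a required input of the whole pipeline.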

Inspecting the Graph

# What inputs does the pipeline need?
print(pipeline.inputs.required)  # ('raw_data',)

# What outputs does it produce?
print(pipeline.outputs)  # ('cleaned', 'features', 'result')

# Is it a DAG?
print(pipeline.has_cycles)  # False
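A property like has_cycles amounts to a cycle check over the inferred edges. Here is a self-contained sketch using three-color depth-first search over an adjacency dict; the dict shape is an assumption for illustration, not hypergraph's internal representation.

```python
# Detect cycles with three-color DFS: WHITE = unvisited, GRAY = on the
# current DFS path, BLACK = fully explored. Hitting a GRAY node again
# means we looped back onto our own path, i.e. a cycle.
def has_cycles(adj):
    WHITE, GRAY, BLACK = 0, 1, 2
    color = {n: WHITE for n in adj}

    def visit(n):
        color[n] = GRAY
        for m in adj.get(n, ()):
            if color[m] == GRAY or (color[m] == WHITE and visit(m)):
                return True
        color[n] = BLACK
        return False

    return any(color[n] == WHITE and visit(n) for n in adj)

# The linear pipeline from above, as an adjacency dict.
linear = {"clean": ["extract_features"], "extract_features": ["classify"], "classify": []}
print(has_cycles(linear))  # False
```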

Multiple Inputs

Nodes can have multiple inputs:
@node(output_name="embedding")
def embed(text: str) -> list[float]:
    return embedder.encode(text)

@node(output_name="docs")
def retrieve(embedding: list[float], top_k: int = 5) -> list[str]:
    return vector_db.search(embedding, k=top_k)

@node(output_name="answer")
def generate(docs: list[str], query: str) -> str:
    context = "\n".join(docs)
    return llm.generate(f"Context: {context}\n\nQuestion: {query}")

pipeline = Graph([embed, retrieve, generate])
print(pipeline.inputs.required)  # ('text', 'query')
print(pipeline.inputs.optional)  # ('top_k',)
top_k has a default value, so it’s optional. text and query are required.
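The required/optional split falls out of function signatures: a parameter with a default is satisfiable without the caller. A minimal sketch of that classification (again using inspect, not hypergraph's actual code):

```python
import inspect

# Stand-in for the retrieve node above.
def retrieve(embedding, top_k=5): ...

params = inspect.signature(retrieve).parameters

# Parameters without a default are required; the rest are optional.
required = tuple(n for n, p in params.items() if p.default is inspect.Parameter.empty)
optional = tuple(n for n, p in params.items() if p.default is not inspect.Parameter.empty)

print(required)  # ('embedding',)
print(optional)  # ('top_k',)
```

At the graph level, embedding would then be dropped from the required inputs because another node (embed) produces it, leaving only text, query, and the optional top_k.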

Parallel Branches

Independent nodes run in parallel (with AsyncRunner):
from hypergraph import Graph, node, AsyncRunner

@node(output_name="sentiment")
async def analyze_sentiment(text: str) -> float:
    return await sentiment_model.predict(text)

@node(output_name="topics")
async def extract_topics(text: str) -> list[str]:
    return await topic_model.predict(text)

@node(output_name="summary")
async def summarize(text: str, sentiment: float, topics: list[str]) -> dict:
    return {
        "text": text[:100],
        "sentiment": sentiment,
        "topics": topics,
    }

pipeline = Graph([analyze_sentiment, extract_topics, summarize])

# sentiment and topics run in parallel (both depend only on text)
# summarize waits for both to complete
runner = AsyncRunner()
result = await runner.run(pipeline, {"text": "..."}, max_concurrency=10)
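The scheduling the runner performs here is the same shape as an asyncio.gather. The sketch below hard-codes the dependency structure that hypergraph infers for you; the sleeps and return values are placeholders for real model calls.

```python
import asyncio

async def analyze_sentiment(text):
    await asyncio.sleep(0.01)  # stands in for a model call
    return 0.8

async def extract_topics(text):
    await asyncio.sleep(0.01)
    return ["greeting"]

async def run(text):
    # Both coroutines depend only on `text`, so they run concurrently.
    sentiment, topics = await asyncio.gather(
        analyze_sentiment(text), extract_topics(text)
    )
    # summarize starts only after both results are available.
    return {"text": text[:100], "sentiment": sentiment, "topics": topics}

result = asyncio.run(run("hello"))
print(result)  # {'text': 'hello', 'sentiment': 0.8, 'topics': ['greeting']}
```

With the real AsyncRunner you never write the gather yourself; the graph structure determines which nodes can overlap.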

Binding Values

Pre-fill some inputs for reuse:
# General pipeline
pipeline = Graph([embed, retrieve, generate])

# Specialized for FAQ queries
faq_pipeline = pipeline.bind(top_k=10)
print(faq_pipeline.inputs.required)  # ('text', 'query')
print(faq_pipeline.inputs.bound)     # {'top_k': 10}

# Even more specialized
support_pipeline = faq_pipeline.bind(query="How do I reset my password?")
print(support_pipeline.inputs.required)  # ('text',)
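The semantics of bind can be sketched in a few lines: a bound value is stored on a new pipeline object and removed from the required inputs, and later binds layer on top. This toy Pipeline class is purely illustrative; it is not hypergraph's API.

```python
# Minimal sketch of bind() semantics: bound values shadow required inputs,
# and each bind returns a new object rather than mutating the original.
class Pipeline:
    def __init__(self, required, bound=None):
        self.bound = dict(bound or {})
        self.required = tuple(n for n in required if n not in self.bound)

    def bind(self, **values):
        return Pipeline(self.required, {**self.bound, **values})

p = Pipeline(("text", "query", "top_k"))
faq = p.bind(top_k=10)
support = faq.bind(query="How do I reset my password?")

print(faq.required)      # ('text', 'query')
print(support.required)  # ('text',)
```

Because bind returns a new pipeline, the general pipeline stays reusable; faq and support are cheap specializations of it.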

Type Validation

Catch type errors at build time:
@node(output_name="count")
def count_words(text: str) -> int:
    return len(text.split())

@node(output_name="result")
def process(count: str) -> str:  # Bug: expects str, but count_words returns int
    return count.upper()

# Catch the error immediately
Graph([count_words, process], strict_types=True)
# GraphConfigError: Type mismatch on edge 'count_words' → 'process'
#   Output type: int
#   Input type:  str
Enable strict_types=True during development to catch type mismatches early. Disable in production if you need runtime flexibility.
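A strict-types check boils down to comparing annotations across each inferred edge. The following compares the two functions from the example directly via typing.get_type_hints; it is a sketch of the idea, not how hypergraph implements it.

```python
from typing import get_type_hints

# Stand-ins for the nodes above; the edge exists because process's
# parameter 'count' matches count_words's output name.
def count_words(text: str) -> int: ...
def process(count: str) -> str: ...

out_type = get_type_hints(count_words)["return"]  # int
in_type = get_type_hints(process)["count"]        # str

if out_type is not in_type:
    print(f"Type mismatch: output {out_type.__name__}, input {in_type.__name__}")
```

A real implementation would need subtype and generic-aware comparison (e.g. list[str] flowing into list), which is one reason such checks are usually opt-in.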

What’s Next?

  • When you need conditional logic
  • When you need composition
