The simplest hypergraph workflow: a linear chain of functions where data flows in one direction.
## When to Use
- ETL pipelines (extract → transform → load)
- Single-pass ML inference (preprocess → predict → postprocess)
- Data transformations (clean → enrich → validate → save)
Straightforward data flow — functions execute in a single pass.
## Basic Pattern
```python
from hypergraph import Graph, node, SyncRunner

@node(output_name="cleaned")
def clean(raw_data: str) -> str:
    """Remove whitespace and normalize."""
    return raw_data.strip().lower()

@node(output_name="features")
def extract_features(cleaned: str) -> dict:
    """Extract features from cleaned text."""
    return {
        "length": len(cleaned),
        "word_count": len(cleaned.split()),
        "has_numbers": any(c.isdigit() for c in cleaned),
    }

@node(output_name="result")
def classify(features: dict) -> str:
    """Classify based on features."""
    if features["word_count"] > 100:
        return "long_form"
    return "short_form"

# Build the pipeline
pipeline = Graph([clean, extract_features, classify])

# Run it
runner = SyncRunner()
result = runner.run(pipeline, {"raw_data": " Hello World "})

print(result["cleaned"])   # "hello world"
print(result["features"])  # {"length": 11, "word_count": 2, "has_numbers": False}
print(result["result"])    # "short_form"
```
## How Edges Are Inferred
The magic: output names match input parameters.
```text
clean(raw_data) → "cleaned"
        ↓
extract_features(cleaned) → "features"
        ↓
classify(features) → "result"
```
1. `clean` produces `cleaned`: the `@node(output_name="cleaned")` decorator declares the output.
2. `extract_features` takes `cleaned` as a parameter: the name matches the output, so an edge is created automatically.
3. `classify` takes `features` as a parameter: the same pattern creates another edge.
Consistent naming is all it takes — edges are inferred automatically.
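The same rule can be sketched in plain Python, independent of hypergraph. This is an illustrative stand-in, not the library's internals: `infer_edges` and the function-to-output mapping below mimic what `@node(output_name=...)` declares, and an edge is drawn wherever a parameter name matches a declared output name.

```python
import inspect

def infer_edges(nodes):
    """Connect producer -> consumer wherever a declared output name
    matches a parameter name. `nodes` maps each function to its
    output name (as @node(output_name=...) would declare)."""
    producers = {out: fn.__name__ for fn, out in nodes.items()}
    edges = []
    for fn in nodes:
        for param in inspect.signature(fn).parameters:
            if param in producers:
                edges.append((producers[param], fn.__name__))
    return edges

# Signatures mirroring the pipeline above; bodies elided
def clean(raw_data): ...
def extract_features(cleaned): ...
def classify(features): ...

nodes = {clean: "cleaned", extract_features: "features", classify: "result"}
print(infer_edges(nodes))
# [('clean', 'extract_features'), ('extract_features', 'classify')]
```

Note that `raw_data` matches no declared output, so no edge is drawn for it; it becomes an external input instead.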
## Inspecting the Graph
```python
# What inputs does the pipeline need?
print(pipeline.inputs.required)  # ('raw_data',)

# What outputs does it produce?
print(pipeline.outputs)  # ('cleaned', 'features', 'result')

# Is it a DAG?
print(pipeline.has_cycles)  # False
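These properties fall out of the naming rule. As an illustrative sketch (not hypergraph's implementation): required inputs are the parameters that no node produces, and outputs are the declared output names; a cycle check is then a standard topological-sort pass over the inferred edges.

```python
import inspect

def analyze(nodes):
    """`nodes` maps each function to its declared output name."""
    produced = set(nodes.values())
    params = {p for fn in nodes for p in inspect.signature(fn).parameters}
    required = tuple(sorted(params - produced))  # must come from outside
    outputs = tuple(nodes.values())
    return required, outputs

def clean(raw_data): ...
def extract_features(cleaned): ...
def classify(features): ...

required, outputs = analyze({clean: "cleaned",
                             extract_features: "features",
                             classify: "result"})
print(required)  # ('raw_data',)
print(outputs)   # ('cleaned', 'features', 'result')
```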
## Multiple Inputs

Nodes can have multiple inputs:
```python
@node(output_name="embedding")
def embed(text: str) -> list[float]:
    return embedder.encode(text)

@node(output_name="docs")
def retrieve(embedding: list[float], top_k: int = 5) -> list[str]:
    return vector_db.search(embedding, k=top_k)

@node(output_name="answer")
def generate(docs: list[str], query: str) -> str:
    context = "\n".join(docs)
    return llm.generate(f"Context: {context}\n\nQuestion: {query}")

pipeline = Graph([embed, retrieve, generate])
print(pipeline.inputs.required)  # ('text', 'query')
print(pipeline.inputs.optional)  # ('top_k',)
```
`top_k` has a default value, so it's optional; `text` and `query` are required.
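The required/optional split follows from parameter defaults. A plain-Python sketch of that distinction (illustrative, not the library's code):

```python
import inspect

def split_params(fn):
    """Classify parameters: no default -> required, default -> optional."""
    required, optional = [], []
    for name, p in inspect.signature(fn).parameters.items():
        (optional if p.default is not inspect.Parameter.empty
         else required).append(name)
    return tuple(required), tuple(optional)

# Same signature as `retrieve` above; body elided
def retrieve(embedding, top_k=5): ...

print(split_params(retrieve))  # (('embedding',), ('top_k',))
```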
## Parallel Branches

Independent nodes run in parallel (with `AsyncRunner`):
```python
from hypergraph import AsyncRunner

@node(output_name="sentiment")
async def analyze_sentiment(text: str) -> float:
    return await sentiment_model.predict(text)

@node(output_name="topics")
async def extract_topics(text: str) -> list[str]:
    return await topic_model.predict(text)

@node(output_name="summary")
async def summarize(text: str, sentiment: float, topics: list) -> dict:
    return {
        "text": text[:100],
        "sentiment": sentiment,
        "topics": topics,
    }

pipeline = Graph([analyze_sentiment, extract_topics, summarize])

# sentiment and topics run in parallel (both depend only on text)
# summarize waits for both to complete
runner = AsyncRunner()
result = await runner.run(pipeline, {"text": "..."}, max_concurrency=10)
```
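The scheduling idea can be illustrated with plain `asyncio` (a sketch of the fan-out/fan-in shape, not the runner's actual implementation; the stub coroutines and their return values are made up):

```python
import asyncio

async def analyze_sentiment(text):
    await asyncio.sleep(0.01)  # stand-in for a model call
    return 0.8

async def extract_topics(text):
    await asyncio.sleep(0.01)  # stand-in for a model call
    return ["greeting"]

async def run(text):
    # Both branches depend only on `text`, so they run concurrently
    sentiment, topics = await asyncio.gather(
        analyze_sentiment(text), extract_topics(text))
    # The join step runs once both branches have resolved
    return {"text": text[:100], "sentiment": sentiment, "topics": topics}

result = asyncio.run(run("hello"))
print(result)  # {'text': 'hello', 'sentiment': 0.8, 'topics': ['greeting']}
```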
## Binding Values
Pre-fill some inputs for reuse:
```python
# General pipeline
pipeline = Graph([embed, retrieve, generate])

# Specialized for FAQ queries
faq_pipeline = pipeline.bind(top_k=10)
print(faq_pipeline.inputs.required)  # ('text', 'query')
print(faq_pipeline.inputs.bound)     # {'top_k': 10}

# Even more specialized
support_pipeline = faq_pipeline.bind(query="How do I reset my password?")
print(support_pipeline.inputs.required)  # ('text',)
```
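Conceptually, `bind` pre-fills part of the input dict, which shrinks the required set. A minimal stand-in class (illustrative only; it tracks names and bound values and ignores the optional/default distinction):

```python
class BoundPipeline:
    """Toy model of binding: required names minus bound values."""
    def __init__(self, required, bound=None):
        self.bound = dict(bound or {})
        self.required = tuple(n for n in required if n not in self.bound)

    def bind(self, **values):
        # Returns a new object; the original stays unchanged
        return BoundPipeline(self.required, {**self.bound, **values})

p = BoundPipeline(("text", "query", "top_k"))
faq = p.bind(top_k=10)
print(faq.required)      # ('text', 'query')
support = faq.bind(query="How do I reset my password?")
print(support.required)  # ('text',)
```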
## Type Validation
Catch type errors at build time:
```python
@node(output_name="count")
def count_words(text: str) -> int:
    return len(text.split())

@node(output_name="result")
def process(count: str) -> str:  # Bug: expects str, but count_words returns int
    return count.upper()

# Catch the error immediately
Graph([count_words, process], strict_types=True)
# GraphConfigError: Type mismatch on edge 'count_words' → 'process'
#   Output type: int
#   Input type: str
```
Enable `strict_types=True` during development to catch type mismatches early; disable it in production if you need runtime flexibility.
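The check itself amounts to comparing the producer's return annotation with the consumer's parameter annotation along each edge. A sketch of that idea using `typing.get_type_hints` (not hypergraph's validator; `check_edge` is a hypothetical helper):

```python
from typing import get_type_hints

def check_edge(producer, consumer, param):
    """Raise if the producer's return annotation disagrees with the
    consumer parameter's annotation. Unannotated ends are skipped."""
    out_type = get_type_hints(producer).get("return")
    in_type = get_type_hints(consumer).get(param)
    if out_type is not None and in_type is not None and out_type != in_type:
        raise TypeError(
            f"Type mismatch on edge '{producer.__name__}' -> "
            f"'{consumer.__name__}': output {out_type.__name__}, "
            f"input {in_type.__name__}")

def count_words(text: str) -> int:
    return len(text.split())

def process(count: str) -> str:
    return count.upper()

try:
    check_edge(count_words, process, "count")
except TypeError as e:
    print(e)  # Type mismatch on edge 'count_words' -> 'process': output int, input str
```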
## What's Next?

- When you need conditional logic:
- When you need composition: