When to Use
- Data ingestion and processing
- Feature engineering pipelines
- Report generation workflows
- Any batch data transformation
The Pipeline
The pipeline runs five stages in sequence: extract → validate → transform → enrich → load. Each stage is a node whose named output feeds the next node's matching parameter.
Implementation
from hypergraph import Graph, node, SyncRunner
import json
import csv
from datetime import datetime
import httpx
@node(output_name="raw_data")
def extract(source_path: str) -> list[dict]:
"""
Extract raw data from source.
Supports JSON, CSV, or API endpoints.
"""
    if source_path.startswith("http"):
        # Check URLs first so API endpoints ending in .json or .csv
        # are not mistaken for local files.
        response = httpx.get(source_path)
        response.raise_for_status()
        return response.json()
    if source_path.endswith(".json"):
        with open(source_path) as f:
            return json.load(f)
    if source_path.endswith(".csv"):
        with open(source_path) as f:
            reader = csv.DictReader(f)
            return list(reader)
    raise ValueError(f"Unknown source format: {source_path}")
@node(output_name=("valid_records", "invalid_records"))
def validate(raw_data: list[dict], required_fields: list[str]) -> tuple[list, list]:
"""
Validate records, separating valid from invalid.
Returns (valid_records, invalid_records).
"""
valid = []
invalid = []
for record in raw_data:
missing = [f for f in required_fields if f not in record or record[f] is None]
if missing:
invalid.append({
"record": record,
"errors": [f"Missing field: {f}" for f in missing],
})
else:
valid.append(record)
return valid, invalid
Nodes can return multiple outputs using tuples. Both
valid_records and invalid_records are available to downstream nodes.
@node(output_name="transformed")
def transform(valid_records: list[dict], transformations: dict) -> list[dict]:
"""
Apply transformations to records.
Example transformations:
{
"email": str.lower,
"price": lambda x: round(float(x), 2),
"date": lambda x: datetime.fromisoformat(x).date().isoformat(),
}
"""
result = []
for record in valid_records:
transformed = record.copy()
for field, transform_fn in transformations.items():
if field in transformed:
try:
transformed[field] = transform_fn(transformed[field])
except Exception as e:
transformed[f"{field}_error"] = str(e)
result.append(transformed)
return result
@node(output_name="enriched")
def enrich(transformed: list[dict], lookup_table: dict) -> list[dict]:
"""
Enrich records with data from lookup tables.
Example lookup_table:
{
"category_names": {"A": "Electronics", "B": "Clothing"},
"region_codes": {"US": "United States", "UK": "United Kingdom"},
}
"""
result = []
for record in transformed:
enriched = record.copy()
# Add category name
if "category" in enriched and "category_names" in lookup_table:
code = enriched["category"]
enriched["category_name"] = lookup_table["category_names"].get(code, "Unknown")
# Add region name
if "region" in enriched and "region_codes" in lookup_table:
code = enriched["region"]
enriched["region_name"] = lookup_table["region_codes"].get(code, "Unknown")
# Add processing timestamp
enriched["processed_at"] = datetime.utcnow().isoformat()
result.append(enriched)
return result
@node(output_name="load_result")
def load(enriched: list[dict], destination: str) -> dict:
"""
Load processed data to destination.
Supports JSON files or database connections.
"""
if destination.endswith(".json"):
with open(destination, "w") as f:
json.dump(enriched, f, indent=2)
elif destination.startswith("postgres://"):
import psycopg2
conn = psycopg2.connect(destination)
cursor = conn.cursor()
for record in enriched:
# Simplified insert - adjust for your schema
columns = ", ".join(record.keys())
placeholders = ", ".join(["%s"] * len(record))
query = f"INSERT INTO data ({columns}) VALUES ({placeholders})"
cursor.execute(query, list(record.values()))
conn.commit()
        conn.close()
    else:
        raise ValueError(f"Unknown destination format: {destination}")
return {
"records_loaded": len(enriched),
"destination": destination,
"timestamp": datetime.utcnow().isoformat(),
}
etl_pipeline = Graph([
extract,
validate,
transform,
enrich,
load,
], name="etl")
def main():
runner = SyncRunner()
result = runner.run(etl_pipeline, {
"source_path": "data/raw_orders.json",
"required_fields": ["id", "email", "amount"],
"transformations": {
"email": str.lower,
"amount": lambda x: round(float(x), 2),
},
"lookup_table": {
"category_names": {"A": "Electronics", "B": "Clothing"},
},
"destination": "data/processed_orders.json",
})
print(f"Loaded {result['load_result']['records_loaded']} records")
print(f"Invalid records: {len(result['invalid_records'])}")
# Log invalid records for review
for invalid in result["invalid_records"]:
print(f" - {invalid['errors']}")
if __name__ == "__main__":
main()
Batch Processing
Process multiple files using runner.map():
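A sketch, assuming runner.map() mirrors runner.run() but fans out over list-valued inputs and returns one result per run; the exact signature and the file names here are illustrative:
runner = SyncRunner()
results = runner.map(etl_pipeline, {
    # Fanned-out inputs: one pipeline run per element.
    "source_path": ["data/jan.json", "data/feb.json", "data/mar.json"],
    "destination": ["out/jan.json", "out/feb.json", "out/mar.json"],
    # Shared inputs are reused by every run.
    "required_fields": ["id", "email", "amount"],
    "transformations": {"email": str.lower},
    "lookup_table": {},
}, map_mode="zip")
for run_result in results:
    print(run_result["load_result"]["records_loaded"])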
runner.map() fans out over collections. Use map_mode="zip" to pair inputs together (source_path[0] with destination[0], etc.).
Add Error Reporting
Generate detailed error reports:
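A sketch of a reporting node that consumes invalid_records and writes a summary for review; the node name and report path are illustrative:
@node(output_name="error_report")
def report_errors(invalid_records: list[dict]) -> dict:
    """Aggregate validation failures into a reviewable summary."""
    by_error: dict[str, int] = {}
    for item in invalid_records:
        for err in item["errors"]:
            by_error[err] = by_error.get(err, 0) + 1
    report = {
        "total_invalid": len(invalid_records),
        "errors_by_type": by_error,
        "generated_at": datetime.utcnow().isoformat(),
    }
    with open("data/error_report.json", "w") as f:
        json.dump(report, f, indent=2)
    return report
Adding report_errors to the Graph node list wires it to validate's invalid_records output.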
Add Conditional Processing
Use @ifelse to branch based on data quality:
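This page doesn't show the @ifelse signature, so treat the following as a sketch: it assumes the decorator names the node to run on each branch and wraps a boolean predicate. The import path and keyword names are assumptions.
from hypergraph import ifelse  # import path is an assumption

@ifelse(if_true="load", if_false="report_errors")  # keyword names are assumptions
def quality_gate(valid_records: list[dict], invalid_records: list[dict]) -> bool:
    """Proceed to load only when at least 95% of records validated."""
    total = len(valid_records) + len(invalid_records)
    return total > 0 and len(valid_records) / total >= 0.95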
Testing
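A pytest sketch that exercises the pipeline end to end through a temporary file. It assumes etl_pipeline is importable from your pipeline module; the module name etl is illustrative.
import json
from hypergraph import SyncRunner
from etl import etl_pipeline  # adjust to your module name

def test_pipeline_end_to_end(tmp_path):
    src = tmp_path / "orders.json"
    dst = tmp_path / "out.json"
    src.write_text(json.dumps([
        {"id": 1, "email": "A@X.COM", "amount": "10.00"},
        {"id": 2, "email": None, "amount": "5.00"},  # invalid: missing email
    ]))
    result = SyncRunner().run(etl_pipeline, {
        "source_path": str(src),
        "required_fields": ["id", "email", "amount"],
        "transformations": {"email": str.lower, "amount": lambda x: round(float(x), 2)},
        "lookup_table": {},
        "destination": str(dst),
    })
    assert result["load_result"]["records_loaded"] == 1
    assert len(result["invalid_records"]) == 1
    loaded = json.loads(dst.read_text())
    assert loaded[0]["email"] == "a@x.com"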
Production Patterns
Add Logging
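Wrap runner.run() with standard-library logging to capture timing and record counts. A sketch; the wrapper name is illustrative:
import logging
import time

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
log = logging.getLogger("etl")

def run_with_logging(runner, graph, inputs):
    """Run the graph, logging duration and record counts."""
    start = time.perf_counter()
    log.info("starting pipeline %s", getattr(graph, "name", "unnamed"))
    result = runner.run(graph, inputs)
    log.info(
        "finished in %.2fs: %d loaded, %d invalid",
        time.perf_counter() - start,
        result["load_result"]["records_loaded"],
        len(result["invalid_records"]),
    )
    return result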
Add Retry Logic
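Transient failures (flaky HTTP sources, brief database outages) can be retried with a plain decorator applied beneath @node. Whether @node composes this way is an assumption worth verifying; the node below is an illustrative HTTP-only variant of extract.
import time
from functools import wraps

def with_retries(max_attempts: int = 3, backoff: float = 1.0):
    """Retry transient errors with exponential backoff."""
    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return fn(*args, **kwargs)
                except (httpx.HTTPError, OSError):
                    if attempt == max_attempts:
                        raise
                    time.sleep(backoff * 2 ** (attempt - 1))
        return wrapper
    return decorator

@node(output_name="raw_data")
@with_retries(max_attempts=3)
def extract_http(source_path: str) -> list[dict]:
    """Fetch records from an HTTP source, retrying transient failures."""
    response = httpx.get(source_path)
    response.raise_for_status()
    return response.json()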
What’s Next?
- Batch Processing: Process multiple inputs with runner.map()
- RAG Pipeline: Build AI-powered pipelines
- Evaluation Harness: Test pipelines at scale
- Hierarchical Composition: Nest this pipeline in larger workflows