Skip to main content
A classic ETL pipeline — no LLMs, just pure data processing. It shows that hypergraph works for traditional, non-AI workflows too.

When to Use

  • Data ingestion and processing
  • Feature engineering pipelines
  • Report generation workflows
  • Any batch data transformation

The Pipeline

extract → validate → transform → enrich → load
This is a pure linear DAG with data validation and error handling built in.

Implementation

1
Define the extraction node
2
Extract data from various sources:
3
import csv
import json
from datetime import datetime, timezone

import httpx

from hypergraph import Graph, node, SyncRunner

@node(output_name="raw_data")
def extract(source_path: str) -> list[dict]:
    """
    Extract raw data from a source.

    Supports JSON files, CSV files, or HTTP(S) API endpoints.

    Args:
        source_path: File path ending in .json/.csv, or an http(s) URL.

    Returns:
        A list of record dicts.

    Raises:
        ValueError: If the source format is not recognized.
        httpx.HTTPStatusError: If an API endpoint returns an error status.
    """
    # Check the URL case first so an http path is never mistaken for a file.
    if source_path.startswith("http"):
        response = httpx.get(source_path)
        # Fail loudly on 4xx/5xx instead of trying to parse an error body
        # (consistent with extract_with_retry below).
        response.raise_for_status()
        return response.json()

    if source_path.endswith(".json"):
        with open(source_path) as f:
            return json.load(f)

    if source_path.endswith(".csv"):
        # newline="" is the documented requirement when handing a file
        # object to the csv module.
        with open(source_path, newline="") as f:
            return list(csv.DictReader(f))

    raise ValueError(f"Unknown source format: {source_path}")
4
Define the validation node
5
Validate records and separate valid from invalid:
6
@node(output_name=("valid_records", "invalid_records"))
def validate(raw_data: list[dict], required_fields: list[str]) -> tuple[list, list]:
    """
    Split records into valid and invalid groups.

    A record is valid when every required field is present and not None.
    Each invalid entry carries the offending record plus a list of error
    strings. Returns a (valid_records, invalid_records) tuple.
    """
    valid: list = []
    invalid: list = []

    for item in raw_data:
        absent = [
            field
            for field in required_fields
            if field not in item or item[field] is None
        ]

        if not absent:
            valid.append(item)
            continue

        invalid.append({
            "record": item,
            "errors": [f"Missing field: {field}" for field in absent],
        })

    return valid, invalid
7
Nodes can return multiple outputs using tuples. Both valid_records and invalid_records are available to downstream nodes.
8
Define the transformation node
9
Apply transformations to valid records:
10
@node(output_name="transformed")
def transform(valid_records: list[dict], transformations: dict) -> list[dict]:
    """
    Apply per-field transformation functions to each record.

    transformations maps a field name to a callable, e.g.:
    {
        "email": str.lower,
        "price": lambda x: round(float(x), 2),
        "date": lambda x: datetime.fromisoformat(x).date().isoformat(),
    }

    A failing transformation never drops a record: the original value is
    kept and the error text is stored under "<field>_error".
    """
    output = []

    for original in valid_records:
        record = original.copy()

        for key, fn in transformations.items():
            if key not in record:
                continue
            try:
                record[key] = fn(record[key])
            except Exception as exc:
                record[f"{key}_error"] = str(exc)

        output.append(record)

    return output
11
Define the enrichment node
12
Enrich records with additional data:
13
@node(output_name="enriched")
def enrich(transformed: list[dict], lookup_table: dict) -> list[dict]:
    """
    Enrich records with data from lookup tables and stamp processing time.

    Example lookup_table:
    {
        "category_names": {"A": "Electronics", "B": "Clothing"},
        "region_codes": {"US": "United States", "UK": "United Kingdom"},
    }

    Unknown codes map to "Unknown" rather than failing the record.
    """
    result = []

    for record in transformed:
        enriched = record.copy()

        # Add category name, if a mapping is configured.
        if "category" in enriched and "category_names" in lookup_table:
            code = enriched["category"]
            enriched["category_name"] = lookup_table["category_names"].get(code, "Unknown")

        # Add region name, if a mapping is configured.
        if "region" in enriched and "region_codes" in lookup_table:
            code = enriched["region"]
            enriched["region_name"] = lookup_table["region_codes"].get(code, "Unknown")

        # Add processing timestamp. datetime.utcnow() is deprecated since
        # Python 3.12 and returns a naive value; use an aware UTC timestamp.
        enriched["processed_at"] = datetime.now(timezone.utc).isoformat()

        result.append(enriched)

    return result
14
Define the loading node
15
Load processed data to destination:
16
@node(output_name="load_result")
def load(enriched: list[dict], destination: str) -> dict:
    """
    Load processed data to a destination.

    Supports JSON file paths or postgres:// connection strings.

    Returns:
        A summary dict with the record count, the destination, and an
        aware UTC timestamp.
    """
    if destination.endswith(".json"):
        with open(destination, "w") as f:
            json.dump(enriched, f, indent=2)

    elif destination.startswith("postgres://"):
        import psycopg2

        conn = psycopg2.connect(destination)
        try:
            with conn.cursor() as cursor:
                for record in enriched:
                    # Simplified insert - adjust for your schema.
                    # NOTE(review): column names are interpolated into the
                    # SQL string; only values are parameterized. This is safe
                    # only if record keys are trusted/validated upstream.
                    columns = ", ".join(record.keys())
                    placeholders = ", ".join(["%s"] * len(record))
                    query = f"INSERT INTO data ({columns}) VALUES ({placeholders})"
                    cursor.execute(query, list(record.values()))
            conn.commit()
        finally:
            # Always release the connection, even if an insert fails.
            conn.close()

    return {
        "records_loaded": len(enriched),
        "destination": destination,
        # Aware UTC timestamp (datetime.utcnow() is deprecated).
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
17
Compose and run the pipeline
18
# Compose the linear ETL DAG: extract -> validate -> transform -> enrich -> load.
# hypergraph connects nodes by matching parameter names to upstream output
# names (raw_data, valid_records, transformed, enriched), so the list order
# here is for readability.
etl_pipeline = Graph([
    extract,
    validate,
    transform,
    enrich,
    load,
], name="etl")


def main():
    """Run the ETL pipeline once over the sample orders file."""
    runner = SyncRunner()

    inputs = {
        "source_path": "data/raw_orders.json",
        "required_fields": ["id", "email", "amount"],
        "transformations": {
            "email": str.lower,
            "amount": lambda x: round(float(x), 2),
        },
        "lookup_table": {
            "category_names": {"A": "Electronics", "B": "Clothing"},
        },
        "destination": "data/processed_orders.json",
    }
    result = runner.run(etl_pipeline, inputs)

    print(f"Loaded {result['load_result']['records_loaded']} records")
    print(f"Invalid records: {len(result['invalid_records'])}")

    # Log invalid records for review
    for bad in result["invalid_records"]:
        print(f"  - {bad['errors']}")


if __name__ == "__main__":
    main()

Batch Processing

Process multiple files using runner.map():
def process_all_files():
    """Run the ETL pipeline once per monthly file via runner.map()."""
    runner = SyncRunner()

    files = ["orders_jan.json", "orders_feb.json", "orders_mar.json"]
    destinations = [f"processed_{f}" for f in files]

    # zip mode pairs source_path[i] with destination[i] for each run.
    results = runner.map(
        etl_pipeline,
        {
            "source_path": files,
            "required_fields": ["id", "email", "amount"],
            "transformations": {"email": str.lower},
            "lookup_table": {},
            "destination": destinations,
        },
        map_over=["source_path", "destination"],
        map_mode="zip",
    )

    total = sum(run["load_result"]["records_loaded"] for run in results)
    print(f"Total records processed: {total}")
runner.map() fans out over collections. Use map_mode="zip" to pair inputs together (source_path[0] with destination[0], etc.).

Add Error Reporting

Generate detailed error reports:
@node(output_name="error_report")
def generate_error_report(invalid_records: list[dict], report_path: str) -> str:
    """Write a markdown report of validation failures; return its path."""
    sections = ["# Data Validation Errors", ""]

    for idx, entry in enumerate(invalid_records, start=1):
        sections.extend([
            f"## Record {idx}",
            f"Errors: {', '.join(entry['errors'])}",
            f"Data: {json.dumps(entry['record'], indent=2)}",
            "",
        ])

    with open(report_path, "w") as f:
        f.write("\n".join(sections))

    return report_path


# Same pipeline plus error reporting: generate_error_report consumes the
# invalid_records output of validate, so it runs alongside the main path.
etl_with_reporting = Graph([
    extract,
    validate,
    transform,
    enrich,
    load,
    generate_error_report,
])

Add Conditional Processing

Use @ifelse to branch based on data quality:
from hypergraph import ifelse

@ifelse(output_name="quality_gate")
def check_quality(valid_records: list, invalid_records: list) -> bool:
    """Gate downstream processing on the batch error rate (below 5%)."""
    total = len(valid_records) + len(invalid_records)
    if total == 0:
        # An empty batch has no errors, so the gate stays open.
        return True
    return len(invalid_records) / total < 0.05

@node(output_name="alert")
def send_alert(invalid_records: list) -> str:
    """Build and emit a data-quality alert; return the alert text."""
    count = len(invalid_records)
    message = f"Data quality alert: {count} invalid records"
    # Send to monitoring system
    print(message)
    return message

# Branch on data quality: when the gate passes, the nested sub-graph runs
# transform/enrich/load; otherwise only the alert node executes.
quality_aware_pipeline = Graph([
    extract,
    validate,
    check_quality.branch(
        if_true=Graph([transform, enrich, load]),
        if_false=Graph([send_alert]),
    ),
])

Testing

def test_validate():
    """validate() keeps only records with all required fields set."""
    records = [
        {"id": 1, "email": "[email protected]", "amount": 10},
        {"id": 2, "email": None, "amount": 20},  # Invalid: email is None
        {"id": 3, "amount": 30},  # Invalid: email missing entirely
    ]

    valid, invalid = validate.func(records, ["id", "email", "amount"])

    assert (len(valid), len(invalid)) == (1, 2)
    assert valid[0]["id"] == 1

def test_transform():
    """transform() applies each configured per-field function."""
    rows = [{"email": "[email protected]", "price": "99.999"}]
    fns = {"email": str.lower, "price": lambda x: round(float(x), 2)}

    (row,) = transform.func(rows, fns)

    assert row["email"] == "[email protected]"
    assert row["price"] == 100.0

Production Patterns

Add Logging

import logging

logger = logging.getLogger(__name__)

@node(output_name="log_entry")
def log_progress(load_result: dict, invalid_records: list) -> dict:
    """Log pipeline execution stats and pass the load result through."""
    stats = {
        "records_loaded": load_result["records_loaded"],
        "invalid_records": len(invalid_records),
        "destination": load_result["destination"],
    }
    # Structured fields go in `extra` so log handlers can index them.
    logger.info("ETL pipeline completed", extra=stats)
    return load_result

Add Retry Logic

from tenacity import retry, stop_after_attempt, wait_exponential

@node(output_name="raw_data")
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10),
)
def extract_with_retry(source_path: str) -> list[dict]:
    """Extract with automatic retry on failure."""
    # tenacity re-runs the function on any raised exception, so a 4xx/5xx
    # surfaced by raise_for_status triggers up to 3 attempts with
    # exponential backoff (4-10s).
    if source_path.startswith("http"):
        response = httpx.get(source_path, timeout=30)
        response.raise_for_status()
        return response.json()
    # ... handle other sources
    # NOTE(review): as written, non-http sources fall through and return
    # None — add file handling (see extract above) before production use.

What’s Next?

Batch Processing

Process multiple inputs with runner.map()

RAG Pipeline

Build AI-powered pipelines

Evaluation Harness

Test pipelines at scale

Hierarchical Composition

Nest this pipeline in larger workflows

Build docs developers (and LLMs) love