Overview

This guide walks through creating a production-grade ETL pipeline from scratch. We’ll build a pipeline for ANATEL (Brazilian telecom regulator) as an example.

Prerequisites

  • Python 3.12+
  • Neo4j 5.x running locally
  • Basic understanding of ETL concepts
  • Familiarity with Pandas for data processing

Pipeline Template

All pipelines inherit from the Pipeline base class:
from bracc_etl.base import Pipeline
from bracc_etl.loader import Neo4jBatchLoader
from bracc_etl.transforms import (
    format_cnpj,
    normalize_name,
    parse_date,
    deduplicate_rows,
)

class AnatelPipeline(Pipeline):
    """ETL pipeline for ANATEL telecom licenses."""
    
    name = "anatel"
    source_id = "anatel"
    
    def __init__(self, driver, data_dir="./data", **kwargs):
        super().__init__(driver, data_dir, **kwargs)
        self.licenses = []
        self.companies = []
        self.relationships = []
    
    def extract(self) -> None:
        """Download raw data from ANATEL API."""
        pass
    
    def transform(self) -> None:
        """Normalize and deduplicate data."""
        pass
    
    def load(self) -> None:
        """Load data into Neo4j."""
        pass
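
The base class is assumed to run these three phases in order. Conceptually, the orchestration looks like the sketch below (an assumption for orientation only; the real Pipeline.run in bracc_etl.base also records the IngestionRun metadata checked in Step 6):

# Conceptual sketch of the assumed phase orchestration, not the actual base class.
def run(self) -> None:
    self.extract()    # fetch raw data to disk / memory
    self.transform()  # normalize into node and relationship dicts
    self.load()       # MERGE everything into Neo4j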

Step 1: Create Download Script

Create etl/scripts/download_anatel.py:
#!/usr/bin/env python3
"""Download ANATEL telecom license data.

Usage:
    python etl/scripts/download_anatel.py --output-dir ./data/anatel
"""

import logging
import sys
from pathlib import Path

import click
import httpx


logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

ANATEL_API = "https://sistemas.anatel.gov.br/stel/api/licencas"


@click.command()
@click.option("--output-dir", default="./data/anatel", help="Output directory")
@click.option("--timeout", type=int, default=600, help="Download timeout")
def main(output_dir: str, timeout: int) -> None:
    """Download ANATEL license data."""
    out = Path(output_dir)
    out.mkdir(parents=True, exist_ok=True)
    
    # Download license registry
    dest = out / "licencas.json"
    logger.info("Downloading from %s", ANATEL_API)
    
    try:
        with httpx.stream("GET", ANATEL_API, timeout=timeout) as response:
            response.raise_for_status()
            with open(dest, "wb") as f:
                for chunk in response.iter_bytes(chunk_size=8192):
                    f.write(chunk)
        logger.info("Downloaded: %s", dest)
    except httpx.HTTPError as e:
        logger.error("Download failed: %s", e)
        sys.exit(1)


if __name__ == "__main__":
    main()
Shared download utilities live in etl/scripts/_download_utils.py (a usage sketch follows the list):
  • download_file(url, dest, timeout) — HTTP download with progress
  • extract_zip(zip_path, extract_dir) — Safe ZIP extraction
  • validate_csv(path, expected_cols) — Column validation
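
As an example of combining these helpers, here is a hedged sketch for a hypothetical ZIP-distributed dataset. Only the signatures listed above are assumed; the URL and file names are illustrative:

import sys
from pathlib import Path

# Scripts import the shared helpers from their own directory
sys.path.insert(0, str(Path(__file__).parent))
from _download_utils import download_file, extract_zip, validate_csv

EXAMPLE_ZIP_URL = "https://example.gov.br/dados/licencas.zip"  # illustrative URL

out = Path("./data/anatel")
out.mkdir(parents=True, exist_ok=True)

zip_path = out / "licencas.zip"
download_file(EXAMPLE_ZIP_URL, zip_path, timeout=600)  # HTTP download with progress
extract_zip(zip_path, out)                             # safe ZIP extraction
validate_csv(out / "licencas.csv", expected_cols=["numero_licenca", "cnpj"])  # illustrative file/columns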

Step 2: Implement Extract Phase

In etl/src/bracc_etl/pipelines/anatel.py:
import json
import logging
from pathlib import Path

import httpx
import pandas as pd

from bracc_etl.base import Pipeline

logger = logging.getLogger(__name__)

class AnatelPipeline(Pipeline):
    # ... (name, source_id, __init__)
    
    def extract(self) -> None:
        """Download and read ANATEL license data."""
        anatel_dir = Path(self.data_dir) / "anatel"
        anatel_dir.mkdir(parents=True, exist_ok=True)
        
        # Option 1: Download via script
        # Run: python etl/scripts/download_anatel.py
        
        # Option 2: Download within the pipeline (httpx imported at module top)
        url = "https://sistemas.anatel.gov.br/stel/api/licencas"
        dest = anatel_dir / "licencas.json"
        
        if not dest.exists():
            logger.info("Downloading from %s", url)
            try:
                response = httpx.get(url, timeout=600)
                response.raise_for_status()
                dest.write_bytes(response.content)
                logger.info("Downloaded: %s", dest)
            except httpx.HTTPError as e:
                logger.error("Download failed: %s", e)
                raise
        
        # Read JSON data
        with open(dest, "r", encoding="utf-8") as f:
            data = json.load(f)
        
        # Convert to DataFrame
        self._raw_licenses = pd.DataFrame(data["licencas"])
        logger.info("Extracted %d licenses", len(self._raw_licenses))
Key patterns:
  • Create data directory structure
  • Check if file exists (skip re-download)
  • Handle HTTP errors gracefully
  • Log progress

Step 3: Implement Transform Phase

from bracc_etl.transforms import (
    format_cnpj,
    normalize_name,
    parse_date,
    deduplicate_rows,
)

class AnatelPipeline(Pipeline):
    # ... (extract)
    
    def transform(self) -> None:
        """Normalize and deduplicate ANATEL data."""
        df = self._raw_licenses.copy()
        
        # 1. Normalize company data
        df["cnpj"] = df["cnpj"].astype(str).map(format_cnpj)
        df["razao_social"] = df["razao_social"].astype(str).map(normalize_name)
        
        # 2. Parse dates
        df["data_emissao"] = df["data_emissao"].astype(str).map(parse_date)
        df["data_validade"] = df["data_validade"].astype(str).map(parse_date)
        
        # 3. Build company nodes
        company_cols = ["cnpj", "razao_social"]
        self.companies = deduplicate_rows(
            df[company_cols].to_dict("records"),
            key_fields=["cnpj"],
        )
        
        # 4. Build license nodes
        license_cols = [
            "numero_licenca", "tipo", "servico", "tecnologia",
            "data_emissao", "data_validade", "status",
        ]
        self.licenses = df[license_cols].to_dict("records")
        
        # 5. Build relationships (Company -[:POSSUI_LICENCA]-> License)
        self.relationships = [
            {
                "source_key": row["cnpj"],
                "target_key": row["numero_licenca"],
                "data_emissao": row["data_emissao"],
            }
            for _, row in df.iterrows()
        ]
        
        logger.info(
            "Transformed: %d companies, %d licenses, %d relationships",
            len(self.companies),
            len(self.licenses),
            len(self.relationships),
        )
Transform best practices:
  1. Copy the DataFrame: df = self._raw_licenses.copy() so transforms never mutate the raw extract
  2. Element-wise transforms: prefer Series.map() over row-wise .apply(); for real speed, use pandas' native vectorized operations where they exist
  3. Deduplicate: always deduplicate nodes on their primary keys (see the sketch below)
  4. Type safety: call .astype(str) before string transforms to avoid NaN/float surprises
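
The implementation of deduplicate_rows lives in bracc_etl.transforms; the semantics assumed throughout this guide are first-occurrence-wins on the key fields. A minimal sketch:

# Assumed semantics of deduplicate_rows (a sketch, not the library implementation):
# keep the first row seen for each unique combination of key_fields.
def deduplicate_rows(rows: list[dict], key_fields: list[str]) -> list[dict]:
    seen: set[tuple] = set()
    out: list[dict] = []
    for row in rows:
        key = tuple(row[k] for k in key_fields)
        if key not in seen:
            seen.add(key)
            out.append(row)
    return out

# deduplicate_rows([{"cnpj": "1", "x": "a"}, {"cnpj": "1", "x": "b"}], ["cnpj"])
# -> [{"cnpj": "1", "x": "a"}]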

Step 4: Implement Load Phase

from bracc_etl.loader import Neo4jBatchLoader

class AnatelPipeline(Pipeline):
    # ... (extract, transform)
    
    def load(self) -> None:
        """Load data into Neo4j."""
        loader = Neo4jBatchLoader(self.driver, batch_size=self.chunk_size)
        
        # 1. Load Company nodes
        if self.companies:
            loader.load_nodes(
                label="Company",
                rows=self.companies,
                key_field="cnpj",
            )
            logger.info("Loaded %d Company nodes", len(self.companies))
        
        # 2. Load License nodes
        if self.licenses:
            loader.load_nodes(
                label="License",
                rows=self.licenses,
                key_field="numero_licenca",
            )
            logger.info("Loaded %d License nodes", len(self.licenses))
        
        # 3. Load relationships
        if self.relationships:
            loader.load_relationships(
                rel_type="POSSUI_LICENCA",
                rows=self.relationships,
                source_label="Company",
                source_key="cnpj",
                target_label="License",
                target_key="numero_licenca",
                properties=["data_emissao"],
            )
            logger.info("Loaded %d POSSUI_LICENCA relationships", len(self.relationships))
        
        # Track metrics
        self.rows_in = len(self._raw_licenses)
        self.rows_loaded = len(self.companies) + len(self.licenses)
Load patterns:
  • Use Neo4jBatchLoader for efficient bulk inserts
  • Always specify key_field so loads MERGE on it instead of creating duplicates (see the Cypher sketch below)
  • Load nodes before relationships so relationship MERGEs can match both endpoints
  • Track rows_in and rows_loaded for monitoring
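
The loader's internals are not shown in this guide, but a batch loader with MERGE semantics typically issues UNWIND + MERGE statements per batch. A sketch of what load_nodes is assumed to do under the hood for Company rows:

# Assumed shape of the batched MERGE inside Neo4jBatchLoader (sketch only).
# Labels and key fields cannot be Cypher query parameters, so a generic
# loader typically interpolates them into the statement.
MERGE_COMPANIES = """
UNWIND $rows AS row
MERGE (n:Company {cnpj: row.cnpj})
SET n += row
"""

def load_company_batch(driver, batch: list[dict]) -> None:
    with driver.session() as session:
        session.run(MERGE_COMPANIES, rows=batch)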

Step 5: Register Pipeline

Add to etl/src/bracc_etl/runner.py:54:
from bracc_etl.pipelines.anatel import AnatelPipeline

PIPELINES: dict[str, type] = {
    # ... existing pipelines
    "anatel": AnatelPipeline,
}

Step 6: Test the Pipeline

Run with Limit

bracc-etl run \
  --source anatel \
  --neo4j-password your-password \
  --data-dir ./data \
  --limit 1000
This processes only 1,000 licenses for quick testing.

Verify Results

// Count nodes
MATCH (l:License)
RETURN count(l) AS total_licenses

// Sample licenses
MATCH (c:Company)-[r:POSSUI_LICENCA]->(l:License)
RETURN c.razao_social, l.tipo, l.servico, r.data_emissao
LIMIT 10

Check IngestionRun

MATCH (r:IngestionRun {source_id: 'anatel'})
RETURN r.status, r.rows_in, r.rows_loaded, r.started_at
ORDER BY r.started_at DESC
LIMIT 1
Expected: status = 'loaded'

Advanced Patterns

Pattern 1: Streaming for Large Datasets

For datasets > 10GB, implement run_streaming():
class AnatelPipeline(Pipeline):
    # ... (extract, transform, load)
    
    def run_streaming(self, start_phase: int = 1) -> None:
        """Stream-process large datasets chunk-by-chunk."""
        loader = Neo4jBatchLoader(self.driver, batch_size=self.chunk_size)
        anatel_dir = Path(self.data_dir) / "anatel"
        
        # Phase 1: Stream licenses from JSONL file
        file_path = anatel_dir / "licencas.jsonl"
        total_companies = 0
        total_licenses = 0
        
        def flush(chunk: list[dict]) -> tuple[int, int]:
            """Transform and load one chunk; return (companies, licenses) counts."""
            df = pd.DataFrame(chunk)
            companies, licenses, rels = self._transform_chunk(df)
            loader.load_nodes("Company", companies, key_field="cnpj")
            loader.load_nodes("License", licenses, key_field="numero_licenca")
            loader.load_relationships(
                "POSSUI_LICENCA", rels,
                "Company", "cnpj",
                "License", "numero_licenca",
            )
            return len(companies), len(licenses)
        
        with open(file_path, "r", encoding="utf-8") as f:
            chunk = []
            for line in f:
                chunk.append(json.loads(line))
                
                if len(chunk) >= self.chunk_size:
                    n_companies, n_licenses = flush(chunk)
                    total_companies += n_companies
                    total_licenses += n_licenses
                    chunk = []
                    
                    logger.info(
                        "Progress: %d companies, %d licenses",
                        total_companies, total_licenses,
                    )
            
            # Don't drop the final partial chunk
            if chunk:
                n_companies, n_licenses = flush(chunk)
                total_companies += n_companies
                total_licenses += n_licenses
        
        logger.info("Streaming complete: %d companies, %d licenses", 
                    total_companies, total_licenses)
Run:
bracc-etl run --source anatel --streaming --neo4j-password pass

Pattern 2: Multi-File Processing

For sources with multiple files (like CNPJ):
def extract(self) -> None:
    """Extract from multiple CSV files."""
    anatel_dir = Path(self.data_dir) / "anatel"
    
    # Find all CSV files matching pattern
    files = sorted(anatel_dir.glob("licencas_*.csv"))
    if not files:
        raise FileNotFoundError("No license files found")
    
    frames = []
    for f in files:
        logger.info("Reading %s", f.name)
        df = pd.read_csv(f, dtype=str, keep_default_na=False)
        frames.append(df)
    
    self._raw_licenses = pd.concat(frames, ignore_index=True)
    logger.info("Extracted %d licenses from %d files", 
                len(self._raw_licenses), len(files))
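
One failure mode worth guarding against here is column drift between export files. A minimal pre-concat check, reusing the files and frames lists from above (a sketch):

# Fail fast if any file is missing columns present in the first file (sketch)
first_cols = set(frames[0].columns)
for path, frame in zip(files, frames):
    missing = first_cols - set(frame.columns)
    if missing:
        raise ValueError(f"{path.name} is missing expected columns: {sorted(missing)}")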

Pattern 3: API Pagination

For paginated APIs:
import time

import httpx

def extract(self) -> None:
    """Extract from paginated API."""
    base_url = "https://api.anatel.gov.br/v1/licencas"
    all_records = []
    page = 1
    
    while True:
        logger.info("Fetching page %d", page)
        response = httpx.get(
            base_url,
            params={"page": page, "per_page": 1000},
            timeout=60,
        )
        response.raise_for_status()
        data = response.json()
        
        records = data.get("results", [])
        if not records:
            break
        
        all_records.extend(records)
        page += 1
        
        # Rate limiting
        time.sleep(1)
    
    self._raw_licenses = pd.DataFrame(all_records)
    logger.info("Extracted %d licenses from API", len(self._raw_licenses))

Pattern 4: Schema Validation

Add Pandera schema for validation:
import pandera as pa

class AnatelLicenseSchema(pa.DataFrameModel):
    numero_licenca: str = pa.Field(str_length={"min_value": 1})
    cnpj: str = pa.Field(str_matches=r"^\d{2}\.\d{3}\.\d{3}/\d{4}-\d{2}$")  # matches output of format_cnpj
    tipo: str = pa.Field(isin=["SCM", "SMP", "STFC", "SeAC"])
    data_emissao: str = pa.Field(str_matches=r"^\d{4}-\d{2}-\d{2}$")

def transform(self) -> None:
    """Transform with validation."""
    df = self._raw_licenses.copy()
    
    # Normalize
    df["cnpj"] = df["cnpj"].map(format_cnpj)
    df["data_emissao"] = df["data_emissao"].map(parse_date)
    
    # Validate schema
    try:
        AnatelLicenseSchema.validate(df, lazy=True)
    except pa.errors.SchemaErrors as e:
        logger.error("Schema validation failed:\n%s", e)
        raise
    
    # Continue transformation...
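
With lazy=True, pandera collects every violation before raising, and the SchemaErrors exception exposes a failure_cases DataFrame (one row per violation: column, check, failing value). Persisting it makes bad batches much easier to debug:

# Extension of the try/except above: dump all violations for inspection
try:
    AnatelLicenseSchema.validate(df, lazy=True)
except pa.errors.SchemaErrors as e:
    logger.error("Schema validation failed: %d violations", len(e.failure_cases))
    e.failure_cases.to_csv("anatel_schema_failures.csv", index=False)  # illustrative path
    raise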

Pattern 5: Reference Table Resolution

For coded values that need human-readable labels:
def _load_reference_tables(self) -> None:
    """Load ANATEL reference tables."""
    ref_dir = Path(self.data_dir) / "anatel" / "reference"
    
    # Load service type codes
    df = pd.read_csv(ref_dir / "tipos_servico.csv", dtype=str)
    self._service_types = dict(zip(df["codigo"], df["descricao"]))
    logger.info("Loaded %d service type codes", len(self._service_types))

def _resolve_service_type(self, code: str) -> str:
    """Look up service type description."""
    return self._service_types.get(code.strip(), code) if code else code

def transform(self) -> None:
    """Transform with reference resolution."""
    self._load_reference_tables()
    
    df = self._raw_licenses.copy()
    df["servico"] = df["tipo_servico"].map(self._resolve_service_type)
    # ...

Testing

Unit Tests

Create etl/tests/test_anatel.py:
import pandas as pd
import pytest
from bracc_etl.pipelines.anatel import AnatelPipeline


@pytest.fixture
def sample_data():
    return pd.DataFrame({
        "numero_licenca": ["123456789", "987654321"],
        "cnpj": ["12345678000190", "98765432000110"],
        "razao_social": ["EMPRESA A", "EMPRESA B"],
        "tipo": ["SMP", "SCM"],
        "data_emissao": ["2024-01-15", "2024-02-20"],
    })


def test_transform(sample_data, neo4j_driver):
    """Test transform phase."""
    pipeline = AnatelPipeline(neo4j_driver, data_dir="./test_data")
    pipeline._raw_licenses = sample_data
    pipeline.transform()
    
    assert len(pipeline.companies) == 2
    assert len(pipeline.licenses) == 2
    assert len(pipeline.relationships) == 2
    
    # Check CNPJ formatting
    assert pipeline.companies[0]["cnpj"] == "12.345.678/0001-90"
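
The neo4j_driver fixture is assumed to come from a shared conftest.py. A minimal sketch, with an illustrative URI and credentials:

# etl/tests/conftest.py -- sketch of the assumed fixture; adapt URI and credentials
import pytest
from neo4j import GraphDatabase

@pytest.fixture
def neo4j_driver():
    driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "test"))
    yield driver
    driver.close()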

Integration Tests

Run full pipeline with test data:
# Create test data
mkdir -p ./test_data/anatel
cat > ./test_data/anatel/licencas.json << EOF
{
  "licencas": [
    {"numero_licenca": "123456789", "cnpj": "12345678000190", ...}
  ]
}
EOF

# Run pipeline
bracc-etl run \
  --source anatel \
  --neo4j-password test \
  --data-dir ./test_data

# Verify
bracc-etl sources --status --neo4j-password test

Production Checklist

Before merging your pipeline:
  • Download script works with real data source
  • Extract phase handles missing files gracefully
  • Transform phase normalizes all key fields (CNPJ, names, dates)
  • Load phase uses MERGE (not CREATE) for idempotency
  • Deduplication on all node primary keys
  • Error handling with try/except and logging
  • Progress logging every 10k-100k rows
  • Schema validation with Pandera (optional but recommended)
  • Unit tests for transform logic
  • Integration test with sample data
  • Documentation in source registry (source_registry_br_v1.csv)
  • Registered in runner.py PIPELINES dict

Complete Example

Full working pipeline: etl/src/bracc_etl/pipelines/anatel.py
from __future__ import annotations

import json
import logging
from pathlib import Path
from typing import TYPE_CHECKING, Any

import httpx
import pandas as pd

from bracc_etl.base import Pipeline
from bracc_etl.loader import Neo4jBatchLoader
from bracc_etl.transforms import (
    deduplicate_rows,
    format_cnpj,
    normalize_name,
    parse_date,
)

if TYPE_CHECKING:
    from neo4j import Driver

logger = logging.getLogger(__name__)

ANATEL_API = "https://sistemas.anatel.gov.br/stel/api/licencas"


class AnatelPipeline(Pipeline):
    """ETL pipeline for ANATEL telecom licenses."""

    name = "anatel"
    source_id = "anatel"

    def __init__(
        self,
        driver: Driver,
        data_dir: str = "./data",
        limit: int | None = None,
        chunk_size: int = 50_000,
        **kwargs: Any,
    ) -> None:
        super().__init__(driver, data_dir, limit=limit, chunk_size=chunk_size, **kwargs)
        self._raw_licenses = pd.DataFrame()
        self.companies: list[dict[str, Any]] = []
        self.licenses: list[dict[str, Any]] = []
        self.relationships: list[dict[str, Any]] = []

    def extract(self) -> None:
        """Download and read ANATEL license data."""
        anatel_dir = Path(self.data_dir) / "anatel"
        anatel_dir.mkdir(parents=True, exist_ok=True)
        dest = anatel_dir / "licencas.json"

        if not dest.exists():
            logger.info("Downloading from %s", ANATEL_API)
            try:
                response = httpx.get(ANATEL_API, timeout=600)
                response.raise_for_status()
                dest.write_bytes(response.content)
                logger.info("Downloaded: %s", dest)
            except httpx.HTTPError as e:
                logger.error("Download failed: %s", e)
                raise

        with open(dest, "r", encoding="utf-8") as f:
            data = json.load(f)

        self._raw_licenses = pd.DataFrame(data["licencas"])
        if self.limit:
            self._raw_licenses = self._raw_licenses.head(self.limit)
        logger.info("Extracted %d licenses", len(self._raw_licenses))

    def transform(self) -> None:
        """Normalize and deduplicate ANATEL data."""
        df = self._raw_licenses.copy()

        df["cnpj"] = df["cnpj"].astype(str).map(format_cnpj)
        df["razao_social"] = df["razao_social"].astype(str).map(normalize_name)
        df["data_emissao"] = df["data_emissao"].astype(str).map(parse_date)
        df["data_validade"] = df["data_validade"].astype(str).map(parse_date)

        company_cols = ["cnpj", "razao_social"]
        self.companies = deduplicate_rows(
            df[company_cols].to_dict("records"), key_fields=["cnpj"],
        )

        license_cols = [
            "numero_licenca", "tipo", "servico", "tecnologia",
            "data_emissao", "data_validade", "status",
        ]
        self.licenses = df[license_cols].to_dict("records")

        self.relationships = [
            {
                "source_key": row["cnpj"],
                "target_key": row["numero_licenca"],
                "data_emissao": row["data_emissao"],
            }
            for _, row in df.iterrows()
        ]

        logger.info(
            "Transformed: %d companies, %d licenses, %d relationships",
            len(self.companies), len(self.licenses), len(self.relationships),
        )

    def load(self) -> None:
        """Load data into Neo4j."""
        loader = Neo4jBatchLoader(self.driver, batch_size=self.chunk_size)

        if self.companies:
            loader.load_nodes("Company", self.companies, key_field="cnpj")
            logger.info("Loaded %d Company nodes", len(self.companies))

        if self.licenses:
            loader.load_nodes("License", self.licenses, key_field="numero_licenca")
            logger.info("Loaded %d License nodes", len(self.licenses))

        if self.relationships:
            loader.load_relationships(
                rel_type="POSSUI_LICENCA",
                rows=self.relationships,
                source_label="Company",
                source_key="cnpj",
                target_label="License",
                target_key="numero_licenca",
                properties=["data_emissao"],
            )
            logger.info("Loaded %d POSSUI_LICENCA relationships", len(self.relationships))

        self.rows_in = len(self._raw_licenses)
        self.rows_loaded = len(self.companies) + len(self.licenses)

Next Steps

Running Pipelines

Test your new pipeline locally

Pipeline Architecture

Learn advanced design patterns

Data Sources

Add your pipeline to the source registry

Overview

Back to ETL framework overview
