
Pipeline Lifecycle

Every pipeline follows a strict lifecycle: three core phases (extract, transform, load), plus post-load hooks:

1. Extract Phase

Purpose: Download raw data from external sources
Responsibilities:
  • Resolve data source URLs (handle versioning, mirrors)
  • Download files with retry logic and timeout handling
  • Verify checksums and file integrity
  • Extract archives (.zip, .gz)
  • Store raw data in data/{source}/raw/
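The checksum step in the list above can be sketched as follows. This is a minimal illustration using `hashlib`; the function names and chunk size are assumptions for this example, not the project's actual API:

```python
import hashlib
from pathlib import Path

def sha256_of(path: Path, chunk_size: int = 1 << 20) -> str:
    """Hash a file in fixed-size chunks so large downloads never load fully into memory."""
    digest = hashlib.sha256()
    with path.open("rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()

def verify_checksum(path: Path, expected: str) -> None:
    """Fail closed: raise if the downloaded file does not match its published hash."""
    actual = sha256_of(path)
    if actual != expected:
        raise ValueError(f"Checksum mismatch for {path}: {actual} != {expected}")
```

Hashing in chunks keeps the integrity check at a fixed memory footprint, which matters for multi-gigabyte CNPJ archives.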
Example (CNPJ pipeline at etl/scripts/download_cnpj.py:89):
def resolve_rf_release(year_month: str | None = None) -> str:
    """Resolve the Receita Federal CNPJ release URL.
    
    Strategy:
    1. Try Nextcloud share (primary since Jan 2026)
    2. Fall back to legacy dadosabertos.rfb.gov.br paths
    3. Raise RuntimeError if nothing works (fail-closed)
    """
    # Probe Nextcloud tokens
    for token in tokens_to_try:
        if _check_nextcloud_token(token):
            return NEXTCLOUD_BASE.format(token=token)
    
    # Try legacy URLs with current/previous month
    for ym in candidates:
        url = LEGACY_NEW_BASE_PATTERN.format(year_month=ym)
        if _check_url_accessible(url):
            return url
    
    raise RuntimeError("Could not resolve CNPJ release")
Download utilities (etl/scripts/_download_utils.py):
  • download_file(url, dest, timeout) — HTTP download with progress
  • extract_zip(zip_path, extract_dir) — Safe extraction with path traversal guard
  • validate_csv(path, expected_cols) — Column count validation
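The path traversal guard mentioned for `extract_zip` can be illustrated like this. The real implementation in `_download_utils.py` is not shown here; this is a sketch of the standard defense against "zip slip" attacks:

```python
import zipfile
from pathlib import Path

def safe_extract(zip_path: Path, extract_dir: Path) -> None:
    """Extract a zip, rejecting entries that would escape extract_dir ("zip slip")."""
    extract_dir = extract_dir.resolve()
    with zipfile.ZipFile(zip_path) as zf:
        for member in zf.namelist():
            # Resolve each entry against the target dir and verify it stays inside it
            target = (extract_dir / member).resolve()
            if not target.is_relative_to(extract_dir):
                raise ValueError(f"Blocked path traversal attempt: {member}")
        zf.extractall(extract_dir)
```

Validating every entry name before calling `extractall` means a malicious archive entry like `../../etc/passwd` aborts the whole extraction rather than writing outside the raw-data directory.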

2. Transform Phase

Purpose: Normalize, validate, and deduplicate data
Responsibilities:
  • Parse and normalize entity names
  • Validate and format documents (CPF/CNPJ)
  • Parse dates into ISO format
  • Deduplicate on primary keys
  • Classify entities (person vs company)
  • Build relationships between entities
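The CPF validation mentioned above uses the standard mod-11 check-digit algorithm. A self-contained sketch of that algorithm (the function name is illustrative, not the project's actual `format_cpf` helper):

```python
def cpf_check_digits_valid(cpf: str) -> bool:
    """Validate the two CPF check digits (standard mod-11 algorithm)."""
    digits = [int(c) for c in cpf if c.isdigit()]
    if len(digits) != 11 or len(set(digits)) == 1:
        return False  # wrong length, or trivial repeats like 111.111.111-11
    for pos in (9, 10):
        # Weights run 10..2 for the first check digit, 11..2 for the second
        weights = range(pos + 1, 1, -1)
        remainder = sum(d * w for d, w in zip(digits[:pos], weights)) % 11
        expected = 0 if remainder < 2 else 11 - remainder
        if digits[pos] != expected:
            return False
    return True
```

Accepting both formatted (`123.456.789-09`) and bare (`12345678909`) input by filtering to digits first keeps the validator usable at any point in the transform chain.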
Transform modules (etl/src/bracc_etl/transforms/):
  • name_normalization.py — normalize_name() — Uppercase, strip diacritics, collapse whitespace
  • document_formatting.py — format_cpf(), format_cnpj(), classify_document() — Validate check digits, add formatting
  • date_formatting.py — parse_date() — Handle DD/MM/YYYY, YYYY-MM-DD, epoch formats
  • deduplication.py — deduplicate_rows() — Remove duplicates by key fields
  • value_sanitization.py — sanitize_value() — Clean empty strings, strip whitespace
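The behavior listed for `normalize_name()` (uppercase, strip diacritics, collapse whitespace) can be sketched with the standard library. This is an illustration of that contract, not the project's actual implementation:

```python
import re
import unicodedata

def normalize_name_sketch(name: str) -> str:
    """Uppercase, strip diacritics (NFKD decomposition, drop combining marks), collapse whitespace."""
    decomposed = unicodedata.normalize("NFKD", name)
    without_marks = "".join(c for c in decomposed if not unicodedata.combining(c))
    return re.sub(r"\s+", " ", without_marks).strip().upper()
```

NFKD decomposition splits accented characters like `é` into a base letter plus a combining mark, so dropping combining marks removes diacritics without a hand-maintained character table.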
Example (CNPJ pipeline at etl/src/bracc_etl/pipelines/cnpj.py:499):
def _transform_empresas_rf(self, df: pd.DataFrame) -> list[dict[str, Any]]:
    """Vectorized transform for RF-format empresas."""
    df = df.copy()
    df["basico"] = df["cnpj_basico"].astype(str).str.zfill(8)
    
    # Lookup full CNPJ from estabelecimentos
    lookup = self._estab_lookup
    df["cnpj"] = df["basico"].map(
        lambda b: lookup[b][0] if b in lookup else format_cnpj(b + "000100"),
    )
    
    # Parse capital social (comma as decimal separator)
    df["capital_social"] = df["capital_social"].astype(str).map(parse_capital_social)
    
    # Normalize company names
    df["razao_social"] = df["razao_social"].astype(str).map(normalize_name)
    
    # Resolve reference table codes
    df["natureza_juridica"] = df["natureza_juridica"].astype(str).map(
        lambda c: self._resolve_reference("naturezas", c),
    )
    
    return df[["cnpj", "razao_social", "natureza_juridica", ...]].to_dict("records")

3. Load Phase

Purpose: Bulk insert into Neo4j with MERGE semantics
Responsibilities:
  • Create or update nodes (MERGE on primary key)
  • Create relationships between entities
  • Update node properties
  • Track row counts for monitoring
Loader (etl/src/bracc_etl/loader.py:17):
class Neo4jBatchLoader:
    """Bulk loader using UNWIND for efficient Neo4j writes."""
    
    def load_nodes(
        self,
        label: str,
        rows: list[dict[str, Any]],
        key_field: str,
    ) -> int:
        query = (
            f"UNWIND $rows AS row "
            f"MERGE (n:{label} {{{key_field}: row.{key_field}}}) "
            f"SET n.prop1 = row.prop1, n.prop2 = row.prop2, ..."
        )
        return self._run_batches(query, rows)
    
    def load_relationships(
        self,
        rel_type: str,
        rows: list[dict[str, Any]],
        source_label: str,
        source_key: str,
        target_label: str,
        target_key: str,
        properties: list[str] | None = None,
    ) -> int:
        query = (
            f"UNWIND $rows AS row "
            f"MATCH (a:{source_label} {{{source_key}: row.source_key}}) "
            f"MATCH (b:{target_label} {{{target_key}: row.target_key}}) "
            f"MERGE (a)-[r:{rel_type}]->(b) "
            f"SET r.prop1 = row.prop1, ..."
        )
        return self._run_batches(query, rows)
Batch processing:
  • Default batch size: 10,000 rows per transaction
  • Retry logic for transient errors (deadlocks, timeouts)
  • Progress logging every 100,000 rows
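The batching behavior described above (fixed-size slices with periodic progress logging) amounts to a simple chunking loop. A minimal sketch; the helper name and exact logging format are assumptions:

```python
import logging
from typing import Any, Iterator

logger = logging.getLogger(__name__)

def iter_batches(
    rows: list[dict[str, Any]],
    batch_size: int = 10_000,
    log_every: int = 100_000,
) -> Iterator[list[dict[str, Any]]]:
    """Yield fixed-size slices of rows, logging progress at the configured interval."""
    done = 0
    for i in range(0, len(rows), batch_size):
        batch = rows[i:i + batch_size]
        yield batch
        done += len(batch)
        if done % log_every == 0:
            logger.info("Loaded %d/%d rows", done, len(rows))
```

Each yielded batch maps to one Neo4j transaction, which bounds transaction size and lets a failed batch be retried in isolation.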

4. Post-Load Hooks

Purpose: Entity resolution and cross-source linking
Linking tiers (see etl/src/bracc_etl/linking_hooks.py):
  • community — Exact key matching (CNPJ/CPF) — <1 minute
  • full — Splink probabilistic matching — 30-60 minutes
Example hooks:
  • Link Person nodes to Company nodes via exact CPF match
  • Resolve Partner nodes (partial identities) to Person nodes
  • Cross-link sanctions across sources (OFAC ↔ EU ↔ UN)
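The community tier's exact key matching can be pictured as a dictionary join. The real hooks run as Cypher against the graph; the record shapes and field names below are invented for illustration:

```python
from typing import Any

def link_by_cpf(
    persons: list[dict[str, Any]],
    partners: list[dict[str, Any]],
) -> list[dict[str, str]]:
    """Exact-key linking: join Partner records to Person records on a shared CPF."""
    # Index persons by CPF once, then probe for each partner: O(n + m)
    by_cpf = {p["cpf"]: p["person_id"] for p in persons if p.get("cpf")}
    links = []
    for partner in partners:
        person_id = by_cpf.get(partner.get("cpf"))
        if person_id is not None:
            links.append({"partner_id": partner["partner_id"], "person_id": person_id})
    return links
```

Exact matching is fast precisely because it is a hash join on a single key; the full tier exists for records where no shared key is available and probabilistic comparison is required.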

Design Patterns

Pattern 1: Streaming Ingestion

Use case: Datasets that don't fit in memory (CNPJ: 50M+ records)
Implementation (CNPJ pipeline at etl/src/bracc_etl/pipelines/cnpj.py:1068):
def run_streaming(self, start_phase: int = 1) -> None:
    """Stream-process data files chunk-by-chunk."""
    loader = Neo4jBatchLoader(self.driver, batch_size=self.chunk_size)
    
    # Phase 1: Build lookup tables (load all estabelecimentos)
    for f in estabelecimento_files:
        for chunk in pd.read_csv(f, chunksize=50_000):
            self._build_estab_lookup(chunk)
    
    # Phase 2: Stream empresas → transform → load
    for f in empresas_files:
        for chunk in pd.read_csv(f, chunksize=50_000):
            companies = self._transform_empresas_rf(chunk)
            loader.load_nodes("Company", companies, key_field="cnpj")
    
    # Phase 3: Stream socios → transform → load
    for f in socios_files:
        for chunk in pd.read_csv(f, chunksize=50_000):
            partners, relationships = self._transform_socios_rf(chunk)
            loader.load_nodes("Person", partners, key_field="cpf")
            loader.load_relationships("SOCIO_DE", relationships, ...)
Advantages:
  • Fixed memory footprint (2GB regardless of dataset size)
  • Incremental progress (can resume from phase N)
  • Real-time monitoring of row counts
CLI:
bracc-etl run --source cnpj --streaming --start-phase 2

Pattern 2: Multi-Format Support

Use case: Handle both raw government CSVs and BigQuery exports
Strategy (CNPJ pipeline at etl/src/bracc_etl/pipelines/cnpj.py:360):
def extract(self) -> None:
    """Try three formats in order:
    1. Real RF format: headerless `;`-delimited CSVs
    2. Base dos Dados (BigQuery) exports: header-based CSVs
    3. Simple CSV: header-based CSVs (dev/test)
    """
    # Try RF format first
    rf_empresas = self._read_rf_chunks("*EMPRE*", EMPRESAS_COLS)
    
    if rf_empresas.empty:
        # Try BigQuery exports (different column names)
        bq_empresas = self._read_bq_csv(
            "empresas_*.csv",
            BQ_EMPRESAS_RENAME,  # Map BQ columns to RF names
            BQ_EMPRESAS_DROP,    # Drop BQ metadata columns
        )
        self._raw_empresas = bq_empresas
    else:
        self._raw_empresas = rf_empresas
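The column-mapping step inside `_read_bq_csv` boils down to a pandas rename plus drop. The mapping values below are invented for illustration; the real `BQ_EMPRESAS_RENAME` and `BQ_EMPRESAS_DROP` constants live in the pipeline module:

```python
import pandas as pd

# Hypothetical mappings for illustration only
BQ_RENAME = {"razao_social_bq": "razao_social", "cnpj_basico_bq": "cnpj_basico"}
BQ_DROP = ["_bq_ingestion_ts"]

def to_rf_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Map BigQuery-export column names onto the RF layout the transforms expect."""
    # errors="ignore" tolerates exports that omit optional metadata columns
    return df.rename(columns=BQ_RENAME).drop(columns=BQ_DROP, errors="ignore")
```

Normalizing to the RF column names at read time means every downstream transform sees a single schema, regardless of which source format was detected.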

Pattern 3: Reference Table Resolution

Use case: Government CSVs use numeric codes that need human-readable labels
Implementation (CNPJ pipeline at etl/src/bracc_etl/pipelines/cnpj.py:247):
def _load_reference_tables(self) -> None:
    """Load reference lookup tables (naturezas, qualificacoes, etc.)."""
    ref_dir = Path(self.data_dir) / "cnpj" / "reference"
    for table_name in REFERENCE_TABLES:
        df = pd.read_csv(
            ref_dir / f"{table_name}.csv",
            names=["codigo", "descricao"],
            dtype=str,
        )
        lookup = dict(zip(df["codigo"], df["descricao"]))
        self._reference_tables[table_name.lower()] = lookup

def _resolve_reference(self, table: str, code: str) -> str:
    """Look up a code in a reference table."""
    lookup = self._reference_tables.get(table, {})
    return lookup.get(code.strip(), code) if code else code
Example:
  • Input: natureza_juridica = "206-2"
  • Lookup: naturezas["206-2"] = "Sociedade Empresária Limitada"
  • Output: natureza_juridica = "Sociedade Empresária Limitada"

Pattern 4: History Mode

Use case: Temporal data (track ownership changes over time)
Implementation (CNPJ pipeline at etl/src/bracc_etl/pipelines/cnpj.py:773):
def _build_snapshot_relationships(self, pf_rels, partial_rels, pj_rels):
    """Build historical SOCIO_DE_SNAPSHOT relationships."""
    rows = []
    for rel in pf_rels:
        rows.append({
            "source_key": rel["source_key"],
            "target_key": rel["target_key"],
            "snapshot_date": rel["snapshot_date"],  # YYYY-MM-01
            "data_entrada": rel["data_entrada"],
            "membership_id": _make_membership_id(...),
        })
    return rows

def _rebuild_latest_projection_from_snapshots(self):
    """Rebuild factual SOCIO_DE from latest snapshot per pair."""
    session.run("""
        MATCH (a)-[r:SOCIO_DE_SNAPSHOT]->(b:Company)
        WITH a, b, max(r.snapshot_date) AS max_snapshot
        MATCH (a)-[r2:SOCIO_DE_SNAPSHOT]->(b)
        WHERE r2.snapshot_date = max_snapshot
        WITH a, b, collect(r2)[0] AS latest
        MERGE (a)-[s:SOCIO_DE]->(b)
        SET s = latest
    """)
Graph structure:
  • SOCIO_DE_SNAPSHOT: Immutable historical records (one per month)
  • SOCIO_DE: Derived “latest” projection for queries
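The Cypher projection above keeps, for each (partner, company) pair, only the relationship with the maximum snapshot_date. The same reduction expressed in plain Python, as an in-memory illustration of what the query computes:

```python
from typing import Any

def latest_per_pair(snapshots: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Keep the newest snapshot for each (source, target) pair, mirroring the Cypher projection."""
    latest: dict[tuple[str, str], dict[str, Any]] = {}
    for snap in snapshots:
        key = (snap["source_key"], snap["target_key"])
        current = latest.get(key)
        # ISO dates (YYYY-MM-01) compare correctly as strings
        if current is None or snap["snapshot_date"] > current["snapshot_date"]:
            latest[key] = snap
    return list(latest.values())
```

Because the snapshot relationships are immutable, the latest projection can always be rebuilt from scratch after a load, which keeps SOCIO_DE consistent even if monthly loads arrive out of order.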

Error Handling

Retry Logic

All pipelines implement exponential backoff for transient errors (see etl/src/bracc_etl/loader.py:46):
def run_query_with_retry(self, query, rows, batch_size=500):
    for i in range(0, len(rows), batch_size):
        batch = rows[i:i + batch_size]
        for attempt in range(_MAX_RETRIES):
            try:
                self._run_batch_once(query, batch)
                break
            except TransientError:
                wait = 2 ** attempt  # 1s, 2s, 4s, 8s, 16s
                logger.warning("Deadlock, retry %d/%d in %ds", attempt + 1, _MAX_RETRIES, wait)
                time.sleep(wait)
        else:
            logger.error("Failed batch after %d retries, skipping", _MAX_RETRIES)

Quality Gates

Schema validation using Pandera (see etl/src/bracc_etl/schemas/validator.py):
import pandera as pa
from pandera.typing import Series

class CNPJSchema(pa.DataFrameModel):
    cnpj: Series[str] = pa.Field(str_matches=r"^\d{14}$")
    razao_social: Series[str] = pa.Field(str_length={"min_value": 1})
    capital_social: Series[float] = pa.Field(ge=0)
If validation fails:
  1. Pipeline status → quality_fail
  2. Error logged to IngestionRun.error
  3. Pipeline raises exception (no partial writes)

Next Steps

  • Data Sources — Browse 45+ sources with status and tiers
  • Running Pipelines — Run pipelines locally with Neo4j
  • Creating Pipelines — Build a new pipeline from scratch
  • Overview — Back to ETL framework overview
