## Pipeline Lifecycle
Every pipeline follows a strict three-phase lifecycle (extract, transform, load), followed by post-load hooks.

### 1. Extract Phase

**Purpose:** Download raw data from external sources.

**Responsibilities:**
- Resolve data source URLs (handle versioning, mirrors)
- Download files with retry logic and timeout handling
- Verify checksums and file integrity
- Extract archives (`.zip`, `.gz`)
- Store raw data in `data/{source}/raw/`
Implementation: `etl/scripts/download_cnpj.py:89`.

Shared download utilities (`etl/scripts/_download_utils.py`):

- `download_file(url, dest, timeout)` — HTTP download with progress
- `extract_zip(zip_path, extract_dir)` — Safe extraction with path traversal guard
- `validate_csv(path, expected_cols)` — Column count validation
### 2. Transform Phase
**Purpose:** Normalize, validate, and deduplicate data.

**Responsibilities:**
- Parse and normalize entity names
- Validate and format documents (CPF/CNPJ)
- Parse dates into ISO format
- Deduplicate on primary keys
- Classify entities (person vs. company)
- Build relationships between entities
Transform modules (`etl/src/bracc_etl/transforms/`):

| Module | Functions | Purpose |
|---|---|---|
| `name_normalization.py` | `normalize_name()` | Uppercase, strip diacritics, collapse whitespace |
| `document_formatting.py` | `format_cpf()`, `format_cnpj()`, `classify_document()` | Validate check digits, add formatting |
| `date_formatting.py` | `parse_date()` | Handle DD/MM/YYYY, YYYY-MM-DD, epoch formats |
| `deduplication.py` | `deduplicate_rows()` | Remove duplicates by key fields |
| `value_sanitization.py` | `sanitize_value()` | Clean empty strings, strip whitespace |
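As an illustration of the first row in the table, the behaviour described for `normalize_name()` (uppercase, strip diacritics, collapse whitespace) might be implemented roughly like this; the real module may differ:

```python
import unicodedata

def normalize_name(name: str) -> str:
    """Sketch of the normalization described above: uppercase,
    strip diacritics, collapse runs of whitespace."""
    # Decompose accented characters, then drop the combining marks.
    decomposed = unicodedata.normalize("NFKD", name)
    stripped = "".join(c for c in decomposed if not unicodedata.combining(c))
    # Uppercase and collapse internal and leading/trailing whitespace.
    return " ".join(stripped.upper().split())
```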
Implementation: CNPJ pipeline at `etl/src/bracc_etl/pipelines/cnpj.py:499`.
### 3. Load Phase
**Purpose:** Bulk insert into Neo4j with MERGE semantics.

**Responsibilities:**
- Create or update nodes (MERGE on primary key)
- Create relationships between entities
- Update node properties
- Track row counts for monitoring

Loader implementation (`etl/src/bracc_etl/loader.py:17`):
- Default batch size: 10,000 rows per transaction
- Retry logic for transient errors (deadlocks, timeouts)
- Progress logging every 100,000 rows
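A rough sketch of batched loading with row-count tracking. `run_batch` stands in for whatever executes one Neo4j transaction per batch (for example an `UNWIND $rows AS row MERGE ...` statement); the function names and constants here are illustrative, not the actual loader API:

```python
from typing import Callable, Iterable, Iterator

BATCH_SIZE = 10_000   # rows per transaction (default noted above)
LOG_EVERY = 100_000   # progress-logging interval

def batches(rows: Iterable[dict], size: int = BATCH_SIZE) -> Iterator[list[dict]]:
    """Yield fixed-size lists of rows without materializing the input."""
    batch: list[dict] = []
    for row in rows:
        batch.append(row)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        yield batch

def load(rows: Iterable[dict], run_batch: Callable[[list[dict]], None]) -> int:
    """Feed batches to run_batch (one transaction each) and return
    the total row count for monitoring."""
    total = 0
    for batch in batches(rows):
        run_batch(batch)
        total += len(batch)
        if total % LOG_EVERY == 0:
            print(f"loaded {total:,} rows")
    return total
```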
### 4. Post-Load Hooks

**Purpose:** Entity resolution and cross-source linking.

Linking tiers (see `etl/src/bracc_etl/linking_hooks.py`):
| Tier | Strategy | Performance |
|---|---|---|
| `community` | Exact key matching (CNPJ/CPF) | < 1 minute |
| `full` | Splink probabilistic matching | 30-60 minutes |

Linking tasks:

- Link `Person` nodes to `Company` nodes via exact CPF match
- Resolve `Partner` nodes (partial identities) to `Person` nodes
- Cross-link sanctions across sources (OFAC ↔ EU ↔ UN)
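The exact-match tier amounts to a key join. A minimal sketch of the idea (the field names `cpf` and `id` are assumptions, and the real hook runs as Cypher inside Neo4j rather than in Python):

```python
def link_partners_to_persons(
    partners: list[dict], persons: list[dict]
) -> list[tuple[str, str]]:
    """Sketch of the 'community' tier: exact CPF key matching.
    Returns (partner_id, person_id) pairs; rows without a CPF
    are simply left unlinked."""
    # Index persons by CPF for O(1) lookups.
    by_cpf = {p["cpf"]: p["id"] for p in persons if p.get("cpf")}
    links = []
    for partner in partners:
        person_id = by_cpf.get(partner.get("cpf"))
        if person_id is not None:
            links.append((partner["id"], person_id))
    return links
```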
## Design Patterns

### Pattern 1: Streaming Ingestion

**Use case:** Datasets that don't fit in memory (CNPJ: 50M+ records).

**Implementation:** CNPJ pipeline at `etl/src/bracc_etl/pipelines/cnpj.py:1068`.

**Benefits:**
- Fixed memory footprint (2GB regardless of dataset size)
- Incremental progress (can resume from phase N)
- Real-time monitoring of row counts
### Pattern 2: Multi-Format Support

**Use case:** Handle both raw government CSVs and BigQuery exports.

**Strategy:** CNPJ pipeline at `etl/src/bracc_etl/pipelines/cnpj.py:360`.
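One plausible way to branch on input format, assuming raw RFB files are semicolon-delimited while BigQuery exports are comma-delimited with a header row. This heuristic is an assumption for illustration, not the actual branching logic at `cnpj.py:360`:

```python
def detect_format(first_line: str) -> str:
    """Guess the source format from the first line of a file.

    Assumption: raw government CSVs use ';' as the delimiter,
    while BigQuery exports use ','. A real detector would also
    check encodings and expected column names.
    """
    if first_line.count(";") > first_line.count(","):
        return "raw_government"
    return "bigquery_export"
```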
### Pattern 3: Reference Table Resolution

**Use case:** Government CSVs use numeric codes that need human-readable labels.

**Implementation:** CNPJ pipeline at `etl/src/bracc_etl/pipelines/cnpj.py:247`.

- Input: `natureza_juridica = "206-2"`
- Lookup: `naturezas["206-2"] = "Sociedade Empresária Limitada"`
- Output: `natureza_juridica = "Sociedade Empresária Limitada"`
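The lookup above can be sketched in a few lines; the `resolve_code` helper and the inline table here are illustrative names, not the pipeline's real ones:

```python
# Illustrative slice of a reference table (code -> label); the real
# tables are loaded from the government's auxiliary files.
NATUREZAS = {"206-2": "Sociedade Empresária Limitada"}

def resolve_code(row: dict, field: str, table: dict[str, str]) -> dict:
    """Replace a numeric code with its human-readable label,
    keeping the raw code when no mapping is known."""
    code = row.get(field)
    return {**row, field: table.get(code, code)}
```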
### Pattern 4: History Mode

**Use case:** Temporal data (track ownership changes over time).

**Implementation:** CNPJ pipeline at `etl/src/bracc_etl/pipelines/cnpj.py:773`.

- `SOCIO_DE_SNAPSHOT`: Immutable historical records (one per month)
- `SOCIO_DE`: Derived "latest" projection for queries
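Deriving the "latest" projection from monthly snapshots amounts to keeping the newest record per relationship key. A sketch, with assumed field names:

```python
def latest_projection(snapshots: list[dict]) -> list[dict]:
    """Derive a SOCIO_DE-style 'latest' view from immutable monthly
    snapshots: keep the newest record per (company, partner) pair.
    Field names (cnpj, partner_id, month) are illustrative."""
    latest: dict[tuple, dict] = {}
    # Sorting by month means later snapshots overwrite earlier ones.
    for snap in sorted(snapshots, key=lambda s: s["month"]):
        latest[(snap["cnpj"], snap["partner_id"])] = snap
    return list(latest.values())
```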
## Error Handling

### Retry Logic

All pipelines implement exponential backoff for transient errors (see `etl/src/bracc_etl/loader.py:46`).
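A typical shape for such a retry wrapper; the exception types, attempt count, and delays below are illustrative, not the loader's exact policy:

```python
import time

# Stand-ins for the transient errors mentioned above (deadlocks,
# timeouts); the real loader catches driver-specific exceptions.
TRANSIENT = (TimeoutError, ConnectionError)

def with_retry(fn, attempts: int = 5, base_delay: float = 0.5):
    """Call fn, retrying transient failures with exponential backoff:
    0.5s, 1s, 2s, 4s between attempts. Non-transient errors and the
    final failed attempt propagate to the caller."""
    for attempt in range(attempts):
        try:
            return fn()
        except TRANSIENT:
            if attempt == attempts - 1:
                raise
            time.sleep(base_delay * 2 ** attempt)
```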
### Quality Gates

Schema validation uses Pandera (see `etl/src/bracc_etl/schemas/validator.py`). On validation failure:

- Pipeline status → `quality_fail`
- Error logged to `IngestionRun.error`
- Pipeline raises an exception (no partial writes)
## Next Steps

- **Data Sources**: Browse 45+ sources with status and tiers
- **Running Pipelines**: Run pipelines locally with Neo4j
- **Creating Pipelines**: Build a new pipeline from scratch
- **Overview**: Back to the ETL framework overview