
Prerequisites

Before running pipelines, ensure you have:

1. Python 3.12+

Check your Python version:
python --version  # Should be 3.12 or higher

2. Neo4j 5.x

Install Neo4j Desktop or run via Docker:
docker run -d \
  --name neo4j \
  -p 7474:7474 -p 7687:7687 \
  -e NEO4J_AUTH=neo4j/your-password \
  neo4j:5.15
Access Neo4j Browser at http://localhost:7474

3. Install Dependencies

Navigate to the ETL directory and install:
cd ~/workspace/source/etl
pip install -e .

# For BigQuery sources
pip install -e ".[bigquery]"

# For development
pip install -e ".[dev]"

Quick Start

1. List Available Pipelines

bracc-etl sources
Output:
Available pipelines:
  bcb
  bndes
  caged
  camara
  camara_inquiries
  ceaf
  cepim
  cnpj
  comprasnet
  cpgf
  cvm
  ...

2. Run Your First Pipeline

Let’s run the PEP CGU pipeline (Politically Exposed Persons):
bracc-etl run \
  --source pep_cgu \
  --neo4j-password your-password \
  --data-dir ./data
What happens:
  1. ✅ Downloads PEP list from Portal da Transparência
  2. ✅ Extracts and normalizes names/documents
  3. ✅ Loads Person nodes with pep: true property
  4. ✅ Creates IngestionRun node with status loaded
Output:
2025-03-05 10:00:00 INFO [pep_cgu] Starting extraction...
2025-03-05 10:00:15 INFO Downloaded: data/pep_cgu/pep_cgu_202503.csv
2025-03-05 10:00:15 INFO [pep_cgu] Starting transformation...
2025-03-05 10:00:20 INFO Transformed 15,432 PEP records
2025-03-05 10:00:20 INFO [pep_cgu] Starting load...
2025-03-05 10:00:45 INFO Batch written: 15,432 rows
2025-03-05 10:00:45 INFO [pep_cgu] Pipeline complete.
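The four stages above follow a conventional extract → transform → load shape that ends by recording an IngestionRun. A minimal sketch of that pattern (function and field names are illustrative, not the framework's actual API):

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone

@dataclass
class IngestionRun:
    """Rough mirror of the IngestionRun node written at the end of a run."""
    source_id: str
    status: str = "running"
    rows_loaded: int = 0
    started_at: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )

def run_pipeline(source_id, extract, transform, load):
    """Extract -> transform -> load, then mark the run as loaded."""
    run = IngestionRun(source_id)
    rows = [transform(row) for row in extract()]  # normalize each raw record
    run.rows_loaded = load(rows)                  # load returns the row count
    run.status = "loaded"
    return run
```

In the real pipeline, `load` writes batches to Neo4j; here it is any callable returning the number of rows written.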

3. Verify in Neo4j

Open Neo4j Browser (http://localhost:7474) and run:
MATCH (p:Person {pep: true})
RETURN p.name, p.cpf, p.cargo
LIMIT 10

Common Pipelines

CNPJ (Companies)

CNPJ is a large dataset (50M+ records, 100GB compressed). Use streaming mode and expect 6+ hours.
Step 1: Download data
bracc-etl download \
  --output-dir ./data/cnpj \
  --files 10 \
  --skip-existing
This downloads:
  • Empresas0.zip through Empresas9.zip (companies)
  • Socios0.zip through Socios9.zip (partners)
  • Estabelecimentos0.zip through Estabelecimentos9.zip (establishments)
  • Reference tables (naturezas, qualificações, etc.)
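Before starting the multi-hour pipeline, it can help to confirm the expected archives are actually on disk. A small sketch, assuming the default file names listed above:

```python
from pathlib import Path

EXPECTED_PREFIXES = ["Empresas", "Socios", "Estabelecimentos"]

def missing_archives(data_dir: str, files: int = 10) -> list[str]:
    """Return the expected CNPJ archive names not yet present on disk."""
    root = Path(data_dir)
    return [
        f"{prefix}{i}.zip"
        for prefix in EXPECTED_PREFIXES
        for i in range(files)
        if not (root / f"{prefix}{i}.zip").exists()
    ]
```

An empty list means all numbered archives are present; reference tables are not checked here.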
Step 2: Run pipeline in streaming mode
bracc-etl run \
  --source cnpj \
  --neo4j-password your-password \
  --data-dir ./data \
  --streaming
Streaming phases:
  1. Phase 1: Build establishment lookup (2 hours)
  2. Phase 2: Load companies (1.5 hours)
  3. Phase 3: Load partners and relationships (2.5 hours)
Resume from phase N:
bracc-etl run \
  --source cnpj \
  --neo4j-password your-password \
  --streaming \
  --start-phase 2  # Skip phase 1
Memory: 2GB (fixed, regardless of dataset size)
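The fixed memory footprint comes from processing rows in bounded batches instead of materializing the whole dataset. A minimal sketch of that batching pattern:

```python
import itertools

def batches(rows, size=50_000):
    """Yield lists of at most `size` rows; memory stays flat for any input."""
    it = iter(rows)
    # islice pulls the next chunk lazily; an empty list ends the loop
    while batch := list(itertools.islice(it, size)):
        yield batch
```

Each yielded batch can be written to Neo4j and then discarded, which is why the footprint does not grow with the 50M-row input.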

TSE (Elections)

bracc-etl run \
  --source tse \
  --neo4j-password your-password \
  --data-dir ./data
Loads:
  • 30M+ donation records
  • Election results (1996-2024)
  • Candidate profiles
Graph:
  • Person -[:CANDIDATO_EM]-> Election
  • Person -[:DOOU_PARA]-> Person (donations)
  • Company -[:DOOU_PARA]-> Person

Transparência (Federal Contracts)

Step 1: Download specific datasets
python etl/scripts/download_transparencia.py \
  --year 2025 \
  --months 1 2 3 \
  --datasets compras,servidores \
  --output-dir ./data/transparencia
Step 2: Run pipeline
bracc-etl run \
  --source transparencia \
  --neo4j-password your-password \
  --data-dir ./data
Loads:
  • Federal contracts (Company -[:VENCEU_CONTRATO]-> Contract)
  • Public servants (Person -[:SERVIDOR_EM]-> Company)

Sanctions (CEIS + CNEP)

bracc-etl run \
  --source sanctions \
  --neo4j-password your-password \
  --data-dir ./data
Loads:
  • Administrative sanctions
  • Punishment registry
  • Company -[:SANCIONADA_EM]-> Sanction

International Sanctions

OFAC (US Treasury):
bracc-etl run --source ofac --neo4j-password your-password
EU Sanctions:
bracc-etl run --source eu_sanctions --neo4j-password your-password
UN Sanctions:
bracc-etl run --source un_sanctions --neo4j-password your-password

CLI Options

Full Command Reference

bracc-etl run \
  --source <pipeline-name> \
  --neo4j-uri bolt://localhost:7687 \
  --neo4j-user neo4j \
  --neo4j-password <password> \
  --neo4j-database neo4j \
  --data-dir ./data \
  --limit 10000 \
  --chunk-size 50000 \
  --linking-tier full \
  --streaming \
  --start-phase 1 \
  --history
Options explained:
Option             Description                                Default
------------------ ------------------------------------------ ---------------------
--source           Pipeline name (required)                   -
--neo4j-uri        Neo4j connection URI                       bolt://localhost:7687
--neo4j-user       Neo4j username                             neo4j
--neo4j-password   Neo4j password (required)                  -
--neo4j-database   Neo4j database name                        neo4j
--data-dir         Directory for downloaded data              ./data
--limit            Limit rows processed (for testing)         None
--chunk-size       Batch size for processing                  50000
--linking-tier     Post-load linking (community or full)      full
--streaming        Use streaming mode (for large datasets)    False
--start-phase      Resume from phase N (streaming only)       1
--history          Enable history mode (temporal data)        False

Testing with Limits

For quick testing, use --limit to process only N rows:
bracc-etl run \
  --source cnpj \
  --neo4j-password your-password \
  --limit 10000 \
  --no-streaming  # Batch mode for small dataset
This processes only 10,000 companies (instead of 50M).
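The --limit flag effectively truncates the input stream. Its semantics behave like itertools.islice (a sketch of the behavior, not the CLI's implementation):

```python
import itertools

def apply_limit(rows, limit=None):
    """--limit semantics: pass everything through when limit is None."""
    return iter(rows) if limit is None else itertools.islice(rows, limit)
```

Because the stream is cut before transformation, a limited run exercises the full pipeline code path in a fraction of the time.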

Linking Tiers

Community tier (fast, exact matching):
bracc-etl run --source cnpj --linking-tier community
  • Links via exact CPF/CNPJ matches
  • Runtime: <1 minute
Full tier (slow, probabilistic matching):
bracc-etl run --source cnpj --linking-tier full
  • Uses Splink for fuzzy name matching
  • Runtime: 30-60 minutes
  • Requires pip install -e ".[resolution]"
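Community-tier linking amounts to an exact join on document numbers. A hedged sketch of that idea (record shapes and field names are hypothetical, and the real tier runs in the database):

```python
def community_links(records, canonical):
    """Link records to canonical entities only on exact CPF/CNPJ equality."""
    index = {c["doc"]: c["id"] for c in canonical if c.get("doc")}
    return [
        (rec["id"], index[rec["doc"]])
        for rec in records
        if rec.get("doc") in index  # no fuzzy matching at this tier
    ]
```

Anything without an exact document match is left unlinked, which is why this tier is fast but less complete than Splink's probabilistic matching.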

History Mode

For temporal datasets (e.g., CNPJ monthly snapshots):
bracc-etl run \
  --source cnpj \
  --neo4j-password your-password \
  --history
Graph structure:
  • SOCIO_DE_SNAPSHOT: Immutable historical records
  • SOCIO_DE: Derived “latest” projection
Query historical ownership:
MATCH (p:Person)-[r:SOCIO_DE_SNAPSHOT]->(c:Company)
WHERE r.snapshot_date = '2024-01-01'
RETURN p.name, c.razao_social, r.qualificacao
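The derived SOCIO_DE projection keeps, for each person-company pair, only the most recent snapshot. A sketch of that reduction (field names are illustrative):

```python
def latest_projection(snapshots):
    """Derive 'latest' edges from immutable snapshot records."""
    latest = {}
    for snap in snapshots:
        key = (snap["person"], snap["company"])
        # keep the snapshot with the most recent date per pair
        if key not in latest or snap["snapshot_date"] > latest[key]["snapshot_date"]:
            latest[key] = snap
    return list(latest.values())
```

The snapshots themselves are never mutated; the projection can be rebuilt from them at any time.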

Monitoring Pipeline Status

Check Ingestion Status

bracc-etl sources --status \
  --neo4j-password your-password
Output:
Source               Status          Rows In    Loaded     Started              Finished            
----------------------------------------------------------------------------------------------------
cnpj                 loaded       50,123,456 50,123,456  2025-03-05T10:00:00  2025-03-05T16:30:00 
tse                  loaded       30,456,789 30,456,789  2025-03-04T14:00:00  2025-03-04T14:45:00 
pep_cgu              loaded           15,432     15,432  2025-03-05T10:00:00  2025-03-05T10:00:45 
sanctions            loaded          234,567    234,567  2025-03-03T08:00:00  2025-03-03T08:15:00 

Query IngestionRun Nodes

MATCH (r:IngestionRun)
WHERE r.source_id = 'cnpj'
RETURN r.status, r.rows_in, r.rows_loaded, 
       r.started_at, r.finished_at, r.error
ORDER BY r.started_at DESC
LIMIT 1
Statuses:
  • running: Pipeline in progress
  • loaded: Success
  • quality_fail: Validation error (check r.error)

Failed Pipeline Debugging

If a pipeline fails:
  1. Check IngestionRun error:
    MATCH (r:IngestionRun {source_id: 'cnpj'})
    WHERE r.status = 'quality_fail'
    RETURN r.error, r.started_at
    ORDER BY r.started_at DESC
    LIMIT 1
    
  2. Check logs:
    bracc-etl run --source cnpj --neo4j-password pass 2>&1 | tee pipeline.log
    
  3. Test with limit:
    bracc-etl run --source cnpj --limit 1000 --neo4j-password pass
    

Data Directory Structure

After running pipelines, your data/ directory will look like:
data/
├── cnpj/
│   ├── raw/
│   │   ├── Empresas0.zip
│   │   ├── Socios0.zip
│   │   └── Estabelecimentos0.zip
│   ├── extracted/
│   │   ├── EMPRE0.csv
│   │   ├── SOCIO0.csv
│   │   └── ESTABELE0.csv
│   ├── reference/
│   │   ├── Naturezas.csv
│   │   └── Qualificacoes.csv
│   └── download_manifest.json
├── transparencia/
│   ├── raw/
│   │   ├── compras_202501.zip
│   │   └── servidores_202501.zip
│   ├── contratos.csv
│   └── servidores.csv
└── pep_cgu/
    └── pep_cgu_202503.csv

Performance Tips

1. Use Streaming for Large Datasets

Datasets > 10GB should use --streaming:
  • CNPJ, PGFN, TSE donations

2. Adjust Chunk Size

The default is 50,000 rows per batch. Increase it for faster processing (if you have the RAM):
bracc-etl run --source cnpj --chunk-size 100000
Decrease for memory constraints:
bracc-etl run --source cnpj --chunk-size 10000

3. Disable Linking for Testing

Use --linking-tier community to skip expensive Splink matching.

4. Resume Failed Pipelines

For streaming pipelines, use --start-phase to resume:
bracc-etl run --source cnpj --streaming --start-phase 3

Neo4j Configuration

For optimal performance, configure Neo4j:
1. Increase heap size (neo4j.conf):
dbms.memory.heap.initial_size=4G
dbms.memory.heap.max_size=8G
dbms.memory.pagecache.size=4G
2. Create indexes (before running pipelines):
CREATE INDEX company_cnpj IF NOT EXISTS FOR (c:Company) ON (c.cnpj);
CREATE INDEX person_cpf IF NOT EXISTS FOR (p:Person) ON (p.cpf);
CREATE INDEX partner_id IF NOT EXISTS FOR (p:Partner) ON (p.partner_id);
3. Enable APOC (for advanced queries):
docker run -d \
  --name neo4j \
  -p 7474:7474 -p 7687:7687 \
  -e NEO4J_AUTH=neo4j/pass \
  -e NEO4J_PLUGINS='["apoc"]' \
  neo4j:5.15

Troubleshooting

Issue: “Could not resolve CNPJ release”

Cause: Receita Federal download URLs changed.
Fix: Set an explicit share token:
export CNPJ_SHARE_TOKEN=gn672Ad4CF8N6TK
bracc-etl download --output-dir ./data/cnpj

Issue: “TransientError: Deadlock”

Cause: Neo4j deadlock during concurrent writes.
Fix: Reduce the chunk size:
bracc-etl run --source cnpj --chunk-size 5000
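Transient deadlocks can also be retried with exponential backoff, since Neo4j raises them precisely when a retry is expected to succeed. A hedged sketch (the exception class below stands in for neo4j.exceptions.TransientError; this is not the framework's actual retry logic):

```python
import time

class TransientError(Exception):
    """Stand-in for neo4j.exceptions.TransientError."""

def write_with_retry(write_batch, rows, attempts=5):
    """Retry a batch write on transient deadlocks, backing off between tries."""
    for attempt in range(attempts):
        try:
            return write_batch(rows)
        except TransientError:
            time.sleep(min(2 ** attempt * 0.1, 5))  # exponential backoff, capped
    raise RuntimeError(f"batch still failing after {attempts} attempts")
```

Combining a smaller chunk size with retries usually clears deadlocks without restarting the whole pipeline.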

Issue: “MemoryError”

Cause: Dataset too large for batch mode.
Fix: Use streaming mode:
bracc-etl run --source cnpj --streaming

Issue: “httpx.ConnectTimeout”

Cause: Government portal is slow or down.
Fix: Increase the timeout:
python etl/scripts/download_cnpj.py --timeout 1200  # 20 minutes

Next Steps

Creating Pipelines

Build your own pipeline for a custom data source

Data Sources

Browse 45+ available sources

Pipeline Architecture

Learn about design patterns

Overview

Back to ETL framework overview
