## Prerequisites

Before running pipelines, ensure you have:

### Python 3.12+

Check your Python version:

```bash
python --version  # Should be 3.12 or higher
```
### Neo4j 5.x

Install Neo4j Desktop or run via Docker:

```bash
docker run -d \
  --name neo4j \
  -p 7474:7474 -p 7687:7687 \
  -e NEO4J_AUTH=neo4j/your-password \
  neo4j:5.15
```

Access Neo4j Browser at http://localhost:7474.
## Install Dependencies

Navigate to the ETL directory and install:

```bash
cd ~/workspace/source/etl
pip install -e .

# For BigQuery sources
pip install -e ".[bigquery]"

# For development
pip install -e ".[dev]"
```
## Quick Start

### 1. List Available Pipelines

Output:

```text
Available pipelines:
  bcb
  bndes
  caged
  camara
  camara_inquiries
  ceaf
  cepim
  cnpj
  comprasnet
  cpgf
  cvm
  ...
```
### 2. Run Your First Pipeline

Let's run the PEP CGU pipeline (Politically Exposed Persons):

```bash
bracc-etl run \
  --source pep_cgu \
  --neo4j-password your-password \
  --data-dir ./data
```

What happens:

- ✅ Downloads the PEP list from Portal da Transparência
- ✅ Extracts and normalizes names/documents
- ✅ Loads `Person` nodes with a `pep: true` property
- ✅ Creates an `IngestionRun` node with status `loaded`
Output:

```text
2025-03-05 10:00:00 INFO [pep_cgu] Starting extraction...
2025-03-05 10:00:15 INFO Downloaded: data/pep_cgu/pep_cgu_202503.csv
2025-03-05 10:00:15 INFO [pep_cgu] Starting transformation...
2025-03-05 10:00:20 INFO Transformed 15,432 PEP records
2025-03-05 10:00:20 INFO [pep_cgu] Starting load...
2025-03-05 10:00:45 INFO Batch written: 15,432 rows
2025-03-05 10:00:45 INFO [pep_cgu] Pipeline complete.
```
### 3. Verify in Neo4j

Open Neo4j Browser (http://localhost:7474) and run:

```cypher
MATCH (p:Person {pep: true})
RETURN p.name, p.cpf, p.cargo
LIMIT 10
```
## Common Pipelines

### CNPJ (Companies)

CNPJ is a large dataset (50M+ records, 100GB compressed). Use streaming mode and expect 6+ hours of runtime.

**Step 1: Download data**

```bash
bracc-etl download \
  --output-dir ./data/cnpj \
  --files 10 \
  --skip-existing
```

This downloads:

- Empresas0.zip through Empresas9.zip (companies)
- Socios0.zip through Socios9.zip (partners)
- Estabelecimentos0.zip through Estabelecimentos9.zip (establishments)
- Reference tables (naturezas, qualificações, etc.)
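Before starting the long pipeline run, it can help to confirm the download actually finished. A minimal sketch in Python that checks for the thirty sharded archives listed above (the filename pattern follows that list; the reference tables are omitted for brevity):

```python
from pathlib import Path

# Expected CNPJ archive names: ten shards each for companies,
# partners, and establishments, as listed above.
EXPECTED = [
    f"{prefix}{i}.zip"
    for prefix in ("Empresas", "Socios", "Estabelecimentos")
    for i in range(10)
]

def missing_archives(data_dir: str) -> list[str]:
    """Return the expected archive names not yet present in data_dir."""
    present = {p.name for p in Path(data_dir).glob("*.zip")}
    return [name for name in EXPECTED if name not in present]
```

Running this against `./data/cnpj` before Step 2 avoids discovering a missing shard hours into the load.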
**Step 2: Run pipeline in streaming mode**

```bash
bracc-etl run \
  --source cnpj \
  --neo4j-password your-password \
  --data-dir ./data \
  --streaming
```

Streaming phases:

- Phase 1: Build establishment lookup (2 hours)
- Phase 2: Load companies (1.5 hours)
- Phase 3: Load partners and relationships (2.5 hours)

Resume from phase N:

```bash
bracc-etl run \
  --source cnpj \
  --neo4j-password your-password \
  --streaming \
  --start-phase 2  # Skip phase 1
```

Memory: 2GB (fixed, regardless of dataset size)
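The `--start-phase` semantics above boil down to skipping completed phases and running everything from phase N onward. A simplified Python sketch of that dispatch logic (illustrative only; function names are hypothetical, not the pipeline's actual code):

```python
from typing import Callable

def run_streaming(phases: list[Callable[[], None]], start_phase: int = 1) -> list[int]:
    """Run numbered phases in order, skipping those before start_phase.

    Returns the 1-based phase numbers that were executed, mirroring
    --start-phase semantics: phase N and everything after it runs.
    """
    executed = []
    for number, phase in enumerate(phases, start=1):
        if number < start_phase:
            continue  # already completed in a previous run
        phase()
        executed.append(number)
    return executed
```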
### TSE (Elections)

```bash
bracc-etl run \
  --source tse \
  --neo4j-password your-password \
  --data-dir ./data
```

Loads:

- 30M+ donation records
- Election results (1996-2024)
- Candidate profiles

Graph:

```text
Person -[:CANDIDATO_EM]-> Election
Person -[:DOOU_PARA]-> Person    (donations)
Company -[:DOOU_PARA]-> Person
```
### Transparência (Federal Contracts)

**Step 1: Download specific datasets**

```bash
python etl/scripts/download_transparencia.py \
  --year 2025 \
  --months 1 2 3 \
  --datasets compras,servidores \
  --output-dir ./data/transparencia
```

**Step 2: Run pipeline**

```bash
bracc-etl run \
  --source transparencia \
  --neo4j-password your-password \
  --data-dir ./data
```

Loads:

- Federal contracts (Company -[:VENCEU_CONTRATO]-> Contract)
- Public servants (Person -[:SERVIDOR_EM]-> Company)
### Sanctions (CEIS + CNEP)

```bash
bracc-etl run \
  --source sanctions \
  --neo4j-password your-password \
  --data-dir ./data
```

Loads:

- Administrative sanctions
- Punishment registry
- Company -[:SANCIONADA_EM]-> Sanction
### International Sanctions

OFAC (US Treasury):

```bash
bracc-etl run --source ofac --neo4j-password your-password
```

EU sanctions:

```bash
bracc-etl run --source eu_sanctions --neo4j-password your-password
```

UN sanctions:

```bash
bracc-etl run --source un_sanctions --neo4j-password your-password
```
## CLI Options

### Full Command Reference

```bash
bracc-etl run \
  --source <pipeline-name> \
  --neo4j-uri bolt://localhost:7687 \
  --neo4j-user neo4j \
  --neo4j-password <password> \
  --neo4j-database neo4j \
  --data-dir ./data \
  --limit 10000 \
  --chunk-size 50000 \
  --linking-tier full \
  --streaming \
  --start-phase 1 \
  --history
```

Options explained:
| Option | Description | Default |
|---|---|---|
| `--source` | Pipeline name (required) | - |
| `--neo4j-uri` | Neo4j connection URI | `bolt://localhost:7687` |
| `--neo4j-user` | Neo4j username | `neo4j` |
| `--neo4j-password` | Neo4j password (required) | - |
| `--neo4j-database` | Neo4j database name | `neo4j` |
| `--data-dir` | Directory for downloaded data | `./data` |
| `--limit` | Limit rows processed (for testing) | None |
| `--chunk-size` | Batch size for processing | 50000 |
| `--linking-tier` | Post-load linking (`community` or `full`) | `full` |
| `--streaming` | Use streaming mode (for large datasets) | False |
| `--start-phase` | Resume from phase N (streaming only) | 1 |
| `--history` | Enable history mode (temporal data) | False |
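For scripted or scheduled runs, the flags in this table can be assembled programmatically rather than typed by hand. An illustrative helper (not part of the CLI itself; the resulting list would be passed to `subprocess.run`):

```python
def build_run_args(source: str, neo4j_password: str, **options) -> list[str]:
    """Assemble a `bracc-etl run` argument list from keyword options.

    Boolean True values become bare flags (e.g. --streaming); None and
    False are omitted; everything else becomes a --key value pair,
    with underscores mapped to dashes (start_phase -> --start-phase).
    """
    args = ["bracc-etl", "run", "--source", source,
            "--neo4j-password", neo4j_password]
    for key, value in options.items():
        flag = "--" + key.replace("_", "-")
        if value is True:
            args.append(flag)
        elif value is not None and value is not False:
            args.extend([flag, str(value)])
    return args
```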
### Testing with Limits

For quick testing, use `--limit` to process only N rows:

```bash
bracc-etl run \
  --source cnpj \
  --neo4j-password your-password \
  --limit 10000 \
  --no-streaming  # Batch mode for small dataset
```

This processes only 10,000 companies (instead of 50M).
### Linking Tiers

**Community tier** (fast, exact matching):

```bash
bracc-etl run --source cnpj --linking-tier community
```

- Links via exact CPF/CNPJ matches
- Runtime: <1 minute

**Full tier** (slow, probabilistic matching):

```bash
bracc-etl run --source cnpj --linking-tier full
```

- Uses Splink for fuzzy name matching
- Runtime: 30-60 minutes
- Requires `pip install -e ".[resolution]"`
### History Mode

For temporal datasets (e.g., CNPJ monthly snapshots):

```bash
bracc-etl run \
  --source cnpj \
  --neo4j-password your-password \
  --history
```

Graph structure:

- `SOCIO_DE_SNAPSHOT`: Immutable historical records
- `SOCIO_DE`: Derived "latest" projection

Query historical ownership:

```cypher
MATCH (p:Person)-[r:SOCIO_DE_SNAPSHOT]->(c:Company)
WHERE r.snapshot_date = '2024-01-01'
RETURN p.name, c.razao_social, r.qualificacao
```
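Conceptually, the derived `SOCIO_DE` projection is just "latest snapshot wins" per partner/company pair. In pure-Python terms (a conceptual sketch with hypothetical field names, not the loader's actual code):

```python
def latest_projection(snapshots: list[dict]) -> dict[tuple, dict]:
    """Reduce SOCIO_DE_SNAPSHOT-style records to one 'latest' row per
    (person, company) pair, comparing ISO-format snapshot_date strings
    (lexicographic order equals chronological order for ISO dates)."""
    latest: dict[tuple, dict] = {}
    for row in snapshots:
        key = (row["cpf"], row["cnpj"])
        if key not in latest or row["snapshot_date"] > latest[key]["snapshot_date"]:
            latest[key] = row
    return latest
```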
## Monitoring Pipeline Status

### Check Ingestion Status

```bash
bracc-etl sources --status \
  --neo4j-password your-password
```

Output:

```text
Source      Status    Rows In      Loaded       Started              Finished
----------------------------------------------------------------------------------------------------
cnpj        loaded    50,123,456   50,123,456   2025-03-05T10:00:00  2025-03-05T16:30:00
tse         loaded    30,456,789   30,456,789   2025-03-04T14:00:00  2025-03-04T14:45:00
pep_cgu     loaded    15,432       15,432       2025-03-05T10:00:00  2025-03-05T10:00:45
sanctions   loaded    234,567      234,567      2025-03-03T08:00:00  2025-03-03T08:15:00
```
### Query IngestionRun Nodes

```cypher
MATCH (r:IngestionRun)
WHERE r.source_id = 'cnpj'
RETURN r.status, r.rows_in, r.rows_loaded,
       r.started_at, r.finished_at, r.error
ORDER BY r.started_at DESC
LIMIT 1
```

Statuses:

- `running`: Pipeline in progress
- `loaded`: Success
- `quality_fail`: Validation error (check `r.error`)
## Failed Pipeline Debugging

If a pipeline fails:

1. Check the IngestionRun error:

```cypher
MATCH (r:IngestionRun {source_id: 'cnpj'})
WHERE r.status = 'quality_fail'
RETURN r.error, r.started_at
ORDER BY r.started_at DESC
LIMIT 1
```

2. Check the logs:

```bash
bracc-etl run --source cnpj --neo4j-password pass 2>&1 | tee pipeline.log
```

3. Test with a limit:

```bash
bracc-etl run --source cnpj --limit 1000 --neo4j-password pass
```
## Data Directory Structure

After running pipelines, your data/ directory will look like:

```text
data/
├── cnpj/
│   ├── raw/
│   │   ├── Empresas0.zip
│   │   ├── Socios0.zip
│   │   └── Estabelecimentos0.zip
│   ├── extracted/
│   │   ├── EMPRE0.csv
│   │   ├── SOCIO0.csv
│   │   └── ESTABELE0.csv
│   ├── reference/
│   │   ├── Naturezas.csv
│   │   └── Qualificacoes.csv
│   └── download_manifest.json
├── transparencia/
│   ├── raw/
│   │   ├── compras_202501.zip
│   │   └── servidores_202501.zip
│   ├── contratos.csv
│   └── servidores.csv
└── pep_cgu/
    └── pep_cgu_202503.csv
```
## Performance Tips

### 1. Use Streaming for Large Datasets

Datasets larger than 10GB should use `--streaming`:

- CNPJ, PGFN, TSE donations
### 2. Adjust Chunk Size

Default: 50,000 rows per batch.

Increase for faster processing (if you have the RAM):

```bash
bracc-etl run --source cnpj --chunk-size 100000
```

Decrease under memory constraints:

```bash
bracc-etl run --source cnpj --chunk-size 10000
```
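Chunked loading amounts to slicing the row stream into fixed-size batches, each written in its own transaction. A minimal sketch of that pattern:

```python
from itertools import islice
from typing import Iterable, Iterator

def batched(rows: Iterable, chunk_size: int = 50_000) -> Iterator[list]:
    """Yield successive lists of at most chunk_size rows.

    Smaller batches mean less memory and lock contention; larger ones
    mean fewer round trips to the database.
    """
    iterator = iter(rows)
    while chunk := list(islice(iterator, chunk_size)):
        yield chunk
```

The trade-off behind `--chunk-size` is exactly this loop: each yielded list is one write transaction.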
### 3. Disable Linking for Testing

Use `--linking-tier community` to skip the expensive Splink matching.

### 4. Resume Failed Pipelines

For streaming pipelines, use `--start-phase` to resume:

```bash
bracc-etl run --source cnpj --streaming --start-phase 3
```
## Neo4j Configuration

For optimal performance, configure Neo4j:

1. Increase heap size (neo4j.conf):

```properties
dbms.memory.heap.initial_size=4G
dbms.memory.heap.max_size=8G
dbms.memory.pagecache.size=4G
```

2. Create indexes (before running pipelines):

```cypher
CREATE INDEX company_cnpj IF NOT EXISTS FOR (c:Company) ON (c.cnpj);
CREATE INDEX person_cpf IF NOT EXISTS FOR (p:Person) ON (p.cpf);
CREATE INDEX partner_id IF NOT EXISTS FOR (p:Partner) ON (p.partner_id);
```
3. Enable APOC (for advanced queries):

```bash
docker run -d \
  --name neo4j \
  -p 7474:7474 -p 7687:7687 \
  -e NEO4J_AUTH=neo4j/pass \
  -e NEO4J_PLUGINS='["apoc"]' \
  neo4j:5.15
```
## Troubleshooting

### Issue: "Could not resolve CNPJ release"

Cause: Receita Federal URLs changed.

Fix: Set an explicit share token:

```bash
export CNPJ_SHARE_TOKEN=gn672Ad4CF8N6TK
bracc-etl download --output-dir ./data/cnpj
```

### Issue: "TransientError: Deadlock"

Cause: Neo4j deadlock during concurrent writes.

Fix: Reduce the chunk size:

```bash
bracc-etl run --source cnpj --chunk-size 5000
```

### Issue: "MemoryError"

Cause: Dataset too large for batch mode.

Fix: Use streaming mode:

```bash
bracc-etl run --source cnpj --streaming
```

### Issue: "httpx.ConnectTimeout"

Cause: Government portal slow or down.

Fix: Increase the timeout:

```bash
python etl/scripts/download_cnpj.py --timeout 1200  # 20 minutes
```
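When a portal is merely flaky rather than down, retrying with exponential backoff usually works better than a single long timeout. A generic sketch of that pattern (the downloader's real retry policy, if it has one, may differ):

```python
import time

def with_retries(fn, attempts: int = 4, base_delay: float = 1.0):
    """Call fn(), retrying on any exception with exponential backoff.

    Sleeps base_delay, 2*base_delay, 4*base_delay, ... between tries,
    and re-raises the last error once attempts are exhausted.
    """
    for attempt in range(attempts):
        try:
            return fn()
        except Exception:
            if attempt == attempts - 1:
                raise
            time.sleep(base_delay * (2 ** attempt))
```

Wrapping the download call in `with_retries` lets transient timeouts resolve themselves without babysitting the run.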
## Next Steps

- **Creating Pipelines**: Build your own pipeline for a custom data source
- **Data Sources**: Browse 45+ available sources
- **Pipeline Architecture**: Learn about design patterns
- **Overview**: Back to the ETL framework overview