Overview
This guide walks through creating a production-grade ETL pipeline from scratch. We'll build a pipeline for ANATEL (the Brazilian telecom regulator) as an example.
Prerequisites
- Python 3.12+
- Neo4j 5.x running locally
- Basic understanding of ETL concepts
- Familiarity with Pandas for data processing
Pipeline Template
All pipelines inherit from the `Pipeline` base class:
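A minimal sketch of such a base class (illustrative; the real `bracc_etl` class may differ in method names and attributes):

```python
# Illustrative Pipeline base class: subclasses implement the three
# phases, and run() executes them in extract -> transform -> load order.
from abc import ABC, abstractmethod


class Pipeline(ABC):
    name: str = "base"

    @abstractmethod
    def extract(self) -> None:
        """Fetch raw data into the pipeline's working state."""

    @abstractmethod
    def transform(self) -> None:
        """Normalize raw data into node/relationship frames."""

    @abstractmethod
    def load(self) -> None:
        """Write the transformed frames to Neo4j."""

    def run(self) -> None:
        self.extract()
        self.transform()
        self.load()
```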
Step 1: Create Download Script
Create `etl/scripts/download_anatel.py`:
Use the shared download utilities (`etl/scripts/_download_utils.py`):
- `download_file(url, dest, timeout)` — HTTP download with progress
- `extract_zip(zip_path, extract_dir)` — safe ZIP extraction
- `validate_csv(path, expected_cols)` — column validation
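These helpers might be implemented along these lines (a standard-library sketch; the real `_download_utils.py` may differ):

```python
# Sketches of the three shared helpers. Only the signatures come from
# the guide; the bodies are illustrative.
import csv
import shutil
import urllib.request
import zipfile
from pathlib import Path


def download_file(url: str, dest: Path, timeout: int = 60) -> Path:
    """Stream an HTTP download to dest."""
    with urllib.request.urlopen(url, timeout=timeout) as resp, open(dest, "wb") as fh:
        shutil.copyfileobj(resp, fh)
    return dest


def extract_zip(zip_path: Path, extract_dir: Path) -> list[str]:
    """Extract a ZIP, rejecting entries that would escape extract_dir."""
    extract_dir.mkdir(parents=True, exist_ok=True)
    with zipfile.ZipFile(zip_path) as zf:
        for name in zf.namelist():
            target = (extract_dir / name).resolve()
            if not str(target).startswith(str(extract_dir.resolve())):
                raise ValueError(f"unsafe path in archive: {name}")
        zf.extractall(extract_dir)
        return zf.namelist()


def validate_csv(path: Path, expected_cols: list[str]) -> None:
    """Raise if the CSV header is missing any expected column."""
    with open(path, newline="", encoding="utf-8") as fh:
        header = next(csv.reader(fh))
    missing = [c for c in expected_cols if c not in header]
    if missing:
        raise ValueError(f"{path}: missing columns {missing}")
```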
Step 2: Implement Extract Phase
In `etl/src/bracc_etl/pipelines/anatel.py`:
- Create data directory structure
- Check if file exists (skip re-download)
- Handle HTTP errors gracefully
- Log progress
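A sketch of an extract phase covering those points (shown as a standalone function with an injected downloader for clarity; in the real pipeline this would be a method on the pipeline class, and the file names are assumptions):

```python
# Illustrative extract phase: create the directory, skip cached files,
# handle download errors gracefully, and log progress.
import logging
from pathlib import Path

log = logging.getLogger(__name__)


def extract(data_dir: Path, url: str, download) -> Path:
    """Download the source file unless a cached copy already exists."""
    data_dir.mkdir(parents=True, exist_ok=True)  # create directory structure
    dest = data_dir / "anatel_raw.zip"
    if dest.exists():                            # skip re-download
        log.info("found cached file %s, skipping download", dest)
        return dest
    try:
        download(url, dest)                      # injected downloader
    except OSError as exc:                       # handle HTTP errors gracefully
        log.error("download failed for %s: %s", url, exc)
        raise
    log.info("downloaded %s", dest)
    return dest
```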
Step 3: Implement Transform Phase
- Copy the DataFrame: `df = self._raw.copy()` to avoid mutations
- Vectorized operations: use `.map()` instead of `.apply()` for speed
- Deduplicate: always deduplicate nodes on primary keys
- Type safety: use `.astype(str)` before transforms
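The rules above combine into a transform like this sketch (column names and the label mapping are invented for illustration):

```python
# Illustrative transform: copy, cast, map, deduplicate.
import pandas as pd


def transform(raw: pd.DataFrame) -> pd.DataFrame:
    df = raw.copy()                                    # never mutate the raw frame
    df["cnpj"] = df["cnpj"].astype(str).str.zfill(14)  # type safety before transforms
    status_labels = {"A": "active", "C": "cancelled"}
    df["status"] = df["status"].map(status_labels)     # vectorized .map(), not .apply()
    df = df.drop_duplicates(subset=["cnpj"])           # dedupe on the primary key
    return df
```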
Step 4: Implement Load Phase
- Use `Neo4jBatchLoader` for efficient bulk inserts
- Always specify `key_field` for MERGE semantics
- Load nodes before relationships (foreign key integrity)
- Track `rows_in` and `rows_loaded` for monitoring
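A sketch of the idea behind such a loader: batch the rows, then MERGE each batch on its key field (the real `Neo4jBatchLoader` API may differ):

```python
# Illustrative batched-MERGE load. MERGE on the key field makes reruns
# idempotent; UNWIND sends one query per batch instead of per row.
from itertools import islice


def batches(rows, size=1000):
    """Yield lists of at most `size` rows for UNWIND-based bulk inserts."""
    it = iter(rows)
    while chunk := list(islice(it, size)):
        yield chunk


def merge_query(label: str, key_field: str) -> str:
    """Build an idempotent UNWIND + MERGE query for one node label."""
    return (
        f"UNWIND $rows AS row "
        f"MERGE (n:{label} {{{key_field}: row.{key_field}}}) "
        f"SET n += row"
    )
```

Run node batches before relationship batches, and count `rows_in`/`rows_loaded` as each batch completes.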
Step 5: Register Pipeline
Add to `etl/src/bracc_etl/runner.py:54`:
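Registration might look like this (the class name `AnatelPipeline` and the exact shape of the `PIPELINES` dict are assumptions):

```python
# Illustrative registration in runner.py's PIPELINES dict.

class AnatelPipeline:
    """Stand-in for bracc_etl.pipelines.anatel.AnatelPipeline."""
    name = "anatel"


PIPELINES = {
    # ... existing pipelines ...
    "anatel": AnatelPipeline,
}


def get_pipeline(name: str):
    """Look up and instantiate a registered pipeline."""
    return PIPELINES[name]()
```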
Step 6: Test the Pipeline
Run with Limit
Verify Results
Check that the `IngestionRun` record has `status = 'loaded'`
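Assuming each run writes an `IngestionRun` node with `pipeline`, `status`, and a start timestamp (the property names are assumptions about this project's bookkeeping), the check splits into a Cypher query plus a small, testable predicate:

```python
# Hypothetical verification: fetch the latest IngestionRun for a
# pipeline and confirm it finished with status 'loaded'.
VERIFY_QUERY = """
MATCH (r:IngestionRun {pipeline: $pipeline})
RETURN r.status AS status
ORDER BY r.started_at DESC
LIMIT 1
"""


def is_loaded(records: list[dict]) -> bool:
    """True if the most recent run's status is 'loaded'."""
    return bool(records) and records[0]["status"] == "loaded"
```

With the official `neo4j` driver, `session.run(VERIFY_QUERY, pipeline="anatel")` would supply the records.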
Advanced Patterns
Pattern 1: Streaming for Large Datasets
For datasets > 10 GB, implement `run_streaming()`:
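One possible shape for `run_streaming()`, using pandas' `chunksize` to keep memory bounded (the callable parameters are assumptions for illustration):

```python
# Illustrative streaming run: read, transform, and load one chunk at a
# time instead of materializing the whole dataset.
import pandas as pd


def run_streaming(csv_source, transform, load, chunksize=100_000):
    rows_in = rows_loaded = 0
    for chunk in pd.read_csv(csv_source, chunksize=chunksize):
        rows_in += len(chunk)
        out = transform(chunk)
        load(out)
        rows_loaded += len(out)
    return rows_in, rows_loaded
```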
Pattern 2: Multi-File Processing
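A sketch for this pattern: glob the extracted part-files and concatenate them (paths and the glob pattern are illustrative):

```python
# Illustrative multi-file read: combine every part-file into one frame.
from pathlib import Path

import pandas as pd


def read_all(extract_dir: Path, pattern: str = "*.csv") -> pd.DataFrame:
    frames = [pd.read_csv(p) for p in sorted(extract_dir.glob(pattern))]
    if not frames:
        raise FileNotFoundError(f"no files matching {pattern} in {extract_dir}")
    return pd.concat(frames, ignore_index=True)
```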
For sources with multiple files (like CNPJ), process every part-file and combine the results.
Pattern 3: API Pagination
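A pagination sketch: request pages until one comes back empty (`fetch_page` is an injected callable; real APIs may use cursors or total counts instead):

```python
# Illustrative page iterator for a page-numbered API.
def iter_pages(fetch_page, start=1):
    page = start
    while True:
        items = fetch_page(page)
        if not items:          # an empty page signals the end
            break
        yield from items
        page += 1
```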
For paginated APIs, keep requesting pages until the source reports no more data.
Pattern 4: Schema Validation
Add a Pandera schema for validation.
Pattern 5: Reference Table Resolution
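A reference-table sketch: map the coded column through a lookup and fall back to a sentinel for unknown codes (the codes and labels here are illustrative):

```python
# Illustrative code-to-label resolution via a lookup dict and .map().
import pandas as pd

SERVICE_LABELS = {
    "SMP": "Mobile Personal Service",
    "STFC": "Fixed Switched Telephone Service",
}


def resolve_labels(df: pd.DataFrame) -> pd.DataFrame:
    out = df.copy()
    out["service_label"] = out["service_code"].map(SERVICE_LABELS).fillna("unknown")
    return out
```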
For coded values that need human-readable labels, resolve them against a reference table.
Testing
Unit Tests
Create `etl/tests/test_anatel.py`:
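A unit test for transform logic might look like this (pytest style; `normalize_cnpj` is an illustrative stand-in for the pipeline's real helper):

```python
# Illustrative unit test for a CNPJ normalization helper.
import pandas as pd


def normalize_cnpj(s: pd.Series) -> pd.Series:
    """Stand-in helper: strip non-digits and left-pad to 14 characters."""
    return s.astype(str).str.replace(r"\D", "", regex=True).str.zfill(14)


def test_normalize_cnpj_pads_and_strips():
    raw = pd.Series(["12.345.678/0001-95", "1"])
    out = normalize_cnpj(raw)
    assert (out.str.len() == 14).all()
    assert out.iloc[0] == "12345678000195"
```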
Integration Tests
Run the full pipeline with test data.
Production Checklist
Before merging your pipeline:
- Download script works with real data source
- Extract phase handles missing files gracefully
- Transform phase normalizes all key fields (CNPJ, names, dates)
- Load phase uses MERGE (not CREATE) for idempotency
- Deduplication on all node primary keys
- Error handling with try/except and logging
- Progress logging every 10k-100k rows
- Schema validation with Pandera (optional but recommended)
- Unit tests for transform logic
- Integration test with sample data
- Documentation in source registry (`source_registry_br_v1.csv`)
- Registered in `runner.py` PIPELINES dict
Complete Example
Full working pipeline: `etl/src/bracc_etl/pipelines/anatel.py`
Next Steps
Running Pipelines
Test your new pipeline locally
Pipeline Architecture
Learn advanced design patterns
Data Sources
Add your pipeline to the source registry
Overview
Back to ETL framework overview