Overview
The VoicePact ETL (Extract, Transform, Load) pipeline is implemented in Python using pandas for data transformation and SQLAlchemy for database operations. The pipeline extracts contract data from the operational database, transforms it into analytics-ready summaries, and loads it into a separate SQLite analytics database.
Architecture
Pipeline Location
The ETL pipeline is implemented in the run_analytics.py script (the line references below point into this file).
Database Configuration
- Source Database: The operational VoicePact database (configured via DATABASE_URL)
- Destination Database: server/etl/analytics.db (created automatically)
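For example, the source connection can be pointed at the default local database. The exact URL scheme is an assumption; since the pipeline uses an async SQLAlchemy engine, an async driver such as aiosqlite would be needed:

```shell
# Point the ETL's source connection at the operational database
# (URL scheme and driver are assumptions; adjust to your setup).
export DATABASE_URL="sqlite+aiosqlite:///./voicepact.db"
```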
Prerequisites
Required Dependencies
The ETL pipeline requires the pandas and SQLAlchemy packages.
Source Data Requirements
The pipeline expects the following tables to exist in the source database:
- contracts: Contract records with terms, amounts, and status
- contract_parties: Party information (buyers, sellers, mediators)
- payments: Payment transactions and escrow releases
Running the ETL Pipeline
Basic Execution
From the server directory, run the run_analytics.py script.
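A minimal invocation (the script's path within the server directory is an assumption):

```shell
# Run the ETL pipeline from the server directory.
python etl/run_analytics.py
```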
Expected Output
The pipeline logs its progress to the console as each stage runs.
Scheduling
For production use, schedule the ETL pipeline to run periodically using cron (see Best Practices for a suggested schedule).
Pipeline Implementation
Stage 1: Extract
The extract stage (run_analytics.py:42-87) queries three tables from the source database:
- Uses async SQLAlchemy engine for database connections
- Converts query results to pandas DataFrames
- Handles empty tables gracefully with warnings
- Includes comprehensive error handling for database issues
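The extract stage can be sketched as follows. This is an illustrative sketch, not the actual implementation: the real pipeline uses an async SQLAlchemy engine, while this version uses a plain DBAPI connection for brevity. The table names match the source-data requirements above.

```python
import pandas as pd

# Queries for the three source tables listed in the prerequisites.
QUERIES = {
    "contracts": "SELECT * FROM contracts",
    "contract_parties": "SELECT * FROM contract_parties",
    "payments": "SELECT * FROM payments",
}

def extract_data(conn) -> dict[str, pd.DataFrame]:
    """Read each source table into a DataFrame.

    `conn` is any DBAPI connection (e.g. sqlite3); the real pipeline
    uses an async SQLAlchemy engine instead.
    """
    frames = {}
    for name, sql in QUERIES.items():
        df = pd.read_sql(sql, conn)
        if df.empty:
            # Mirror the pipeline's behavior: warn but continue.
            print(f"warning: source table '{name}' is empty")
        frames[name] = df
    return frames
```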
Stage 2: Transform
The transform stage (run_analytics.py:91-189) performs several data operations:
- Date Conversions: parses date columns into proper datetime values
- Feature Engineering: calculates time to completion for each contract
- Party Data Pivoting: transforms party data from long format (multiple rows per contract) to wide format (one row per contract)
- Payment Aggregation: sums released payments per contract
- Data Merging: joins all data sources using left joins
- Column Selection and Cleanup: selects and renames columns for the final analytics table
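The transform steps can be sketched as below. This is an illustrative sketch, not the actual implementation; all column names (id, created_at, completed_at, role, name, status, amount) are assumptions.

```python
import pandas as pd

def transform_data(contracts, parties, payments):
    """Sketch of the transform stage; column names are assumptions."""
    df = contracts.copy()

    # Date conversions: parse timestamp columns into datetimes.
    for col in ("created_at", "completed_at"):
        df[col] = pd.to_datetime(df[col])

    # Feature engineering: time to completion in days.
    df["days_to_completion"] = (df["completed_at"] - df["created_at"]).dt.days

    # Party pivoting: long format -> one row per contract.
    wide_parties = parties.pivot_table(
        index="contract_id", columns="role", values="name", aggfunc="first"
    ).reset_index()

    # Payment aggregation: sum released payments per contract.
    released = (
        payments[payments["status"] == "released"]
        .groupby("contract_id", as_index=False)["amount"]
        .sum()
        .rename(columns={"amount": "total_released"})
    )

    # Merging: left joins keep every contract, even without parties/payments.
    df = df.merge(
        wide_parties, left_on="id", right_on="contract_id", how="left"
    ).drop(columns="contract_id")
    df = df.merge(
        released, left_on="id", right_on="contract_id", how="left"
    ).drop(columns="contract_id")

    # Column selection and cleanup via a final_columns mapping would
    # follow here (see run_analytics.py:91-189).
    return df
```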
Stage 3: Load
The load stage (run_analytics.py:192-220) writes the transformed DataFrame to the analytics database:
- Uses if_exists="replace" to fully refresh the table on each run
- Processes data in chunks of 1000 rows for memory efficiency
- Creates the analytics database file if it doesn't exist
- Includes error handling for database connection issues
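A minimal sketch of the load stage, using a plain sqlite3 connection rather than the pipeline's SQLAlchemy engine; the table name contract_summary is an assumption:

```python
import sqlite3
import pandas as pd

def load_data(df: pd.DataFrame, db_path: str) -> None:
    """Write the summary DataFrame to the analytics database.

    Table name is an assumption; the real pipeline uses an
    SQLAlchemy engine instead of a raw sqlite3 connection.
    """
    conn = sqlite3.connect(db_path)  # creates the file if missing
    try:
        # Fully refresh the table each run, 1000 rows at a time.
        df.to_sql(
            "contract_summary",
            conn,
            if_exists="replace",
            index=False,
            chunksize=1000,
        )
    finally:
        conn.close()
```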
Accessing the Analytics Database
Using SQLite CLI
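A quick way to inspect the analytics database from the command line (the table name contract_summary is an assumption):

```shell
sqlite3 server/etl/analytics.db
# Then, at the sqlite> prompt:
#   .tables
#   SELECT COUNT(*) FROM contract_summary;
```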
Using Python
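For ad hoc analysis, the table can be read back into pandas. The table name contract_summary is an assumption; adjust it to match your schema:

```python
import sqlite3
import pandas as pd

# Default location of the analytics database, relative to the repo root.
DB_PATH = "server/etl/analytics.db"

def load_summary(db_path: str = DB_PATH) -> pd.DataFrame:
    """Read the analytics table into a DataFrame."""
    conn = sqlite3.connect(db_path)
    try:
        # Table name is an assumption; match it to your schema.
        return pd.read_sql("SELECT * FROM contract_summary", conn)
    finally:
        conn.close()
```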
Using BI Tools
Connect business intelligence tools such as:
- Metabase: Use the SQLite connector with path server/etl/analytics.db
- Apache Superset: Configure a SQLite database connection
- Tableau: Connect via the SQLite ODBC driver
- Power BI: Use a SQLite connector
Monitoring and Troubleshooting
Common Issues
Source Database Not Found
Verify that the operational database exists at ./voicepact.db, or update the DATABASE_URL environment variable to point at the correct location.
Empty Tables
The extract stage logs a warning for each empty source table and continues with the remaining data; verify that the operational database has actually been populated.
Transformation Failures
Check the pipeline's console output to identify the failing stage; transformation errors usually indicate missing or unexpectedly formatted columns in the source tables.
Performance Considerations
- Execution Time: For 10,000 contracts, expect ~5-10 seconds
- Memory Usage: The pipeline loads all data into memory; for very large datasets (>100k contracts), consider implementing incremental loads
- Database Locking: The source database uses WAL mode to allow concurrent reads during ETL
Extending the Pipeline
Adding New Metrics
To add calculated fields to the analytics table:
- Add the calculation in the transform_data() function
- Include the new column in the final_columns dictionary
- Run the pipeline to refresh the analytics table
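For instance, a hypothetical metric flagging slow contracts could be computed inside transform_data(). The column names here follow this document's sketches, not the actual schema:

```python
import pandas as pd

def add_slow_flag(df: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical metric: mark contracts that took more than 30 days."""
    df = df.copy()
    df["is_slow"] = df["days_to_completion"] > 30
    return df
```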
Adding New Source Tables
To incorporate additional source data:
- Add a query to the queries dictionary in extract_data()
- Process the new DataFrame in transform_data()
- Merge it with the summary DataFrame using appropriate join keys
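As a purely hypothetical illustration (the disputes table and its key names are invented for this example), adding a fourth source table would look like:

```python
# In extract_data(): add the new query alongside the existing ones.
QUERIES = {
    "contracts": "SELECT * FROM contracts",
    "contract_parties": "SELECT * FROM contract_parties",
    "payments": "SELECT * FROM payments",
    "disputes": "SELECT * FROM disputes",  # hypothetical new source table
}

# Later, in transform_data(): merge on the shared contract key, e.g.
# summary = summary.merge(disputes_df, left_on="id",
#                         right_on="contract_id", how="left")
```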
Incremental Loading
For large production systems, consider implementing incremental updates, e.g. tracking the last successful run and extracting only rows changed since then, instead of rebuilding the full table on every run.
Best Practices
- Schedule Regularly: Run the ETL pipeline at off-peak hours (e.g., 2 AM daily)
- Monitor Logs: Capture pipeline output to log files for troubleshooting
- Version Control: Track changes to transformation logic in git
- Test Transformations: Validate data quality after pipeline runs
- Backup Analytics DB: Periodically back up the analytics database for historical analysis
- Document Changes: Update this documentation when modifying the pipeline
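The daily off-peak schedule suggested above can be expressed as a crontab entry; the deployment path, interpreter, and log location are assumptions:

```shell
# Run the ETL pipeline every day at 2 AM; adjust paths to your deployment.
0 2 * * * cd /opt/voicepact/server && /usr/bin/python3 etl/run_analytics.py >> /var/log/voicepact-etl.log 2>&1
```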
Next Steps
- Review example analytics queries
- Explore the database schema
- Set up a BI tool to visualize the analytics data