
Overview

The VoicePact ETL (Extract, Transform, Load) pipeline is implemented in Python using pandas for data transformation and SQLAlchemy for database operations. The pipeline extracts contract data from the operational database, transforms it into analytics-ready summaries, and loads it into a separate SQLite analytics database.

Architecture

Pipeline Location

The ETL pipeline is located at:
server/etl/run_analytics.py

Database Configuration

  • Source Database: The operational VoicePact database (configured via DATABASE_URL)
  • Destination Database: server/etl/analytics.db (created automatically)

Prerequisites

Required Dependencies

The ETL pipeline requires the following Python packages:
pandas>=2.0.0
sqlalchemy>=2.0.0
These should already be installed if you’ve set up the VoicePact server.

Source Data Requirements

The pipeline expects the following tables to exist in the source database:
  • contracts: Contract records with terms, amounts, and status
  • contract_parties: Party information (buyers, sellers, mediators)
  • payments: Payment transactions and escrow releases
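Before running the pipeline, it can be useful to confirm these tables exist. The following pre-flight check is a sketch, not part of run_analytics.py; the helper name and the in-memory demo engine are assumptions for illustration:

```python
# Hypothetical pre-flight check (not part of run_analytics.py): confirm
# the source tables the pipeline expects are present before extracting.
from sqlalchemy import create_engine, inspect, text

REQUIRED_TABLES = {"contracts", "contract_parties", "payments"}

def check_source_tables(engine):
    """Return the set of required tables missing from the source database."""
    existing = set(inspect(engine).get_table_names())
    return REQUIRED_TABLES - existing

# Demo against an in-memory SQLite database with only one table created:
engine = create_engine("sqlite:///:memory:")
with engine.begin() as conn:
    conn.execute(text("CREATE TABLE contracts (id INTEGER PRIMARY KEY)"))

missing = check_source_tables(engine)
print(sorted(missing))  # the two tables not yet created
```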

Running the ETL Pipeline

Basic Execution

From the server directory, run:
cd server
python etl/run_analytics.py

Expected Output

You should see log output similar to:
2026-03-06 10:30:15 - [INFO] - Starting VoicePact Analytics ETL pipeline...
2026-03-06 10:30:15 - [INFO] - Connecting to SOURCE database: sqlite+aiosqlite:///./voicepact.db
2026-03-06 10:30:15 - [INFO] - EXTRACT: Running query for table 'contracts'...
2026-03-06 10:30:15 - [INFO] - EXTRACT: Read 142 rows from 'contracts'.
2026-03-06 10:30:15 - [INFO] - EXTRACT: Running query for table 'parties'...
2026-03-06 10:30:15 - [INFO] - EXTRACT: Read 284 rows from 'parties'.
2026-03-06 10:30:15 - [INFO] - EXTRACT: Running query for table 'payments'...
2026-03-06 10:30:15 - [INFO] - EXTRACT: Read 89 rows from 'payments'.
2026-03-06 10:30:15 - [INFO] - TRANSFORM: Starting data transformation...
2026-03-06 10:30:15 - [INFO] - TRANSFORM: Processing 'contracts' data...
2026-03-06 10:30:15 - [INFO] - TRANSFORM: Pivoting 'parties' data...
2026-03-06 10:30:15 - [INFO] - TRANSFORM: Aggregating 'payments' data...
2026-03-06 10:30:15 - [INFO] - TRANSFORM: Merging all data sources...
2026-03-06 10:30:15 - [INFO] - TRANSFORM: Transformation complete. 142 rows processed.
2026-03-06 10:30:15 - [INFO] - Connecting to DESTINATION database: sqlite:///server/etl/analytics.db
2026-03-06 10:30:16 - [INFO] - LOAD: Successfully loaded 142 rows into 'fct_contracts_summary' in server/etl/analytics.db
2026-03-06 10:30:16 - [INFO] - VoicePact Analytics ETL pipeline finished successfully.

Scheduling

For production use, schedule the ETL pipeline to run periodically using cron:
# Run daily at 2 AM
0 2 * * * cd /path/to/voicepact/server && /path/to/python etl/run_analytics.py >> /var/log/voicepact-etl.log 2>&1

Pipeline Implementation

Stage 1: Extract

The extract stage queries three tables from the source database:
queries = {
    "contracts": "SELECT * FROM contracts",
    "parties": "SELECT * FROM contract_parties",
    "payments": "SELECT * FROM payments",
}
Key implementation details from run_analytics.py:42-87:
  • Uses async SQLAlchemy engine for database connections
  • Converts query results to pandas DataFrames
  • Handles empty tables gracefully with warnings
  • Includes comprehensive error handling for database issues
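Put together, the extract stage can be sketched as follows. This is a simplified synchronous version (the real run_analytics.py uses an async engine), and everything beyond the queries dictionary is an assumption for illustration:

```python
# Illustrative sketch of the extract stage. The real pipeline uses an
# async SQLAlchemy engine; a synchronous engine is shown for brevity.
import logging
import pandas as pd
from sqlalchemy import text

logger = logging.getLogger(__name__)

queries = {
    "contracts": "SELECT * FROM contracts",
    "parties": "SELECT * FROM contract_parties",
    "payments": "SELECT * FROM payments",
}

def extract_data(engine):
    """Run each extract query and return a dict of DataFrames."""
    frames = {}
    with engine.connect() as conn:
        for name, sql in queries.items():
            logger.info("EXTRACT: Running query for table '%s'...", name)
            df = pd.read_sql(text(sql), conn)
            if df.empty:
                logger.warning("No data found for table '%s'.", name)
            logger.info("EXTRACT: Read %d rows from '%s'.", len(df), name)
            frames[name] = df
    return frames
```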

Stage 2: Transform

The transform stage performs several data operations:

Date Conversions

df_contracts["created_at"] = pd.to_datetime(df_contracts["created_at"])
df_contracts["completed_at"] = pd.to_datetime(df_contracts["completed_at"])

Feature Engineering

Calculates time to completion:
df_contracts["time_to_completion_days"] = (
    df_contracts["completed_at"] - df_contracts["created_at"]
).dt.days
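Contracts that are still open have a null completed_at, so the subtraction yields NaN days for those rows; a minimal illustration:

```python
# Minimal illustration: contracts still in progress have a null
# completed_at, so time_to_completion_days is NaN for those rows.
import pandas as pd

df = pd.DataFrame({
    "created_at": pd.to_datetime(["2026-01-01", "2026-01-05"]),
    "completed_at": pd.to_datetime(["2026-01-04", None]),
})
df["time_to_completion_days"] = (df["completed_at"] - df["created_at"]).dt.days
print(df["time_to_completion_days"].tolist())  # [3.0, nan]
```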
Extracts product from JSON terms field:
def get_product(terms):
    if isinstance(terms, str):
        try:
            terms = json.loads(terms)
        except json.JSONDecodeError:
            return None
    return terms.get("product") if isinstance(terms, dict) else None

df_contracts["product"] = df_contracts["terms"].apply(get_product)

Party Data Pivoting

Transforms party data from long format (multiple rows per contract) to wide format (one row per contract):
# Separate buyers and sellers
df_buyer = (
    df_parties[df_parties["role"] == "buyer"]
    .loc[:, ["contract_id", "phone_number"]]
    .rename(columns={"phone_number": "buyer_phone"})
)
df_seller = (
    df_parties[df_parties["role"] == "seller"]
    .loc[:, ["contract_id", "phone_number"]]
    .rename(columns={"phone_number": "seller_phone"})
)

Payment Aggregation

Sums released payments per contract:
df_payments["amount"] = pd.to_numeric(df_payments["amount"], errors="coerce").fillna(0)

df_payments_agg = (
    df_payments[df_payments["status"] == "released"]
    .groupby("contract_id")["amount"]
    .sum()
    .reset_index()
    .rename(columns={"amount": "total_paid_released"})
)

Data Merging

Joins all data sources using left joins:
df_summary = pd.merge(df_contracts, df_buyer, on="contract_id", how="left")
df_summary = pd.merge(df_summary, df_seller, on="contract_id", how="left")
df_summary = pd.merge(df_summary, df_payments_agg, on="contract_id", how="left")

Column Selection and Cleanup

Selects and renames columns for the final analytics table:
final_columns = {
    "id": "contract_id",
    "contract_type": "contract_type",
    "status": "contract_status",
    "total_amount": "contract_total_amount",
    "currency": "currency",
    "product": "product",
    "created_at": "created_at",
    "completed_at": "completed_at",
    "time_to_completion_days": "time_to_completion_days",
    "buyer_phone": "buyer_phone",
    "seller_phone": "seller_phone",
    "total_paid_released": "total_paid_released",
}

df_final = df_summary.rename(columns=final_columns)
df_final = df_final[list(final_columns.values())]
See implementation at run_analytics.py:91-189.

Stage 3: Load

The load stage writes the transformed DataFrame to the analytics database:
df.to_sql(
    "fct_contracts_summary",
    engine,
    if_exists="replace",
    index=False,
    chunksize=1000,
)
Key implementation details from run_analytics.py:192-220:
  • Uses if_exists="replace" to fully refresh the table on each run
  • Processes data in chunks of 1000 rows for memory efficiency
  • Creates the analytics database file if it doesn’t exist
  • Includes error handling for database connection issues
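A minimal sketch of the load stage, assuming a plain synchronous SQLite engine for the destination (the path and table name are taken from this page; the helper name is an assumption):

```python
# Sketch of the load stage: write the summary DataFrame to the analytics
# database, fully replacing the target table on each run.
import pandas as pd
from sqlalchemy import create_engine

def load_data(df, db_url="sqlite:///server/etl/analytics.db"):
    """Write the summary DataFrame, replacing fct_contracts_summary."""
    engine = create_engine(db_url)  # SQLite creates the file on first write
    df.to_sql(
        "fct_contracts_summary",
        engine,
        if_exists="replace",   # full refresh each run
        index=False,
        chunksize=1000,        # write in chunks for memory efficiency
    )
```

Because the table is replaced on every run, downstream consumers always see a complete, consistent snapshot rather than a partial update.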

Accessing the Analytics Database

Using SQLite CLI

sqlite3 server/etl/analytics.db
Once connected, you can run queries:
-- View table schema
.schema fct_contracts_summary

-- Count total rows
SELECT COUNT(*) FROM fct_contracts_summary;

-- Sample data
SELECT * FROM fct_contracts_summary LIMIT 5;

Using Python

import pandas as pd
import sqlite3

# Connect to analytics database
conn = sqlite3.connect('server/etl/analytics.db')

# Run query
df = pd.read_sql_query(
    "SELECT contract_type, COUNT(*) as count FROM fct_contracts_summary GROUP BY contract_type",
    conn
)

print(df)
conn.close()

Using BI Tools

Connect business intelligence tools such as:
  • Metabase: Use SQLite connector with path server/etl/analytics.db
  • Apache Superset: Configure SQLite database connection
  • Tableau: Connect via SQLite ODBC driver
  • Power BI: Use SQLite connector

Monitoring and Troubleshooting

Common Issues

Source Database Not Found

Database error during EXTRACT: unable to open database file
Solution: Ensure the VoicePact application database exists at ./voicepact.db or update the DATABASE_URL environment variable.

Empty Tables

No data found for table 'contracts'.
Solution: This is a warning, not an error. The pipeline will continue but produce an empty analytics table. Add contracts to the system first.

Transformation Failures

ETL pipeline failed during TRANSFORM step.
Solution: Check that the source tables have the expected schema. The pipeline expects specific column names and data types.
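One way to catch schema drift early is to validate required columns before transforming. The column sets below are inferred from the transformations shown on this page, not read from the source code, so treat them as a starting point:

```python
# Hedged helper: check each extracted DataFrame for the columns the
# transform step relies on. Column lists are inferred from this page.
REQUIRED_COLUMNS = {
    "contracts": {"id", "contract_type", "status", "total_amount",
                  "currency", "terms", "created_at", "completed_at"},
    "parties": {"contract_id", "role", "phone_number"},
    "payments": {"contract_id", "status", "amount"},
}

def validate_frames(frames):
    """Return {table: missing columns} for frames missing required columns."""
    problems = {}
    for name, required in REQUIRED_COLUMNS.items():
        missing = required - set(frames[name].columns)
        if missing:
            problems[name] = missing
    return problems
```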

Performance Considerations

  • Execution Time: For 10,000 contracts, expect ~5-10 seconds
  • Memory Usage: The pipeline loads all data into memory; for very large datasets (>100k contracts), consider implementing incremental loads
  • Database Locking: The source database uses WAL mode to allow concurrent reads during ETL
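If the source database is not yet in WAL mode, it can be switched with a single PRAGMA. A minimal helper (the helper name is an assumption; the PRAGMA itself is standard SQLite):

```python
# Request WAL journaling on a SQLite database and report the resulting
# mode. SQLite returns the mode actually in effect after the request.
import sqlite3

def ensure_wal(db_path):
    """Return the journal mode after requesting WAL (e.g. 'wal')."""
    conn = sqlite3.connect(db_path)
    try:
        mode = conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]
    finally:
        conn.close()
    return mode
```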

Extending the Pipeline

Adding New Metrics

To add calculated fields to the analytics table:
  1. Add the calculation in the transform_data() function
  2. Include the new column in the final_columns dictionary
  3. Run the pipeline to refresh the analytics table
Example - adding a “high value” flag:
# In transform_data() function, after other transformations:
df_summary["is_high_value"] = df_summary["total_amount"] > 50000

# Add to final_columns:
final_columns["is_high_value"] = "is_high_value"

Adding New Source Tables

To incorporate additional source data:
  1. Add a query to the queries dictionary in extract_data()
  2. Process the new DataFrame in transform_data()
  3. Merge it with the summary DataFrame using appropriate join keys
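As a hedged illustration of these three steps, here is how a hypothetical disputes table (invented for this example, including its columns) might be aggregated and merged:

```python
# Hypothetical example: wiring a 'disputes' table into the pipeline.
# The table, its columns, and the metric are invented for illustration.
import pandas as pd

# 1. In extract_data(): queries["disputes"] = "SELECT * FROM disputes"

# 2. In transform_data(): reduce to one row per contract
def aggregate_disputes(df_disputes):
    return (
        df_disputes.groupby("contract_id")
        .size()
        .reset_index(name="dispute_count")
    )

# 3. Merge with the summary DataFrame on the shared key
def merge_disputes(df_summary, df_disputes):
    out = pd.merge(df_summary, aggregate_disputes(df_disputes),
                   on="contract_id", how="left")
    out["dispute_count"] = out["dispute_count"].fillna(0).astype(int)
    return out
```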

Incremental Loading

For large production systems, consider implementing incremental updates:
# Track last ETL run time
last_run = get_last_run_timestamp()

# Modify extract queries
queries = {
    "contracts": f"SELECT * FROM contracts WHERE created_at > '{last_run}'",
    # ...
}

# Use if_exists='append' instead of 'replace'
df.to_sql(
    "fct_contracts_summary",
    engine,
    if_exists="append",
    index=False,
)
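The snippet above assumes a get_last_run_timestamp() helper; a minimal file-based implementation might look like this (the state-file path is an assumption):

```python
# Minimal file-based run tracking for incremental loads. The state-file
# location is an assumption; any persistent store would work.
import os
from datetime import datetime, timezone

STATE_FILE = "server/etl/.last_run"

def get_last_run_timestamp(state_file=STATE_FILE):
    """Read the last run time, or the epoch if no run has been recorded."""
    if os.path.exists(state_file):
        with open(state_file) as f:
            return f.read().strip()
    return "1970-01-01T00:00:00+00:00"

def record_run_timestamp(state_file=STATE_FILE):
    """Persist the current UTC time for the next incremental run."""
    with open(state_file, "w") as f:
        f.write(datetime.now(timezone.utc).isoformat())
```

Note that interpolating the timestamp directly into SQL, as in the sketch above, is workable for a trusted local value, but bound query parameters are the safer general practice.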

Best Practices

  • Schedule Regularly: Run the ETL pipeline at off-peak hours (e.g., 2 AM daily)
  • Monitor Logs: Capture pipeline output to log files for troubleshooting
  • Version Control: Track changes to transformation logic in git
  • Test Transformations: Validate data quality after pipeline runs
  • Backup Analytics DB: Periodically back up the analytics database for historical analysis
  • Document Changes: Update this documentation when modifying the pipeline
