Overview
In this tutorial, you’ll build a complete ETL (Extract, Transform, Load) workflow that:

- Extracts data from multiple sources (API, database, CSV)
- Performs complex transformations and data quality checks
- Loads data into a data warehouse
- Implements error handling and monitoring
This tutorial builds on the “Building Your First Pipeline” guide. Complete that first if you’re new to Mage.
What You’ll Build
A production-ready ETL pipeline that:

- Loads customer data from PostgreSQL
- Fetches order data from a REST API
- Reads the product catalog from a CSV file
- Joins and transforms the datasets
- Performs data quality validation
- Exports the result to a data warehouse
Create an ETL Pipeline
Start by creating a new batch pipeline:

1. Navigate to **Pipelines** in the Mage UI
2. Click **+ New pipeline**
3. Select **Standard (batch)**
4. Name it `customer_order_etl`
5. Add a description: “ETL pipeline for customer order analytics”
Extract: Load Data from PostgreSQL
First, load customer data from your PostgreSQL database. Edit `io_config.yaml` in your project root:

```yaml
default:
  POSTGRES_CONNECT_TIMEOUT: 10
  POSTGRES_DBNAME: your_database
  POSTGRES_HOST: localhost
  POSTGRES_PASSWORD: your_password
  POSTGRES_PORT: 5432
  POSTGRES_USER: your_user
```

For production, use environment variables instead of hardcoding credentials:

```yaml
POSTGRES_PASSWORD: "{{ env_var('DB_PASSWORD') }}"
```
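Before running the pipeline, it is worth verifying that the referenced environment variable is actually set, so the failure is immediate and clearly worded rather than a connection error later. A minimal sketch (the helper name `require_env` is illustrative, not a Mage API; `DB_PASSWORD` matches the example above):

```python
import os


def require_env(name: str) -> str:
    """Return the value of an environment variable, failing fast if it is unset."""
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(f"{name} is not set; export it before running the pipeline")
    return value

# Usage: require_env('DB_PASSWORD')
```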
Create the Data Loader
1. Add a **Data loader** block
2. Select **Python > PostgreSQL**
3. Name it `load_customers`
```python
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.postgres import Postgres
from os import path

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader


@data_loader
def load_data_from_postgres(*args, **kwargs):
    """
    Load customer data from PostgreSQL.
    """
    query = """
        SELECT
            customer_id,
            first_name,
            last_name,
            email,
            created_at,
            country,
            is_active
        FROM customers
        WHERE is_active = true
          AND created_at >= CURRENT_DATE - INTERVAL '1 year'
    """
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'

    with Postgres.with_config(ConfigFileLoader(config_path, config_profile)) as loader:
        df = loader.load(query)

    print(f"Loaded {len(df)} customers")
    return df
```
Execute the block to verify the connection.
Extract: Load Data from API
Next, fetch order data from a REST API.
1. Add another **Data loader** block
2. Select **Python > API**
3. Name it `load_orders`
```python
import pandas as pd
import requests
from datetime import datetime, timedelta

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader


@data_loader
def load_data_from_api(*args, **kwargs):
    """
    Load order data from the API with pagination.
    """
    base_url = kwargs.get('api_base_url', 'https://api.example.com')
    api_key = kwargs.get('api_key')  # Pass via runtime variables

    headers = {
        'Authorization': f'Bearer {api_key}',
        'Content-Type': 'application/json',
    }

    # Fetch orders from the last 30 days
    start_date = (datetime.now() - timedelta(days=30)).strftime('%Y-%m-%d')

    all_orders = []
    page = 1
    has_more = True

    while has_more:
        params = {
            'start_date': start_date,
            'page': page,
            'per_page': 100,
        }
        response = requests.get(
            f'{base_url}/orders',
            headers=headers,
            params=params,
            timeout=30,
        )
        response.raise_for_status()

        data = response.json()
        orders = data.get('orders', [])
        all_orders.extend(orders)

        # A full page means there may be more; a short page ends the loop
        has_more = len(orders) == 100
        page += 1
        print(f"Fetched page {page - 1}: {len(orders)} orders")

    df = pd.DataFrame(all_orders)
    print(f"Loaded {len(df)} total orders")
    return df
```
Use Runtime variables to pass API keys and configuration at runtime instead of hardcoding them.
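The pagination loop above terminates when the API returns fewer than `per_page` records. That termination logic can be exercised standalone with a stubbed fetcher, with no real API involved (`fetch_all` and `fake_fetch` are hypothetical names for this sketch):

```python
def fetch_all(fetch_page, per_page=100):
    """Collect records across pages until a short (or empty) page signals the end."""
    records, page = [], 1
    while True:
        batch = fetch_page(page, per_page)
        records.extend(batch)
        if len(batch) < per_page:
            break
        page += 1
    return records


def fake_fetch(page, per_page):
    """Stub API: 250 records served in pages of 100, 100, then 50."""
    start = (page - 1) * per_page
    return list(range(start, min(start + per_page, 250)))
```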
Extract: Load Data from CSV
Load product catalog from a CSV file.
1. Add a **Data loader** block
2. Select **Python > Local file**
3. Name it `load_products`
```python
import pandas as pd
from mage_ai.settings.repo import get_repo_path
from os import path

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader


@data_loader
def load_data_from_file(*args, **kwargs):
    """
    Load the product catalog from CSV.
    """
    filepath = path.join(get_repo_path(), 'data', 'products.csv')

    df = pd.read_csv(
        filepath,
        dtype={
            'product_id': 'int64',
            'sku': 'string',
            'name': 'string',
            'category': 'string',
            'price': 'float64',
        },
    )

    print(f"Loaded {len(df)} products")
    return df
```
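Passing `dtype` to `read_csv` makes column types explicit instead of relying on pandas' inference. The same behavior can be checked against an in-memory CSV (the rows here are made up for illustration):

```python
import io

import pandas as pd

csv_data = io.StringIO(
    "product_id,sku,name,category,price\n"
    "1,SKU-1,Widget,Tools,9.99\n"
    "2,SKU-2,Gadget,Tools,19.50\n"
)

# Columns come back with exactly the declared dtypes, not inferred ones
df = pd.read_csv(csv_data, dtype={
    'product_id': 'int64',
    'sku': 'string',
    'name': 'string',
    'category': 'string',
    'price': 'float64',
})
```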
Transform: Join Datasets
Now create a transformer to join all three datasets.
1. Add a **Transformer** block
2. Name it `join_data`
3. Connect it to all three data loaders as upstream blocks
```python
import pandas as pd

if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer


@transformer
def transform(customers, orders, products, *args, **kwargs):
    """
    Join customers, orders, and products data.

    Args:
        customers: DataFrame from load_customers
        orders: DataFrame from load_orders
        products: DataFrame from load_products
    """
    # Join orders with customers
    df = orders.merge(
        customers,
        on='customer_id',
        how='inner',
    )
    print(f"After customer join: {len(df)} rows")

    # Join with products
    df = df.merge(
        products,
        on='product_id',
        how='left',
    )
    print(f"After product join: {len(df)} rows")

    # Calculate order totals
    df['order_total'] = df['quantity'] * df['price']

    # Add derived fields
    df['order_date'] = pd.to_datetime(df['order_date'])
    df['order_month'] = df['order_date'].dt.to_period('M')
    df['customer_full_name'] = df['first_name'] + ' ' + df['last_name']

    print(f"Final dataset: {len(df)} rows, {len(df.columns)} columns")
    return df
```
When a transformer has multiple upstream blocks, each block’s output is passed as a separate parameter in order.
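Note that an inner join silently drops orders whose `customer_id` has no match. To audit that loss, pandas' `indicator` option on a left merge shows exactly which rows matched; a small sketch with toy data:

```python
import pandas as pd

orders = pd.DataFrame({'order_id': [1, 2, 3], 'customer_id': [10, 20, 99]})
customers = pd.DataFrame({'customer_id': [10, 20], 'country': ['US', 'GB']})

# indicator=True adds a `_merge` column with values 'both' or 'left_only'
audit = orders.merge(customers, on='customer_id', how='left', indicator=True)
unmatched = (audit['_merge'] == 'left_only').sum()
```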
Transform: Data Quality Checks
Add data validation to ensure quality.
1. Add another **Transformer** block
2. Name it `validate_and_clean`
3. Connect it to the `join_data` block
```python
import pandas as pd

if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer


@transformer
def validate_and_clean(data, *args, **kwargs):
    """
    Perform data quality checks and cleaning.
    """
    df = data.copy()
    initial_count = len(df)

    # 1. Remove duplicates
    df = df.drop_duplicates(subset=['order_id'])
    print(f"Removed {initial_count - len(df)} duplicate orders")

    # 2. Handle missing values
    # Fill missing product names with 'Unknown'
    df['name'] = df['name'].fillna('Unknown Product')
    # Drop rows with missing critical fields
    df = df.dropna(subset=['customer_id', 'order_id', 'order_total'])

    # 3. Validate data ranges
    # Remove negative order totals
    invalid_totals = df['order_total'] < 0
    if invalid_totals.any():
        print(f"Warning: found {invalid_totals.sum()} orders with negative totals")
        df = df[~invalid_totals]

    # 4. Standardize data
    # Convert email to lowercase
    df['email'] = df['email'].str.lower().str.strip()
    # Standardize country codes
    country_mapping = {
        'USA': 'US',
        'United States': 'US',
        'UK': 'GB',
        'United Kingdom': 'GB',
    }
    df['country'] = df['country'].replace(country_mapping)

    # 5. Add quality flags
    df['has_product_info'] = df['product_id'].notna()
    df['is_high_value'] = df['order_total'] >= 100

    print("Data quality summary:")
    print(f"  Total rows: {len(df)}")
    print(f"  Orders with product info: {df['has_product_info'].sum()}")
    print(f"  High-value orders: {df['is_high_value'].sum()}")
    print(f"  Unique customers: {df['customer_id'].nunique()}")

    return df
```
Add Quality Tests

Add tests to the transformer:

```python
from mage_ai.data_preparation.decorators import test


@test
def test_no_nulls_in_critical_fields(output, *args):
    """
    Ensure there are no nulls in critical fields.
    """
    critical_fields = ['customer_id', 'order_id', 'order_total', 'email']
    for field in critical_fields:
        assert output[field].notna().all(), f'Null values found in {field}'


@test
def test_valid_order_totals(output, *args):
    """
    Ensure all order totals are non-negative.
    """
    assert (output['order_total'] >= 0).all(), 'Negative order totals found'


@test
def test_valid_emails(output, *args):
    """
    Ensure emails contain an @ symbol.
    """
    assert output['email'].str.contains('@').all(), 'Invalid email addresses found'
```
Transform: Aggregations
Create customer-level aggregations for analytics.
1. Add a **Transformer** block
2. Name it `aggregate_metrics`
```python
import pandas as pd

if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer


@transformer
def aggregate_customer_metrics(data, *args, **kwargs):
    """
    Calculate customer-level metrics.
    """
    # Customer aggregations
    customer_metrics = data.groupby('customer_id').agg({
        'order_id': 'count',
        'order_total': ['sum', 'mean', 'max'],
        'order_date': ['min', 'max'],
        'customer_full_name': 'first',
        'email': 'first',
        'country': 'first',
    }).reset_index()

    # Flatten column names
    customer_metrics.columns = [
        'customer_id',
        'total_orders',
        'total_spent',
        'avg_order_value',
        'max_order_value',
        'first_order_date',
        'last_order_date',
        'customer_name',
        'email',
        'country',
    ]

    # Calculate customer lifetime
    customer_metrics['customer_lifetime_days'] = (
        customer_metrics['last_order_date'] -
        customer_metrics['first_order_date']
    ).dt.days

    # Segment customers
    def segment_customer(row):
        if row['total_orders'] >= 10 and row['total_spent'] >= 1000:
            return 'VIP'
        elif row['total_orders'] >= 5 or row['total_spent'] >= 500:
            return 'Loyal'
        else:
            return 'Regular'

    customer_metrics['customer_segment'] = customer_metrics.apply(
        segment_customer,
        axis=1,
    )

    print("Customer segments:")
    print(customer_metrics['customer_segment'].value_counts())

    return customer_metrics
```
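Row-wise `apply` is fine at this scale but slow on millions of rows. A vectorized equivalent of the same segmentation rules using `np.select` (a sketch with made-up rows, same thresholds as above):

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({
    'total_orders': [12, 6, 2, 3],
    'total_spent': [1500.0, 200.0, 600.0, 50.0],
})

# Conditions are evaluated in order; the first match wins, like the if/elif chain
conditions = [
    (df['total_orders'] >= 10) & (df['total_spent'] >= 1000),
    (df['total_orders'] >= 5) | (df['total_spent'] >= 500),
]
df['customer_segment'] = np.select(conditions, ['VIP', 'Loyal'], default='Regular')
```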
Load: Export to Data Warehouse
Finally, export the transformed data to a data warehouse.

Option 1: Export to PostgreSQL

```python
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.postgres import Postgres
from pandas import DataFrame
from os import path

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_to_postgres(df: DataFrame, **kwargs) -> None:
    """
    Export data to the PostgreSQL data warehouse.
    """
    schema_name = 'analytics'
    table_name = 'customer_metrics'
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'

    with Postgres.with_config(ConfigFileLoader(config_path, config_profile)) as loader:
        loader.export(
            df,
            schema_name,
            table_name,
            index=False,
            if_exists='replace',  # Use 'append' for incremental loads
        )
    print(f"Exported {len(df)} rows to {schema_name}.{table_name}")
```
Option 2: Export to Snowflake

```python
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.snowflake import Snowflake
from pandas import DataFrame
from os import path

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_to_snowflake(df: DataFrame, **kwargs) -> None:
    """
    Export data to Snowflake.
    """
    table_name = 'CUSTOMER_METRICS'
    database = 'ANALYTICS_DB'
    schema = 'PUBLIC'
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'

    with Snowflake.with_config(ConfigFileLoader(config_path, config_profile)) as loader:
        loader.export(
            df,
            table_name=table_name,
            database=database,
            schema=schema,
            if_exists='replace',
        )
    print(f"Exported {len(df)} rows to {database}.{schema}.{table_name}")
```
Add Error Handling
Implement robust error handling and monitoring. Create a Custom block named `monitor_pipeline`:

```python
from datetime import datetime

if 'custom' not in globals():
    from mage_ai.data_preparation.decorators import custom


@custom
def monitor_pipeline(*args, **kwargs):
    """
    Monitor pipeline execution and log metrics.
    """
    pipeline_run_id = kwargs.get('pipeline_run_id')
    execution_date = kwargs.get('execution_date', datetime.now())

    metrics = {
        'pipeline_run_id': pipeline_run_id,
        'execution_date': execution_date.isoformat(),
        'status': 'success',
        'timestamp': datetime.now().isoformat(),
    }

    # Log metrics (integrate with your monitoring system)
    print(f"Pipeline metrics: {metrics}")
    return metrics
```
Add a Callback block for failure handling:

```python
from mage_ai.data_preparation.decorators import callback


@callback('failure')
def handle_failure(parent_block_data, **kwargs):
    """
    Handle pipeline failures.
    """
    error_message = kwargs.get('error', 'Unknown error')
    pipeline_uuid = kwargs.get('pipeline_uuid')

    print(f"Pipeline {pipeline_uuid} failed: {error_message}")

    # Send an alert (integrate with your alerting system)
    # send_alert(pipeline_uuid, error_message)
    return {'status': 'failure_handled'}
```
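The commented-out `send_alert` is left to your alerting system. As one possible shape, a Slack-style webhook payload can be built as plain JSON; `build_failure_alert` and the message format are assumptions for this sketch, not Mage APIs:

```python
import json


def build_failure_alert(pipeline_uuid: str, error_message: str) -> str:
    """Build a Slack-compatible webhook payload for a failed pipeline run."""
    payload = {
        'text': f':rotating_light: Pipeline `{pipeline_uuid}` failed: {error_message}',
    }
    return json.dumps(payload)

# Sending it would then be something like:
# requests.post(os.environ['SLACK_WEBHOOK_URL'],
#               data=build_failure_alert(pipeline_uuid, error_message),
#               headers={'Content-Type': 'application/json'}, timeout=10)
```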
Pipeline Configuration
Configure your pipeline settings in the pipeline editor:
```yaml
blocks:
  - all_upstream_blocks_executed: true
    color: null
    configuration: {}
    downstream_blocks:
      - join_data
    executor_config: null
    executor_type: local_python
    has_callback: false
    language: python
    name: load_customers
    retry_config: null
    status: executed
    timeout: null
    type: data_loader
    upstream_blocks: []
    uuid: load_customers
  # ... more blocks
name: customer_order_etl
type: python
```

Enable concurrent execution for independent blocks:

```yaml
# In io_config.yaml
executor_type: local_python
executor_config:
  max_cpu: 4
```

Data loaders that don’t depend on each other will run in parallel.
Implement Incremental Loading
Load only new/changed data:

```python
@data_loader
def load_incremental_data(*args, **kwargs):
    # Get the last execution date (falls back to a fixed start date)
    last_run = kwargs.get('last_run_date', '2024-01-01')

    query = f"""
        SELECT * FROM orders
        WHERE updated_at > '{last_run}'
    """
    return load_data(query)
```
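The `last_run_date` above has to come from somewhere. One simple approach is to persist a high-watermark (for example, the maximum `updated_at` seen) between runs. A file-based sketch, where the path, file format, and helper names are all illustrative:

```python
import json
from pathlib import Path

WATERMARK_FILE = Path('.watermarks/orders.json')  # illustrative location


def read_watermark(default: str = '2024-01-01') -> str:
    """Return the last persisted watermark, or a fixed default on first run."""
    if WATERMARK_FILE.exists():
        return json.loads(WATERMARK_FILE.read_text())['last_run_date']
    return default


def write_watermark(value: str) -> None:
    """Persist the watermark for the next run to pick up."""
    WATERMARK_FILE.parent.mkdir(parents=True, exist_ok=True)
    WATERMARK_FILE.write_text(json.dumps({'last_run_date': value}))
```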
Process large datasets in chunks:

```python
@transformer
def process_in_chunks(data, *args, **kwargs):
    chunk_size = 10000
    results = []

    # Slice the upstream DataFrame into fixed-size chunks to bound memory use;
    # transform_chunk holds your per-chunk logic
    for start in range(0, len(data), chunk_size):
        chunk = data.iloc[start:start + chunk_size]
        results.append(transform_chunk(chunk))

    return pd.concat(results, ignore_index=True)
```
Monitoring and Observability
Track pipeline health and performance:
View Pipeline Runs
1. Click **Pipeline runs** in the sidebar
2. View execution history and status
3. Click any run to see detailed logs
Set Up Alerts
Configure alerts for failures:
```yaml
notification_config:
  alert_on:
    - trigger_failure
    - trigger_passed_sla
  slack:
    webhook_url: "{{ env_var('SLACK_WEBHOOK_URL') }}"
  email:
    to_emails:
      - [email protected]
```
Next Steps
- Streaming Pipeline: process real-time data with Kafka and streaming pipelines
- DBT Integration: add dbt transformations to your ETL workflow
- Production Deployment: deploy your pipeline to production
- Backfills: run historical data backfills