
Overview

In this tutorial, you’ll build a complete ETL (Extract, Transform, Load) workflow that:
  • Extracts data from multiple sources (API, database, CSV)
  • Performs complex transformations and data quality checks
  • Loads data into a data warehouse
  • Implements error handling and monitoring
This tutorial builds on the “Building Your First Pipeline” guide. Complete that first if you’re new to Mage.

What You’ll Build

A production-ready ETL pipeline that:
  1. Loads customer data from PostgreSQL
  2. Fetches order data from a REST API
  3. Reads product catalog from CSV
  4. Joins and transforms the datasets
  5. Performs data quality validation
  6. Exports to a data warehouse


Create an ETL Pipeline

Start by creating a new batch pipeline:
  1. Navigate to Pipelines in the Mage UI
  2. Click + New pipeline
  3. Select Standard (batch)
  4. Name it customer_order_etl
  5. Add a description: “ETL pipeline for customer order analytics”

Extract: Load Data from PostgreSQL

First, load customer data from your PostgreSQL database.

Configure Database Connection

Edit io_config.yaml in your project root:
io_config.yaml
default:
  POSTGRES_CONNECT_TIMEOUT: 10
  POSTGRES_DBNAME: your_database
  POSTGRES_HOST: localhost
  POSTGRES_PASSWORD: your_password
  POSTGRES_PORT: 5432
  POSTGRES_USER: your_user
For production, use environment variables instead of hardcoding credentials:
POSTGRES_PASSWORD: "{{ env_var('DB_PASSWORD') }}"
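
A common pattern is to keep a separate profile alongside `default` with every secret read from the environment (the `prod` profile name and the variable names here are illustrative):

```yaml
default:
  POSTGRES_CONNECT_TIMEOUT: 10
  POSTGRES_DBNAME: your_database
  # ... local development settings ...

prod:
  POSTGRES_CONNECT_TIMEOUT: 10
  POSTGRES_DBNAME: "{{ env_var('DB_NAME') }}"
  POSTGRES_HOST: "{{ env_var('DB_HOST') }}"
  POSTGRES_PASSWORD: "{{ env_var('DB_PASSWORD') }}"
  POSTGRES_PORT: 5432
  POSTGRES_USER: "{{ env_var('DB_USER') }}"
```

Blocks then select the profile by setting `config_profile = 'prod'` instead of `'default'`.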

Create the Data Loader

  1. Add a Data loader block
  2. Select Python > PostgreSQL
  3. Name it load_customers
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.postgres import Postgres
from os import path

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader

@data_loader
def load_data_from_postgres(*args, **kwargs):
    """
    Load customer data from PostgreSQL
    """
    query = """
        SELECT 
            customer_id,
            first_name,
            last_name,
            email,
            created_at,
            country,
            is_active
        FROM customers
        WHERE is_active = true
          AND created_at >= CURRENT_DATE - INTERVAL '1 year'
    """
    
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'

    with Postgres.with_config(ConfigFileLoader(config_path, config_profile)) as loader:
        df = loader.load(query)
    
    print(f"Loaded {len(df)} customers")
    return df
Execute the block to verify the connection.

Extract: Load Data from API

Next, fetch order data from a REST API.
  1. Add another Data loader block
  2. Select Python > API
  3. Name it load_orders
import io
import pandas as pd
import requests
from datetime import datetime, timedelta

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader

@data_loader
def load_data_from_api(*args, **kwargs):
    """
    Load order data from API with pagination
    """
    base_url = kwargs.get('api_base_url', 'https://api.example.com')
    api_key = kwargs.get('api_key')  # Pass via runtime variables
    
    headers = {
        'Authorization': f'Bearer {api_key}',
        'Content-Type': 'application/json'
    }
    
    # Fetch orders from last 30 days
    start_date = (datetime.now() - timedelta(days=30)).strftime('%Y-%m-%d')
    
    all_orders = []
    page = 1
    has_more = True
    
    while has_more:
        params = {
            'start_date': start_date,
            'page': page,
            'per_page': 100
        }
        
        response = requests.get(
            f'{base_url}/orders',
            headers=headers,
            params=params,
            timeout=30
        )
        response.raise_for_status()
        
        data = response.json()
        orders = data.get('orders', [])
        all_orders.extend(orders)
        
        has_more = len(orders) == 100
        page += 1
        
        print(f"Fetched page {page-1}: {len(orders)} orders")
    
    df = pd.DataFrame(all_orders)
    print(f"Loaded {len(df)} total orders")
    
    return df
Use Runtime variables to pass API keys and configuration at runtime instead of hardcoding them.

Extract: Load Data from CSV

Load product catalog from a CSV file.
  1. Add a Data loader block
  2. Select Python > Local file
  3. Name it load_products
import pandas as pd
from mage_ai.settings.repo import get_repo_path
from os import path

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader

@data_loader
def load_data_from_file(*args, **kwargs):
    """
    Load product catalog from CSV
    """
    filepath = path.join(
        get_repo_path(),
        'data',
        'products.csv'
    )
    
    df = pd.read_csv(
        filepath,
        dtype={
            'product_id': 'int64',
            'sku': 'string',
            'name': 'string',
            'category': 'string',
            'price': 'float64'
        }
    )
    
    print(f"Loaded {len(df)} products")
    return df

Transform: Join Datasets

Now create a transformer to join all three datasets.
  1. Add a Transformer block
  2. Name it join_data
  3. Connect it to all three data loaders as upstream blocks
import pandas as pd

if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer

@transformer
def transform(customers, orders, products, *args, **kwargs):
    """
    Join customers, orders, and products data
    
    Args:
        customers: DataFrame from load_customers
        orders: DataFrame from load_orders
        products: DataFrame from load_products
    """
    # Join orders with customers
    df = orders.merge(
        customers,
        on='customer_id',
        how='inner'
    )
    
    print(f"After customer join: {len(df)} rows")
    
    # Join with products
    df = df.merge(
        products,
        on='product_id',
        how='left'
    )
    
    print(f"After product join: {len(df)} rows")
    
    # Calculate order totals
    df['order_total'] = df['quantity'] * df['price']
    
    # Add derived fields
    df['order_date'] = pd.to_datetime(df['order_date'])
    df['order_month'] = df['order_date'].dt.to_period('M')
    df['customer_full_name'] = df['first_name'] + ' ' + df['last_name']
    
    print(f"Final dataset: {len(df)} rows, {len(df.columns)} columns")
    
    return df
When a transformer has multiple upstream blocks, each block’s output is passed as a separate parameter in order.
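Before committing to an inner join, it can be worth checking how many rows the join would silently drop. One way to do that (a sketch on toy data, not part of the pipeline) is pandas' `indicator` option:

```python
import pandas as pd

# Toy frames standing in for the real loader outputs
orders = pd.DataFrame({'order_id': [1, 2, 3], 'customer_id': [10, 11, 99]})
customers = pd.DataFrame({'customer_id': [10, 11],
                          'email': ['[email protected]', '[email protected]']})

# indicator=True adds a _merge column showing where each row matched
checked = orders.merge(customers, on='customer_id', how='left', indicator=True)
unmatched = int((checked['_merge'] == 'left_only').sum())
print(f"{unmatched} order(s) have no matching customer")  # → 1
```

Rows flagged `left_only` are exactly the ones an inner join would discard, so logging this count before the join makes shrinking row counts easy to explain.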

Transform: Data Quality Checks

Add data validation to ensure quality.
  1. Add another Transformer block
  2. Name it validate_and_clean
  3. Connect it to the join_data block
import pandas as pd
import numpy as np

if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer

@transformer
def validate_and_clean(data, *args, **kwargs):
    """
    Perform data quality checks and cleaning
    """
    df = data.copy()
    initial_count = len(df)
    
    # 1. Remove duplicates
    df = df.drop_duplicates(subset=['order_id'])
    print(f"Removed {initial_count - len(df)} duplicate orders")
    
    # 2. Handle missing values
    # Fill missing product names with 'Unknown'
    df['name'] = df['name'].fillna('Unknown Product')
    
    # Drop rows with missing critical fields
    df = df.dropna(subset=['customer_id', 'order_id', 'order_total'])
    
    # 3. Validate data ranges
    # Remove negative order totals
    invalid_totals = df['order_total'] < 0
    if invalid_totals.any():
        print(f"Warning: Found {invalid_totals.sum()} orders with negative totals")
        df = df[~invalid_totals]
    
    # 4. Standardize data
    # Convert email to lowercase
    df['email'] = df['email'].str.lower().str.strip()
    
    # Standardize country codes
    country_mapping = {
        'USA': 'US',
        'United States': 'US',
        'UK': 'GB',
        'United Kingdom': 'GB'
    }
    df['country'] = df['country'].replace(country_mapping)
    
    # 5. Add quality flags
    df['has_product_info'] = df['product_id'].notna()
    df['is_high_value'] = df['order_total'] >= 100
    
    print("Data quality summary:")
    print(f"  Total rows: {len(df)}")
    print(f"  Orders with product info: {df['has_product_info'].sum()}")
    print(f"  High-value orders: {df['is_high_value'].sum()}")
    print(f"  Unique customers: {df['customer_id'].nunique()}")
    
    return df

Add Quality Tests

Add tests to the transformer:
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test

@test
def test_no_nulls_in_critical_fields(output, *args):
    """
    Ensure no nulls in critical fields
    """
    critical_fields = ['customer_id', 'order_id', 'order_total', 'email']
    for field in critical_fields:
        assert output[field].notna().all(), f'Null values found in {field}'

@test
def test_valid_order_totals(output, *args):
    """
    Ensure all order totals are positive
    """
    assert (output['order_total'] >= 0).all(), 'Negative order totals found'

@test
def test_valid_emails(output, *args):
    """
    Ensure emails contain @ symbol
    """
    assert output['email'].str.contains('@', na=False).all(), 'Invalid email addresses found'

Transform: Aggregations

Create customer-level aggregations for analytics.
  1. Add a Transformer block
  2. Name it aggregate_metrics
import pandas as pd

if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer

@transformer
def aggregate_customer_metrics(data, *args, **kwargs):
    """
    Calculate customer-level metrics
    """
    # Customer aggregations
    customer_metrics = data.groupby('customer_id').agg({
        'order_id': 'count',
        'order_total': ['sum', 'mean', 'max'],
        'order_date': ['min', 'max'],
        'customer_full_name': 'first',
        'email': 'first',
        'country': 'first'
    }).reset_index()
    
    # Flatten column names
    customer_metrics.columns = [
        'customer_id',
        'total_orders',
        'total_spent',
        'avg_order_value',
        'max_order_value',
        'first_order_date',
        'last_order_date',
        'customer_name',
        'email',
        'country'
    ]
    
    # Calculate customer lifetime
    customer_metrics['customer_lifetime_days'] = (
        customer_metrics['last_order_date'] - 
        customer_metrics['first_order_date']
    ).dt.days
    
    # Segment customers
    def segment_customer(row):
        if row['total_orders'] >= 10 and row['total_spent'] >= 1000:
            return 'VIP'
        elif row['total_orders'] >= 5 or row['total_spent'] >= 500:
            return 'Loyal'
        else:
            return 'Regular'
    
    customer_metrics['customer_segment'] = customer_metrics.apply(
        segment_customer,
        axis=1
    )
    
    print("Customer segments:")
    print(customer_metrics['customer_segment'].value_counts())
    
    return customer_metrics
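Row-wise `apply` is fine at this scale; on large customer tables, the same segmentation rules can be vectorized with `numpy.select` (a sketch on toy data mirroring the rules above):

```python
import numpy as np
import pandas as pd

metrics = pd.DataFrame({
    'total_orders': [12, 6, 1],
    'total_spent': [1500.0, 200.0, 50.0],
})

# Conditions are evaluated in order; the first match wins,
# mirroring the if/elif chain in segment_customer
conditions = [
    (metrics['total_orders'] >= 10) & (metrics['total_spent'] >= 1000),
    (metrics['total_orders'] >= 5) | (metrics['total_spent'] >= 500),
]
metrics['customer_segment'] = np.select(conditions, ['VIP', 'Loyal'],
                                        default='Regular')
print(metrics['customer_segment'].tolist())  # → ['VIP', 'Loyal', 'Regular']
```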

Load: Export to Data Warehouse

Finally, export the transformed data to a data warehouse.

Option 1: Export to PostgreSQL

from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.postgres import Postgres
from pandas import DataFrame
from os import path

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter

@data_exporter
def export_to_postgres(df: DataFrame, **kwargs) -> None:
    """
    Export data to PostgreSQL data warehouse
    """
    schema_name = 'analytics'
    table_name = 'customer_metrics'
    
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'

    with Postgres.with_config(ConfigFileLoader(config_path, config_profile)) as loader:
        loader.export(
            df,
            schema_name,
            table_name,
            index=False,
            if_exists='replace',  # Use 'append' for incremental loads
        )
    
    print(f"Exported {len(df)} rows to {schema_name}.{table_name}")

Option 2: Export to Snowflake

from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.snowflake import Snowflake
from pandas import DataFrame
from os import path

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter

@data_exporter
def export_to_snowflake(df: DataFrame, **kwargs) -> None:
    """
    Export data to Snowflake
    """
    table_name = 'CUSTOMER_METRICS'
    database = 'ANALYTICS_DB'
    schema = 'PUBLIC'
    
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'

    with Snowflake.with_config(ConfigFileLoader(config_path, config_profile)) as loader:
        loader.export(
            df,
            table_name=table_name,
            database=database,
            schema=schema,
            if_exists='replace',
        )
    
    print(f"Exported {len(df)} rows to {database}.{schema}.{table_name}")

Add Error Handling

Implement robust error handling and monitoring. Create a Custom block named monitor_pipeline:
import traceback
from datetime import datetime

if 'custom' not in globals():
    from mage_ai.data_preparation.decorators import custom

@custom
def monitor_pipeline(*args, **kwargs):
    """
    Monitor pipeline execution and log metrics
    """
    pipeline_run_id = kwargs.get('pipeline_run_id')
    execution_date = kwargs.get('execution_date', datetime.now())
    
    metrics = {
        'pipeline_run_id': pipeline_run_id,
        'execution_date': execution_date.isoformat(),
        'status': 'success',
        'timestamp': datetime.now().isoformat()
    }
    
    # Log metrics (integrate with your monitoring system)
    print(f"Pipeline metrics: {metrics}")
    
    return metrics
Add a Callback block for failure handling:
if 'callback' not in globals():
    from mage_ai.data_preparation.decorators import callback

@callback('failure')
def handle_failure(parent_block_data, **kwargs):
    """
    Handle pipeline failures
    """
    error_message = kwargs.get('error', 'Unknown error')
    pipeline_uuid = kwargs.get('pipeline_uuid')
    
    print(f"Pipeline {pipeline_uuid} failed: {error_message}")
    
    # Send alert (integrate with your alerting system)
    # send_alert(pipeline_uuid, error_message)
    
    return {'status': 'failure_handled'}
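The commented-out `send_alert` call is left to your alerting system. As one possibility, a minimal sketch posting to a Slack incoming webhook could look like this (the `SLACK_WEBHOOK_URL` environment variable name is an assumption; when it is unset, the function is a no-op):

```python
import json
import os
import urllib.request

def send_alert(pipeline_uuid: str, error_message: str) -> None:
    """Post a failure notice to a Slack incoming webhook, if configured."""
    webhook_url = os.environ.get('SLACK_WEBHOOK_URL')
    if not webhook_url:
        return  # alerting not configured

    payload = json.dumps({
        'text': f':red_circle: Pipeline {pipeline_uuid} failed: {error_message}'
    }).encode('utf-8')
    req = urllib.request.Request(
        webhook_url,
        data=payload,
        headers={'Content-Type': 'application/json'},
    )
    urllib.request.urlopen(req, timeout=10)
```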

Pipeline Configuration

Configure your pipeline settings in the pipeline editor:
metadata.yaml
blocks:
- all_upstream_blocks_executed: true
  color: null
  configuration: {}
  downstream_blocks:
  - join_data
  executor_config: null
  executor_type: local_python
  has_callback: false
  language: python
  name: load_customers
  retry_config: null
  status: executed
  timeout: null
  type: data_loader
  upstream_blocks: []
  uuid: load_customers

# ... more blocks

name: customer_order_etl
type: python

Optimizing Performance

Enable concurrent execution for independent blocks:
# In the pipeline's metadata.yaml
executor_type: local_python
executor_config:
  max_cpu: 4
Data loaders that don’t depend on each other will run in parallel.
Load only new/changed data:
@data_loader
def load_incremental_data(*args, **kwargs):
    # Get the last execution date (e.g. from a runtime variable)
    last_run = kwargs.get('last_run_date', '2024-01-01')

    query = f"""
        SELECT * FROM orders
        WHERE updated_at > '{last_run}'
    """
    return load_data(query)  # load_data: your block's existing query helper
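Interpolating a runtime variable straight into SQL is risky if the value can be malformed. One defensive pattern (the helper name is illustrative) is to parse the date first, so a bad value raises in Python instead of reaching the database:

```python
from datetime import date

def build_incremental_query(last_run: str) -> str:
    """Validate the date before it is interpolated into SQL."""
    parsed = date.fromisoformat(last_run)  # raises ValueError on bad input
    return (
        "SELECT * FROM orders "
        f"WHERE updated_at > '{parsed.isoformat()}'"
    )

print(build_incremental_query('2024-01-01'))
```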
Process large datasets in chunks:
@transformer
def process_in_chunks(data, *args, **kwargs):
    chunk_size = 10000
    results = []

    # Split the upstream DataFrame into fixed-size chunks
    for start in range(0, len(data), chunk_size):
        chunk = data.iloc[start:start + chunk_size]
        results.append(transform_chunk(chunk))  # transform_chunk: your chunk-level logic

    return pd.concat(results, ignore_index=True)

Monitoring and Observability

Track pipeline health and performance:

View Pipeline Runs

  1. Click Pipeline runs in the sidebar
  2. View execution history and status
  3. Click any run to see detailed logs

Set Up Alerts

Configure alerts for failures:
metadata.yaml
notification_config:
  alert_on:
    - trigger_failure
    - trigger_passed_sla
  slack:
    webhook_url: "{{ env_var('SLACK_WEBHOOK_URL') }}"
  email:
    to_emails:
      - [email protected]

Next Steps

Streaming Pipeline

Process real-time data with Kafka and streaming pipelines

DBT Integration

Add dbt transformations to your ETL workflow

Production Deployment

Deploy your pipeline to production

Backfills

Run historical data backfills
