This demo shows how to build data transformation pipelines using SQLMesh with MotherDuck as the execution engine. The project extends a stocks demo to showcase SQLMesh features including model dependencies, incremental transformations, and data quality audits.

Overview

This integration demonstrates:
  • Loading stock market data into MotherDuck using dlt (data load tool)
  • Transforming raw data through interim and conformed layers
  • Building mart models for analytics
  • Running data quality audits
  • Managing model versions and deployments with SQLMesh

Prerequisites

1. Python environment

Install uv for Python environment management:
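At the time of writing, the uv documentation provides a one-line standalone installer (alternatively, `pip install uv` also works):

```shell
curl -LsSf https://astral.sh/uv/install.sh | sh
```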
2. MotherDuck credentials

You’ll need:
  • A MotherDuck database
  • A MotherDuck token
3. Clone the repository

Clone the demo repository to get started.

Getting Started

1. Set up Python environment

Create and activate a virtual environment, then install dependencies:
uv venv
source .venv/bin/activate
uv sync
This installs the required dependencies defined in pyproject.toml:
[project]
name = "sqlmesh-demo"
version = "0.1.0"
requires-python = ">=3.12"
dependencies = [
    "dlt[duckdb,motherduck]>=1.4.1",
    "duckdb>=1.3.0",
    "sqlmesh[web]>=0.188.1",
    "yfinance>=0.2.61",
]
2. Configure MotherDuck credentials

Add your MotherDuck database and token to .dlt/secrets.toml. See the dlt MotherDuck setup guide for detailed configuration instructions.
The .dlt/secrets.toml file should contain your MotherDuck token and database name. Keep this file secure and never commit it to version control.
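Based on the dlt MotherDuck destination docs, the file typically looks like the following; the database name is illustrative (it should match your MotherDuck database), and the token goes in the password field:

```toml
[destination.motherduck.credentials]
database = "dlt_test_db"
password = "your_motherduck_token"
```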
3. Load the data

Run the data pipeline to hydrate MotherDuck with stock data:
python3 load/stock_data_pipeline.py
This script loads three types of data:
  • Stock info (company information)
  • Stock options (options chain data)
  • Stock history (historical price data)
4. Verify SQLMesh setup

Navigate to the transform directory and verify the configuration:
cd transform
sqlmesh info
If SQLMesh cannot find your MotherDuck token, set it as an environment variable:
export MOTHERDUCK_TOKEN="your_token_here"
Alternatively, you can use the MotherDuck Web UI to manage your connection.
5. Run your first transformation

Execute the SQLMesh plan to run transformations:
sqlmesh plan
When prompted, type y to push the changes to MotherDuck.

Data Pipeline Architecture

The pipeline uses dlt to load data from Yahoo Finance into MotherDuck:
import dlt
import yfinance as yf
import pandas as pd
from datetime import datetime, timedelta
from typing import Iterator, Dict, Any
import os


def read_symbols() -> Iterator[str]:
    """Generator that reads and yields stock symbols from a file."""
    file_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "symbols.txt")
    try:
        with open(file_path, "r") as file:
            for symbol in file:
                symbol = symbol.strip()
                if symbol:
                    yield symbol
    except FileNotFoundError:
        print(f"File {file_path} not found.")


@dlt.resource(
    primary_key="Symbol", write_disposition="replace", table_name="stock_info"
)
def stock_info_resource() -> Iterator[Dict[str, Any]]:
    """Resource that fetches stock information for each symbol."""
    for symbol in read_symbols():
        try:
            stock = yf.Ticker(symbol)
            if not stock.history(period="1d").empty:
                info = stock.info
                info["Symbol"] = symbol
                yield info
            else:
                print(f"Invalid symbol: {symbol}")
        except Exception as e:
            print(f"Error processing symbol {symbol}: {e}")


@dlt.resource(
    primary_key=["Symbol", "Date"],
    write_disposition="replace",
    table_name="stock_history",
)
def stock_history_resource() -> Iterator[Dict[str, Any]]:
    """Resource that fetches historical stock data for each symbol."""
    end_date = datetime.now().strftime("%Y-%m-%d")
    start_date = (datetime.now() - timedelta(days=360)).strftime("%Y-%m-%d")
    
    for symbol in read_symbols():
        # validate_symbol is defined elsewhere in the repo's pipeline script
        if validate_symbol(symbol):
            print(f"Fetching data for: {symbol}")
            try:
                stock_data = yf.download(symbol, start=start_date, end=end_date)
                
                records = []
                for date, row in stock_data.iterrows():
                    record = {
                        "Date": date.strftime("%Y-%m-%d"),
                        "Open": float(row["Open"].iloc[0])
                        if isinstance(row["Open"], pd.Series)
                        else float(row["Open"]),
                        "High": float(row["High"].iloc[0])
                        if isinstance(row["High"], pd.Series)
                        else float(row["High"]),
                        "Low": float(row["Low"].iloc[0])
                        if isinstance(row["Low"], pd.Series)
                        else float(row["Low"]),
                        "Close": float(row["Close"].iloc[0])
                        if isinstance(row["Close"], pd.Series)
                        else float(row["Close"]),
                        "Volume": int(row["Volume"].iloc[0])
                        if isinstance(row["Volume"], pd.Series)
                        else int(row["Volume"]),
                        "Symbol": symbol,
                    }
                    records.append(record)
                
                for record in records:
                    yield record
                    
            except Exception as e:
                print(f"Error fetching data for {symbol}: {e}")
        else:
            print(f"Invalid symbol: {symbol}")


def load_all_stock_data():
    """Creates and runs the combined stock data pipeline."""
    pipeline = dlt.pipeline(
        pipeline_name="stock_data", destination="motherduck", dataset_name="stock_data"
    )
    
    info_load = pipeline.run(stock_info_resource())
    print("Stock Info Load Results:")
    print(info_load)
    
    options_load = pipeline.run(stock_options_resource())  # defined in the full repo script
    print("\nStock Options Load Results:")
    print(options_load)
    
    history_load = pipeline.run(stock_history_resource())
    print("\nStock History Load Results:")
    print(history_load)


if __name__ == "__main__":
    load_all_stock_data()

SQLMesh Configuration

The SQLMesh project is configured to use MotherDuck as the execution engine:
gateways:
  local:
    connection:
      type: motherduck
      database: dlt_test_db

default_gateway: local

model_defaults:
  dialect: duckdb
  start: 2024-12-08
SQLMesh uses the motherduck connection type, which leverages the DuckDB dialect. Make sure your MOTHERDUCK_TOKEN environment variable is set for authentication.

Model Architecture

The transformation pipeline is organized into three layers:

1. Interim Layer

Raw data from dlt is processed into interim models:
  • interim.stock_info - Company information
  • interim.stock_history - Historical price data
  • interim.stock_options - Options chain data
  • interim._dlt_loads - dlt metadata
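The interim models themselves aren't reproduced here; as a sketch of the pattern (the exact SQL is illustrative, but the source schema matches the dlt dataset name stock_data used in the pipeline):

```sql
MODEL (
  name interim.stock_info,
  kind VIEW,
  cron '@daily'
);

SELECT *
FROM stock_data.stock_info
```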

2. Conformed Layer

Interim models are standardized and cleaned.

conformed.company_info
MODEL (
  name conformed.company_info,
  kind FULL,
  grain (symbol),
  audits (
    UNIQUE_VALUES(columns := (symbol)),
    NOT_NULL(columns := (symbol))
  ),
  cron '@daily'
);

SELECT *
FROM interim.stock_info
WHERE valid_to IS NULL

conformed.price_history
MODEL (
  name conformed.price_history,
  kind VIEW,
  cron '@daily'
);

SELECT *
FROM interim.stock_history

3. Mart Layer

Business-ready models for analytics.

mart.stock_price_by_day
MODEL (
  name mart.stock_price_by_day,
  kind VIEW,
  grain (trade_date, stock_symbol),
  audits (
    UNIQUE_COMBINATION_OF_COLUMNS(columns := (trade_date, stock_symbol)),
    NOT_NULL(columns := (trade_date, stock_symbol))
  )
);

SELECT
  c.symbol AS stock_symbol,
  c.shares_outstanding,
  sp.close,
  sp.trade_date,
  ROUND(c.shares_outstanding::REAL * sp.close::REAL, 0) AS market_cap
FROM conformed.company_info AS c
LEFT JOIN conformed.price_history AS sp
  ON c.symbol = sp.symbol
ORDER BY
  c.symbol,
  sp.trade_date

The grain attribute in SQLMesh models defines the unique key(s) for the model, while audits specify data quality checks that run automatically.

Data Quality Audits

SQLMesh includes built-in data quality checks:
  • UNIQUE_VALUES: Ensures column values are unique
  • NOT_NULL: Validates required fields are populated
  • UNIQUE_COMBINATION_OF_COLUMNS: Checks composite key uniqueness
These audits run automatically during sqlmesh plan execution and will fail the plan if checks don’t pass.
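You can also define custom audits in the audits/ directory. They follow the same pass/fail convention as the built-ins: the audit fails if its query returns any rows. A sketch (the audit name and column are illustrative, not from the repo):

```sql
AUDIT (
  name assert_positive_close
);

SELECT *
FROM @this_model
WHERE close <= 0
```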

Common SQLMesh Commands

sqlmesh info — validates the project configuration, tests the connection to MotherDuck, and prints a summary of the project's models and macros.
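Other commands that come up often in this workflow (see the SQLMesh CLI reference for the full list):

```shell
sqlmesh plan     # preview and apply model changes to an environment
sqlmesh run      # execute models on their cron schedules
sqlmesh audit    # run data quality audits on demand
sqlmesh test     # run the unit tests defined in tests/
sqlmesh ui       # launch the browser-based UI (installed via sqlmesh[web])
```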

Project Structure

sqlmesh-demo/
├── load/
│   ├── stock_data_pipeline.py  # dlt pipeline for data loading
│   └── symbols.txt             # List of stock symbols
├── transform/
│   ├── config.yaml             # SQLMesh configuration
│   ├── models/
│   │   ├── interim/           # Raw to interim transformations
│   │   ├── conformed/         # Standardized models
│   │   └── mart/              # Business analytics models
│   ├── audits/                # Custom audit definitions
│   ├── macros/                # SQL macros
│   ├── seeds/                 # Static data files
│   └── tests/                 # Model tests
└── pyproject.toml             # Python dependencies

Best Practices

1. Start with data loading

Always run the dlt pipeline first to ensure data is available in MotherDuck before running SQLMesh transformations.
2. Use incremental models

For large datasets, consider using SQLMesh’s incremental model kinds (INCREMENTAL_BY_TIME_RANGE or INCREMENTAL_BY_UNIQUE_KEY) to improve performance.
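As a sketch of what that could look like for the price data (the model name is illustrative, not part of the repo), an INCREMENTAL_BY_TIME_RANGE model filters its source on the time window SQLMesh passes in:

```sql
MODEL (
  name mart.stock_price_by_day_incremental,
  kind INCREMENTAL_BY_TIME_RANGE (
    time_column trade_date
  ),
  grain (trade_date, stock_symbol),
  cron '@daily'
);

SELECT
  symbol AS stock_symbol,
  trade_date,
  close
FROM conformed.price_history
WHERE trade_date BETWEEN @start_ds AND @end_ds
```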
3. Define clear model grains

Always specify the grain in your model definitions to document the unique key(s) and enable automatic deduplication.
4. Add data quality checks

Use SQLMesh audits to catch data quality issues early in the transformation pipeline.
5. Test before deploying

Run sqlmesh plan to preview changes before applying them to production.

Troubleshooting

If SQLMesh cannot authenticate with MotherDuck, add the token export to your shell profile (~/.bashrc or ~/.zshrc) and reload it:
export MOTHERDUCK_TOKEN="your_token_here"
source ~/.bashrc  # or ~/.zshrc
Verify the token is set:
echo $MOTHERDUCK_TOKEN
If the data loading pipeline fails:
  1. Check your .dlt/secrets.toml configuration
  2. Verify your MotherDuck token is valid
  3. Ensure you have network connectivity
  4. Check the symbols.txt file contains valid stock symbols
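For the last item, a small helper can sanity-check symbols.txt by dropping blank lines and duplicates before a pipeline run (a sketch; the function name is mine, not part of the repo):

```python
from pathlib import Path
from typing import List


def check_symbols(path: str) -> List[str]:
    """Return the cleaned, de-duplicated symbols found in the file."""
    seen = set()
    symbols: List[str] = []
    for line in Path(path).read_text().splitlines():
        symbol = line.strip().upper()
        if symbol and symbol not in seen:
            seen.add(symbol)
            symbols.append(symbol)
    return symbols
```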
If data quality audits fail during sqlmesh plan:
  1. Review the specific audit that failed
  2. Query the source data to understand the issue
  3. Either fix the data or adjust the audit rules
  4. Re-run sqlmesh plan after corrections
The stock market data from Yahoo Finance may have rate limits. If you’re loading data for many symbols, consider adding delays or batching the requests.
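One way to pace requests is to batch the symbols and sleep between batches. A minimal sketch (the helper names, batch size, and delay are illustrative, not part of the repo):

```python
import time
from typing import Callable, Iterable, Iterator, List


def batched(items: Iterable[str], batch_size: int) -> Iterator[List[str]]:
    """Yield items in lists of at most batch_size."""
    batch: List[str] = []
    for item in items:
        batch.append(item)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        yield batch


def fetch_paced(
    symbols: Iterable[str],
    fetch: Callable[[List[str]], list],
    batch_size: int = 10,
    delay: float = 1.0,
) -> list:
    """Call fetch(batch) per batch, pausing between batches to stay under rate limits."""
    results: list = []
    for i, batch in enumerate(batched(symbols, batch_size)):
        if i > 0:
            time.sleep(delay)  # pause between batches, not before the first
        results.extend(fetch(batch))
    return results
```

For example, wrapping the yfinance calls as `fetch_paced(read_symbols(), lambda batch: [yf.Ticker(s).info for s in batch])` would throttle the stock_info fetches.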
