Build and manage data transformation pipelines with SQLMesh and MotherDuck
This demo shows how to build data transformation pipelines using SQLMesh with MotherDuck as the execution engine. The project extends a stocks demo to showcase SQLMesh features including model dependencies, incremental transformations, and data quality audits.
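A SQLMesh model in this kind of project is a SQL file with a `MODEL` header declaring its name, kind, and schedule. As a hedged sketch (the `stock_data.stock_history` source table and its column names are assumptions based on the dlt pipeline below), an incremental model over the price history might look like:

```sql
MODEL (
  name stock_data.daily_prices,
  kind INCREMENTAL_BY_TIME_RANGE (
    time_column date
  ),
  cron '@daily'
);

SELECT
  symbol,
  date,
  close
FROM stock_data.stock_history
WHERE date BETWEEN @start_date AND @end_date;
```

SQLMesh infers dependencies from the tables referenced in the query, so downstream models that select from `stock_data.daily_prices` are automatically ordered after it.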
The pipeline uses dlt to load data from Yahoo Finance into MotherDuck:
```python
import dlt
import yfinance as yf
from typing import Iterator, Dict, Any


@dlt.resource(
    primary_key="Symbol",
    write_disposition="replace",
    table_name="stock_info"
)
def stock_info_resource() -> Iterator[Dict[str, Any]]:
    """Resource that fetches stock information for each symbol."""
    for symbol in read_symbols():
        try:
            stock = yf.Ticker(symbol)
            if not stock.history(period="1d").empty:
                info = stock.info
                info["Symbol"] = symbol
                yield info
            else:
                print(f"Invalid symbol: {symbol}")
        except Exception as e:
            print(f"Error processing symbol {symbol}: {e}")


def load_all_stock_data():
    """Creates and runs the combined stock data pipeline."""
    pipeline = dlt.pipeline(
        pipeline_name="stock_data",
        destination="motherduck",
        dataset_name="stock_data"
    )

    # Load all three types of data
    info_load = pipeline.run(stock_info_resource())
    options_load = pipeline.run(stock_options_resource())
    history_load = pipeline.run(stock_history_resource())
```
Complete data pipeline code:
```python
import dlt
import yfinance as yf
import pandas as pd
from datetime import datetime, timedelta
from typing import Iterator, Dict, Any
import os


def read_symbols() -> Iterator[str]:
    """Generator that reads and yields stock symbols from a file."""
    file_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "symbols.txt")
    try:
        with open(file_path, "r") as file:
            for symbol in file:
                symbol = symbol.strip()
                if symbol:
                    yield symbol
    except FileNotFoundError:
        print(f"File {file_path} not found.")


def validate_symbol(symbol: str) -> bool:
    """Returns True if the symbol has recent trading history."""
    try:
        return not yf.Ticker(symbol).history(period="1d").empty
    except Exception:
        return False


@dlt.resource(
    primary_key="Symbol",
    write_disposition="replace",
    table_name="stock_info"
)
def stock_info_resource() -> Iterator[Dict[str, Any]]:
    """Resource that fetches stock information for each symbol."""
    for symbol in read_symbols():
        try:
            stock = yf.Ticker(symbol)
            if not stock.history(period="1d").empty:
                info = stock.info
                info["Symbol"] = symbol
                yield info
            else:
                print(f"Invalid symbol: {symbol}")
        except Exception as e:
            print(f"Error processing symbol {symbol}: {e}")


@dlt.resource(
    primary_key=["Symbol", "contractSymbol"],
    write_disposition="replace",
    table_name="stock_options",
)
def stock_options_resource() -> Iterator[Dict[str, Any]]:
    """Resource that fetches the nearest-expiry option chain for each symbol."""
    for symbol in read_symbols():
        try:
            stock = yf.Ticker(symbol)
            expirations = stock.options
            if not expirations:
                continue
            chain = stock.option_chain(expirations[0])
            for df in (chain.calls, chain.puts):
                for record in df.to_dict("records"):
                    record["Symbol"] = symbol
                    yield record
        except Exception as e:
            print(f"Error fetching options for {symbol}: {e}")


@dlt.resource(
    primary_key=["Symbol", "Date"],
    write_disposition="replace",
    table_name="stock_history",
)
def stock_history_resource() -> Iterator[Dict[str, Any]]:
    """Resource that fetches historical stock data for each symbol."""
    end_date = datetime.now().strftime("%Y-%m-%d")
    start_date = (datetime.now() - timedelta(days=360)).strftime("%Y-%m-%d")
    for symbol in read_symbols():
        if validate_symbol(symbol):
            print(f"Fetching data for: {symbol}")
            try:
                stock_data = yf.download(symbol, start=start_date, end=end_date)
                records = []
                for date, row in stock_data.iterrows():
                    # yf.download can return multi-level columns, in which case
                    # each cell is a one-element Series rather than a scalar.
                    record = {
                        "Date": date.strftime("%Y-%m-%d"),
                        "Open": float(row["Open"].iloc[0]) if isinstance(row["Open"], pd.Series) else float(row["Open"]),
                        "High": float(row["High"].iloc[0]) if isinstance(row["High"], pd.Series) else float(row["High"]),
                        "Low": float(row["Low"].iloc[0]) if isinstance(row["Low"], pd.Series) else float(row["Low"]),
                        "Close": float(row["Close"].iloc[0]) if isinstance(row["Close"], pd.Series) else float(row["Close"]),
                        "Volume": int(row["Volume"].iloc[0]) if isinstance(row["Volume"], pd.Series) else int(row["Volume"]),
                        "Symbol": symbol,
                    }
                    records.append(record)
                for record in records:
                    yield record
            except Exception as e:
                print(f"Error fetching data for {symbol}: {e}")
        else:
            print(f"Invalid symbol: {symbol}")


def load_all_stock_data():
    """Creates and runs the combined stock data pipeline."""
    pipeline = dlt.pipeline(
        pipeline_name="stock_data",
        destination="motherduck",
        dataset_name="stock_data"
    )

    info_load = pipeline.run(stock_info_resource())
    print("Stock Info Load Results:")
    print(info_load)

    options_load = pipeline.run(stock_options_resource())
    print("\nStock Options Load Results:")
    print(options_load)

    history_load = pipeline.run(stock_history_resource())
    print("\nStock History Load Results:")
    print(history_load)


if __name__ == "__main__":
    load_all_stock_data()
```
SQLMesh uses the `motherduck` connection type, which leverages the DuckDB dialect. Make sure your `MOTHERDUCK_TOKEN` environment variable is set for authentication.
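A minimal SQLMesh `config.yaml` gateway for this setup might look like the sketch below (the gateway name and `stock_data` database are assumptions; SQLMesh's MotherDuck connection can also pick up the token from the `MOTHERDUCK_TOKEN` environment variable):

```yaml
gateways:
  motherduck:
    connection:
      type: motherduck
      database: stock_data
      token: "{{ env_var('MOTHERDUCK_TOKEN') }}"

default_gateway: motherduck
```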
```shell
export MOTHERDUCK_TOKEN="your_token_here"
source ~/.bashrc  # or ~/.zshrc, if you added the export to your shell profile
```
Verify the token is set:
```shell
echo $MOTHERDUCK_TOKEN
```
dlt pipeline fails
If the data loading pipeline fails:

- Check your `.dlt/secrets.toml` configuration
- Verify your MotherDuck token is valid
- Ensure you have network connectivity
- Check that the `symbols.txt` file contains valid stock symbols
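For reference, dlt's MotherDuck destination reads its credentials from `.dlt/secrets.toml`; a minimal sketch (the database name is an assumption, and dlt expects the MotherDuck token in the `password` field):

```toml
[destination.motherduck.credentials]
database = "stock_data"
password = "your_motherduck_token"
```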
Model audits failing
If data quality audits fail during `sqlmesh plan`:

- Review the specific audit that failed
- Query the source data to understand the issue
- Either fix the data or adjust the audit rules
- Re-run `sqlmesh plan` after corrections
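SQLMesh audits are queries that return the rows violating a rule; an audit passes when its query returns no rows. As a hedged sketch (the audit name and `close` column are assumptions), a standalone audit rejecting non-positive closing prices could look like:

```sql
AUDIT (
  name assert_positive_close
);

SELECT *
FROM @this_model
WHERE close <= 0;
```

Attaching `audits (assert_positive_close)` in a model's `MODEL` header runs this check against that model whenever it is evaluated.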
The stock market data from Yahoo Finance may have rate limits. If you’re loading data for many symbols, consider adding delays or batching the requests.
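One simple way to add such delays is to wrap the symbol iterator in a throttling generator, a minimal sketch (the `throttled` helper and its `delay` parameter are illustrative, not part of the project):

```python
import time
from typing import Iterable, Iterator


def throttled(symbols: Iterable[str], delay: float = 1.0) -> Iterator[str]:
    """Yield symbols, pausing between each one to stay under API rate limits."""
    for i, symbol in enumerate(symbols):
        if i > 0:
            time.sleep(delay)  # pause before every symbol after the first
        yield symbol
```

A resource would then iterate `for symbol in throttled(read_symbols(), delay=2.0):` instead of calling `read_symbols()` directly.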