
Dagster Pipelines

Dagster is a data orchestration platform that focuses on assets rather than tasks. It treats data, models, and metrics as first-class citizens with built-in lineage tracking, data quality checks, and observability.

Why Dagster?

Dagster’s asset-centric approach offers unique advantages:
  • Asset-first paradigm: Define what you want to produce, not just tasks
  • Data quality checks: Built-in asset checks for validation and anomaly detection
  • Rich metadata: Attach JSON, markdown, and plots to assets for observability
  • Software-defined assets: Assets are code, enabling testing and version control
  • Flexible execution: Run locally, on Modal, Kubernetes, or other executors

Setup

1. Configure Environment

Set up Dagster home directory and credentials:
mkdir -p ./dagster_pipelines/dagster-home
export DAGSTER_HOME=$PWD/dagster_pipelines/dagster-home
export WANDB_PROJECT=your-project
export WANDB_API_KEY=your-key
2. Install Dependencies

Install Dagster and required packages:
pip install dagster dagster-webserver
pip install datasets transformers peft trl evaluate
pip install modal  # For remote execution
3. Deploy Modal Functions (Optional)

Deploy training functions to Modal for GPU execution:
MODAL_FORCE_BUILD=1 modal deploy ./dagster_pipelines/text2sql_functions.py
4. Start Dagster UI

Launch the Dagster web interface:
dagster dev -f dagster_pipelines/text2sql_pipeline.py -p 3000 -h 0.0.0.0
Access the UI at http://localhost:3000 (the server binds to all interfaces via 0.0.0.0).

Text-to-SQL Pipeline

This example demonstrates fine-tuning a Phi-3 model for text-to-SQL generation with comprehensive data quality checks.

Asset: Load SQL Data

dagster_pipelines/text2sql_pipeline.py
from dagster import asset, AssetExecutionContext, MetadataValue
from datasets import DatasetDict, load_dataset
from random import randint

@asset(group_name="data", compute_kind="python")
def load_sql_data(context: AssetExecutionContext):
    """Load and subsample the SQL context dataset."""
    dataset_name = "b-mc2/sql-create-context"
    dataset = load_dataset(dataset_name, split="train")
    
    # Subsample to 10% for faster training
    subsample = 0.1
    dataset = dataset.shuffle(seed=42).select(
        range(int(len(dataset) * subsample))
    )
    
    # Split into train/test
    datasets = dataset.train_test_split(test_size=0.05, seed=42)
    
    # Log metadata to Dagster UI
    context.add_output_metadata(
        {
            "len_train": MetadataValue.int(len(datasets["train"])),
            "len_test": MetadataValue.int(len(datasets["test"])),
            "sample_train": MetadataValue.json(
                datasets["train"][randint(0, len(datasets["train"]) - 1)]
            ),
            "sample_test": MetadataValue.json(
                datasets["test"][randint(0, len(datasets["test"]) - 1)]
            ),
        }
    )
    
    return datasets
Key Features:
  • @asset decorator defines a software-defined asset
  • context.add_output_metadata() attaches rich metadata visible in UI
  • Returns DatasetDict passed to downstream assets
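The shuffle, subsample, and split arithmetic in load_sql_data can be sanity-checked with a dependency-free sketch (plain lists stand in for the Hugging Face dataset; the sizes are illustrative):

```python
import random

def subsample_and_split(rows, subsample=0.1, test_size=0.05, seed=42):
    """Mirror load_sql_data: shuffle, keep a fraction, then split train/test."""
    rng = random.Random(seed)
    shuffled = rows[:]
    rng.shuffle(shuffled)
    kept = shuffled[: int(len(shuffled) * subsample)]
    n_test = max(1, int(len(kept) * test_size))
    return {"train": kept[n_test:], "test": kept[:n_test]}

# 1000 rows -> 100 kept -> 95 train / 5 test
splits = subsample_and_split(list(range(1000)))
```

With the defaults above, 1,000 input rows yield 100 subsampled rows, split into 95 train and 5 test examples.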

Asset Check: No Empty Datasets

from dagster import asset_check, AssetCheckResult

@asset_check(asset=load_sql_data)
def no_empty(load_sql_data):
    """Check that train and test datasets are not empty."""
    train_no_empty = len(load_sql_data["train"]) != 0
    test_no_empty = len(load_sql_data["test"]) != 0
    return AssetCheckResult(passed=train_no_empty and test_no_empty)
Asset checks run automatically when materializing assets:
  • passed=True/False indicates check result
  • Failures don’t stop execution but are highlighted in the UI
  • Useful for data quality, schema validation, anomaly detection

Asset: Process Dataset

from functools import partial
from transformers import AutoTokenizer

def create_message_column(row):
    """Format SQL data as chat messages."""
    messages = [
        {"content": f"{row['context']}\n Input: {row['question']}", "role": "user"},
        {"content": f"{row['answer']}", "role": "assistant"},
    ]
    return {"messages": messages}

def format_dataset_chatml(row, tokenizer):
    """Apply chat template to messages."""
    return {
        "text": tokenizer.apply_chat_template(
            row["messages"], add_generation_prompt=False, tokenize=False
        )
    }

@asset(group_name="data", compute_kind="python")
def process_dataset(context: AssetExecutionContext, load_sql_data) -> DatasetDict:
    """Preprocess dataset with chat templates."""
    model_id = "microsoft/Phi-3-mini-4k-instruct"
    dataset = load_sql_data

    tokenizer = AutoTokenizer.from_pretrained(model_id)
    tokenizer.padding_side = "right"

    # Apply transformations
    dataset_chatml = dataset.map(create_message_column)
    dataset_chatml = dataset_chatml.map(
        partial(format_dataset_chatml, tokenizer=tokenizer)
    )

    context.add_output_metadata(
        {
            "len_train": MetadataValue.int(len(dataset_chatml["train"])),
            "len_test": MetadataValue.int(len(dataset_chatml["test"])),
            "sample_train": MetadataValue.json(
                dataset_chatml["train"][randint(0, len(dataset_chatml["train"]) - 1)]
            ),
        }
    )

    return dataset_chatml
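To see what the first helper produces, create_message_column can be run on a hypothetical sql-create-context row (the field values below are invented for illustration; format_dataset_chatml additionally needs the tokenizer):

```python
def create_message_column(row):
    """Format SQL data as chat messages (same helper as above)."""
    messages = [
        {"content": f"{row['context']}\n Input: {row['question']}", "role": "user"},
        {"content": f"{row['answer']}", "role": "assistant"},
    ]
    return {"messages": messages}

row = {
    "context": "CREATE TABLE head (age INTEGER)",
    "question": "How many heads are older than 56?",
    "answer": "SELECT COUNT(*) FROM head WHERE age > 56",
}
out = create_message_column(row)
# out["messages"] is a two-turn chat: user (schema + question), assistant (SQL)
```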

Asset: Trained Model

import modal

@asset(group_name="model", compute_kind="modal")
def trained_model(process_dataset):
    """Train Phi-3 model on Modal with GPU."""
    # Convert to pandas for serialization to Modal
    process_dataset_pandas = {
        "train": process_dataset["train"].to_pandas(),
        "test": process_dataset["test"].to_pandas(),
    }

    # Call remote Modal function
    model_training_job = modal.Function.lookup(
        "ml-in-production-practice-dagster-pipeline", "training_job"
    )
    model_name, uri = model_training_job.remote(
        dataset_chatml_pandas=process_dataset_pandas
    )

    return model_name
Remote Execution with Modal:
  • compute_kind="modal" indicates execution environment
  • modal.Function.lookup() calls deployed Modal function
  • Training runs on Modal’s GPU infrastructure
  • Returns model name for downstream inference
dagster_pipelines/text2sql_functions.py
import os
import modal
from modal import Image

app = modal.App("ml-in-production-practice-dagster-pipeline")
env = {
    "WANDB_PROJECT": os.getenv("WANDB_PROJECT"),
    "WANDB_API_KEY": os.getenv("WANDB_API_KEY"),
}
custom_image = Image.from_registry(
    "ghcr.io/kyryl-opens-ml/dagster-pipeline:main"
).env(env)
timeout = 10 * 60 * 60  # 10 hours

@app.function(image=custom_image, gpu="a10g", timeout=timeout)
def training_job(dataset_chatml_pandas):
    """Train model on Modal GPU."""
    from datasets import Dataset
    from text2sql_pipeline import train_model

    # Convert back to HuggingFace Dataset
    dataset_chatml = {
        "train": Dataset.from_pandas(dataset_chatml_pandas["train"]),
        "test": Dataset.from_pandas(dataset_chatml_pandas["test"]),
    }
    model_name, uri = train_model(dataset_chatml=dataset_chatml)
    return model_name, uri

@app.function(image=custom_image, gpu="a10g", timeout=timeout)
def evaluation_job(df, model_name):
    """Evaluate model on Modal GPU."""
    from text2sql_pipeline import evaluate_model
    metrics = evaluate_model(df=df, model_name=model_name)
    return metrics

Asset: Model Metrics

import modal

@asset(group_name="model", compute_kind="modal")
def model_metrics(context: AssetExecutionContext, trained_model, process_dataset):
    """Evaluate trained model on test set."""
    # Call remote evaluation job
    model_evaluate_job = modal.Function.lookup(
        "ml-in-production-practice-dagster-pipeline", "evaluation_job"
    )
    metrics = model_evaluate_job.remote(
        df=process_dataset["test"].to_pandas(), 
        model_name=trained_model
    )

    context.add_output_metadata(
        {
            "results": MetadataValue.json(metrics),
        }
    )

    return metrics

Asset Checks: ROUGE Thresholds

@asset_check(asset=model_metrics)
def rouge1_check(model_metrics):
    """Check ROUGE-1 score exceeds threshold."""
    return AssetCheckResult(passed=bool(model_metrics["rouge1"] > 0.8))

@asset_check(asset=model_metrics)
def rouge2_check(model_metrics):
    """Check ROUGE-2 score exceeds threshold."""
    return AssetCheckResult(passed=bool(model_metrics["rouge2"] > 0.8))

@asset_check(asset=model_metrics)
def rougeL_check(model_metrics):
    """Check ROUGE-L score exceeds threshold."""
    return AssetCheckResult(passed=bool(model_metrics["rougeL"] > 0.8))

@asset_check(asset=model_metrics)
def rougeLsum_check(model_metrics):
    """Check ROUGE-Lsum score exceeds threshold."""
    return AssetCheckResult(passed=bool(model_metrics["rougeLsum"] > 0.8))
Multiple checks validate model quality:
  • Each check evaluates a different ROUGE metric
  • Failed checks indicate model performance issues
  • Visible in UI as warnings without blocking execution
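The four checks share one threshold pattern, so the comparison logic can be factored into a small factory. A plain-Python sketch (Dagster's @asset_check decorator is omitted to keep it self-contained; the same loop could generate the decorated checks):

```python
def make_threshold_check(metric, threshold=0.8):
    """Return a predicate mirroring the ROUGE checks above: metric must exceed threshold."""
    def check(model_metrics):
        return bool(model_metrics[metric] > threshold)
    return check

checks = {m: make_threshold_check(m) for m in ("rouge1", "rouge2", "rougeL", "rougeLsum")}

# Hypothetical metrics: rouge2 falls below the 0.8 threshold
metrics = {"rouge1": 0.85, "rouge2": 0.74, "rougeL": 0.82, "rougeLsum": 0.83}
results = {m: fn(metrics) for m, fn in checks.items()}
```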

Pipeline Definition

from dagster import Definitions

defs = Definitions(
    assets=[
        load_sql_data,
        process_dataset,
        trained_model,
        model_metrics,
    ],
    asset_checks=[
        no_empty,
        rouge1_check,
        rouge2_check,
        rougeL_check,
        rougeLsum_check,
    ],
)
The Definitions object registers all assets and asset checks with Dagster.

Running the Pipeline

In the Dagster UI:
  1. Navigate to Assets view
  2. Select all assets (or click Materialize all)
  3. Click Materialize selected
  4. Monitor execution in Runs view
Via CLI:
dagster asset materialize --select "load_sql_data*" -f dagster_pipelines/text2sql_pipeline.py

Asset Metadata & Observability

Dagster supports rich metadata types:
from dagster import MetadataValue

context.add_output_metadata({
    # Numeric values
    "row_count": MetadataValue.int(1000),
    "accuracy": MetadataValue.float(0.95),
    
    # JSON for structured data
    "sample": MetadataValue.json({"text": "example", "label": 1}),
    
    # Markdown for rich text
    "description": MetadataValue.md("# Model trained successfully"),
    
    # Tables
    "metrics": MetadataValue.table(
        records=[{"epoch": 1, "loss": 0.5}, {"epoch": 2, "loss": 0.3}]
    ),
    
    # URLs and paths
    "model_url": MetadataValue.url("https://wandb.ai/..."),
    "artifact_path": MetadataValue.path("/tmp/model"),
})
All metadata is visible in the Dagster UI.

Asset Checks vs Assertions

Asset Checks (recommended):
  • Non-blocking: execution continues even if checks fail
  • Visible in UI with status indicators
  • Can be run independently of materialization
  • Support rich failure messages
Assertions (alternative):
  • Blocking: raise exceptions to halt execution
  • Useful for critical validations
  • Example:
    @asset
    def my_asset(upstream):
        assert len(upstream) > 0, "Empty dataset!"
        return process(upstream)
    
Asset Groups

Organize assets with group_name:
@asset(group_name="data")
def load_data(): ...

@asset(group_name="model")
def train_model(): ...
Groups appear in UI for better organization:
  • data: Data loading and preprocessing
  • model: Training and evaluation
  • inference: Prediction and serving

Best Practices

Asset Design

  • Define assets by what they produce, not how
  • Keep assets pure functions when possible
  • Use descriptive names (e.g., cleaned_data, not step_2)
  • Group related assets logically

Data Quality

  • Add asset checks for critical validations
  • Use checks for schema validation, null checks, ranges
  • Set meaningful thresholds (don’t over-check)
  • Document why checks exist in docstrings

Metadata

  • Attach metadata for observability
  • Log samples, counts, metrics, distributions
  • Use markdown for human-readable summaries
  • Include links to external systems (W&B, MLflow)

Resource Management

  • Use compute_kind to indicate execution environment
  • Offload heavy compute to Modal, Kubernetes, etc.
  • Partition large assets for incremental processing
  • Configure retries for flaky dependencies
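Dagster expresses the last point declaratively (e.g. a RetryPolicy(max_retries=..., delay=...) passed to @asset); the behavior it provides looks roughly like this dependency-free sketch:

```python
import time

def with_retries(fn, max_retries=3, delay=0.0, backoff=2.0):
    """Retry fn up to max_retries times with exponential backoff between attempts."""
    def wrapped(*args, **kwargs):
        attempt = 0
        while True:
            try:
                return fn(*args, **kwargs)
            except Exception:
                attempt += 1
                if attempt > max_retries:
                    raise
                time.sleep(delay * (backoff ** (attempt - 1)))
    return wrapped

# A flaky dependency that succeeds on the third call
calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient failure")
    return "ok"

result = with_retries(flaky)()
```

Prefer the built-in retry policy over hand-rolled loops like this; it surfaces retry attempts in the Dagster UI.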

Comparison with Other Orchestrators

Dagster Advantages:
  • Asset-centric: focuses on data products, not tasks
  • Built-in data quality checks
  • Rich metadata and observability
  • Better local development experience
Airflow Advantages:
  • More mature ecosystem
  • Broader community and integrations
  • Battle-tested at scale
  • More flexible scheduling options

Troubleshooting

Debug failed materializations:
  1. Click failed asset in Runs view
  2. Expand Logs to see error traceback
  3. Check Compute Logs for stdout/stderr
  4. Verify upstream assets materialized successfully
  5. Test asset function locally:
    from text2sql_pipeline import load_sql_data
    from dagster import build_asset_context
    
    result = load_sql_data(build_asset_context())
    print(result)
    
If checks consistently fail:
  • Review threshold values (e.g., ROUGE > 0.8 may be too strict)
  • Check if asset returns expected format
  • Add logging in check functions to debug
  • Consider making checks warnings instead of failures

Next Steps

Practice Exercises

Complete hands-on exercises to build your own orchestration pipelines
