dbt Integration

The dagster-dbt integration allows you to represent dbt models, seeds, snapshots, and tests as Dagster assets. This enables you to:
  • Automatically generate Dagster assets from your dbt project
  • Track lineage between dbt models and other data assets
  • Run dbt commands through Dagster
  • Schedule dbt runs alongside other data pipelines
  • Monitor dbt test results as asset checks

Installation

Install the dbt integration alongside your dbt adapter:
pip install dagster-dbt dbt-core

# Install your database adapter
pip install dbt-snowflake  # or dbt-bigquery, dbt-redshift, etc.

dagster-dbt supports dbt Core versions 1.7 through 1.11 and requires Python 3.10 or higher.

Quick Start

Create Dagster assets from your dbt project:
from pathlib import Path
from dagster import AssetExecutionContext, Definitions
from dagster_dbt import DbtCliResource, DbtProject, dbt_assets

# Point to your dbt project directory
dbt_project_dir = Path(__file__).parent / "my_dbt_project"
dbt_project = DbtProject(project_dir=dbt_project_dir)

# Prepare the dbt project (compiles manifest)
dbt_project.prepare_if_dev()

# Create a dbt resource
dbt_resource = DbtCliResource(project_dir=dbt_project)

# Define dbt assets
@dbt_assets(manifest=dbt_project.manifest_path)
def dbt_models(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()

# Create definitions
defs = Definitions(
    assets=[dbt_models],
    resources={"dbt": dbt_resource},
)

Core Components

DbtCliResource

The DbtCliResource executes dbt CLI commands and streams events back to Dagster:
from dagster_dbt import DbtCliResource

dbt = DbtCliResource(
    project_dir="/path/to/dbt/project",
    profiles_dir="/path/to/profiles",  # Optional
    profile="production",              # Optional
    target="prod",                      # Optional
)

Configuration options:
  • project_dir: Path to dbt project containing dbt_project.yml
  • profiles_dir: Path to directory containing profiles.yml (defaults to project_dir)
  • profile: Profile name from profiles.yml to use
  • target: Target name within the profile to use
  • global_config_flags: List of global dbt CLI flags
  • state_path: Path to directory with dbt state for incremental operations
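
To make the roles of these options concrete, here is a small sketch of how they combine into a dbt invocation. The build_dbt_command helper is hypothetical (it is not part of dagster-dbt), but it mirrors the general shape of the command the resource runs: global flags go before the subcommand, while profile and target go after it.

```python
def build_dbt_command(args, global_config_flags=(), profile=None, target=None):
    # Hypothetical helper illustrating how DbtCliResource's options
    # roughly assemble into a dbt CLI invocation.
    cmd = ["dbt", *global_config_flags, *args]
    if profile:
        cmd += ["--profile", profile]
    if target:
        cmd += ["--target", target]
    return cmd

print(build_dbt_command(["run"], global_config_flags=["--log-format", "json"], target="prod"))
# → ['dbt', '--log-format', 'json', 'run', '--target', 'prod']
```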

@dbt_assets Decorator

The @dbt_assets decorator transforms a function into a set of Dagster assets representing your dbt models:
from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets

@dbt_assets(
    manifest="path/to/target/manifest.json",
    project=dbt_project,
)
def jaffle_shop_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    # Run dbt build command
    yield from dbt.cli(["build"], context=context).stream()

Key parameters:
  • manifest: Path to dbt manifest.json file (generated by dbt parse)
  • project: DbtProject object for your dbt project
  • select: dbt selection string to subset models
  • exclude: dbt exclusion string to exclude models
  • dagster_dbt_translator: Custom translator for asset metadata
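
The manifest is the source of truth here: @dbt_assets reads it to create one Dagster asset per dbt model. As a rough illustration of what that entails, the sketch below extracts model names from a manifest file; the manifest structure is simplified to the two fields read (a real manifest.json carries far more per node), and model_names is a hypothetical helper, not a dagster-dbt API.

```python
import json

def model_names(manifest_path):
    # Read a dbt manifest and return the names of its models -- the same
    # information @dbt_assets uses to build one asset per model.
    with open(manifest_path) as f:
        manifest = json.load(f)
    return sorted(
        node["name"]
        for node in manifest["nodes"].values()
        if node["resource_type"] == "model"
    )
```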

Running dbt Commands

Execute any dbt CLI command through the resource:
@dbt_assets(manifest=dbt_project.manifest_path)
def dbt_models(context: AssetExecutionContext, dbt: DbtCliResource):
    # Run specific dbt commands
    yield from dbt.cli(["run"], context=context).stream()
    
    # Run with selection
    yield from dbt.cli(["run", "--select", "tag:daily"], context=context).stream()
    
    # Run with vars
    yield from dbt.cli(
        ["build", "--vars", '{"start_date": "2024-01-01"}'],
        context=context,
    ).stream()

Always use .stream() to emit Dagster events as dbt executes. This provides real-time progress updates in the UI.
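
After a command finishes, dbt also writes a run_results.json artifact summarizing each node's outcome. As a sketch of how you might post-process it, the hypothetical helper below pulls out the failed nodes from a parsed run_results dict (the structure is trimmed to the fields read; a real artifact has more).

```python
def failed_unique_ids(run_results):
    # Given a parsed run_results.json dict, return the unique_ids of
    # nodes that errored or failed.
    return [
        r["unique_id"]
        for r in run_results["results"]
        if r["status"] in ("error", "fail")
    ]

results = {
    "results": [
        {"unique_id": "model.jaffle_shop.orders", "status": "success"},
        {"unique_id": "test.jaffle_shop.not_null_orders_id", "status": "fail"},
    ]
}
print(failed_unique_ids(results))  # → ['test.jaffle_shop.not_null_orders_id']
```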

Asset Checks from dbt Tests

Dagster automatically converts dbt tests into asset checks:
from dagster_dbt import DagsterDbtTranslator, DagsterDbtTranslatorSettings

# Enable asset checks (enabled by default in recent versions)
@dbt_assets(
    manifest=dbt_project.manifest_path,
    dagster_dbt_translator=DagsterDbtTranslator(
        settings=DagsterDbtTranslatorSettings(enable_asset_checks=True)
    ),
)
def dbt_models(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()

When you run dbt build or dbt test, test results appear as asset check results in Dagster.
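
Conceptually, each dbt test node's status becomes one asset check result. The hypothetical helper below groups test outcomes from a parsed run_results.json by status, roughly mirroring what you would see in the Dagster UI (the real integration also attaches severity and metadata, which this sketch omits).

```python
def summarize_checks(run_results):
    # Group dbt test outcomes by status; test node unique_ids
    # conventionally start with "test.".
    summary = {}
    for r in run_results["results"]:
        if r["unique_id"].startswith("test."):
            summary.setdefault(r["status"], []).append(r["unique_id"])
    return summary
```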

Custom Asset Keys and Metadata

Customize how dbt models map to Dagster assets:
from dagster import AssetKey
from dagster_dbt import DagsterDbtTranslator

class CustomDbtTranslator(DagsterDbtTranslator):
    def get_asset_key(self, dbt_resource_props):
        # Add a prefix to all asset keys
        resource_type = dbt_resource_props["resource_type"]
        name = dbt_resource_props["name"]
        
        if resource_type == "source":
            return AssetKey(["raw", name])
        else:
            return AssetKey(["analytics", name])
    
    def get_group_name(self, dbt_resource_props):
        # Group assets by dbt schema
        return dbt_resource_props.get("schema")

@dbt_assets(
    manifest=dbt_project.manifest_path,
    dagster_dbt_translator=CustomDbtTranslator(),
)
def dbt_models(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()
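
To see exactly which keys the translator above produces, here is its mapping logic restated as a plain function over the properties dict dbt provides for each node (a hypothetical restatement for illustration, not a dagster-dbt API):

```python
def custom_asset_key_parts(dbt_resource_props):
    # Same mapping as CustomDbtTranslator.get_asset_key above:
    # sources land under "raw", everything else under "analytics".
    if dbt_resource_props["resource_type"] == "source":
        return ["raw", dbt_resource_props["name"]]
    return ["analytics", dbt_resource_props["name"]]

print(custom_asset_key_parts({"resource_type": "source", "name": "customers"}))
# → ['raw', 'customers']
print(custom_asset_key_parts({"resource_type": "model", "name": "orders"}))
# → ['analytics', 'orders']
```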

Scheduling dbt Runs

Schedule your dbt assets with Dagster schedules:
from dagster import ScheduleDefinition, Definitions

# Daily schedule for all dbt assets
daily_dbt_schedule = ScheduleDefinition(
    name="daily_dbt_models",
    target="*",  # Materialize all assets
    cron_schedule="0 2 * * *",  # 2 AM daily
)

defs = Definitions(
    assets=[dbt_models],
    schedules=[daily_dbt_schedule],
    resources={"dbt": dbt_resource},
)
Or create a schedule from dbt selection syntax:
from dagster_dbt import build_schedule_from_dbt_selection

# Schedule only models with tag:daily
daily_schedule = build_schedule_from_dbt_selection(
    dbt_assets=[dbt_models],
    job_name="daily_dbt_job",
    cron_schedule="0 2 * * *",
    dbt_select="tag:daily",
)

defs = Definitions(
    assets=[dbt_models],
    schedules=[daily_schedule],
    resources={"dbt": dbt_resource},
)

Upstream and Downstream Dependencies

Connect dbt models to other Dagster assets:
from dagster import asset, AssetKey

# Upstream Python asset. The default asset key for a dbt source
# "raw"."customers" is ["raw", "customers"], so give this asset a
# matching key to wire up the dependency.
@asset(key=AssetKey(["raw", "customers"]))
def raw_customers():
    # Fetch data and load to database
    return {"status": "loaded"}

# dbt models that declare the source "raw"."customers" now depend on
# raw_customers automatically
@dbt_assets(manifest=dbt_project.manifest_path)
def dbt_models(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()

# Downstream asset using dbt output
@asset(deps=[AssetKey(["customers"])])  # dbt model name
def customer_analysis():
    # Read dbt model output and perform analysis
    return {"total_customers": 1000}

dbt Cloud Integration

For dbt Cloud users, load assets from Cloud jobs:
from dagster import EnvVar
from dagster_dbt import (
    DbtCloudClientResource,
    dbt_cloud_assets,
    DbtCloudWorkspace,
)

dbt_cloud = DbtCloudClientResource(auth_token=EnvVar("DBT_CLOUD_API_TOKEN"))

@dbt_cloud_assets(
    workspace=DbtCloudWorkspace(
        dbt_cloud=dbt_cloud,
        project_id="12345",
        job_id="67890",
    )
)
def dbt_cloud_models():
    # Automatically triggers dbt Cloud job
    pass

defs = Definitions(
    assets=[dbt_cloud_models],
    resources={"dbt_cloud": dbt_cloud},
)

Incremental Models and State

Use dbt state for incremental runs:
dbt_resource = DbtCliResource(
    project_dir=dbt_project,
    state_path="target",  # Path relative to project_dir
)

@dbt_assets(manifest=dbt_project.manifest_path)
def dbt_models(context: AssetExecutionContext, dbt: DbtCliResource):
    # Only run modified models
    yield from dbt.cli(
        ["build", "--select", "state:modified+"],
        context=context,
    ).stream()
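
Conceptually, state:modified compares the manifest in state_path against the current one: a node is selected if it is new or has changed since the state was written. The sketch below is a simplified, hypothetical version of that comparison based only on node checksums (real dbt also considers configs, macros, and more).

```python
def modified_model_ids(state_manifest, current_manifest):
    # A node counts as modified if it is absent from the state manifest
    # or its checksum differs from the recorded one.
    old_nodes = state_manifest["nodes"]
    return [
        uid
        for uid, node in current_manifest["nodes"].items()
        if uid not in old_nodes or old_nodes[uid]["checksum"] != node["checksum"]
    ]
```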

Subsetting dbt Models

Run only a subset of dbt models:
# Select by tag
@dbt_assets(
    manifest=dbt_project.manifest_path,
    select="tag:hourly",
)
def hourly_dbt_models(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()

# Select by path
@dbt_assets(
    manifest=dbt_project.manifest_path,
    select="marts.finance",
)
def finance_dbt_models(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()

# Exclude models
@dbt_assets(
    manifest=dbt_project.manifest_path,
    exclude="tag:deprecated",
)
def active_dbt_models(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()

Best Practices

  1. Use prepare_if_dev(): Call this on your DbtProject to automatically compile the manifest during development
  2. Stream events: Always use .stream() on dbt CLI invocations for real-time progress
  3. Enable asset checks: Convert dbt tests to Dagster asset checks for better observability
  4. Version control manifest: Commit your manifest.json to git or generate it in CI for consistent deploys
  5. Use profiles.yml: Keep credentials in profiles.yml or in environment variables; never hardcode them in your code

Troubleshooting

Manifest not found

Ensure you’ve run dbt parse or called dbt_project.prepare_if_dev() before loading assets.

Import errors

Verify you have both dagster-dbt and your dbt adapter installed:
pip install dagster-dbt dbt-core dbt-snowflake

Test failures not appearing

Enable asset checks in your translator settings:
from dagster_dbt import DagsterDbtTranslatorSettings

settings = DagsterDbtTranslatorSettings(enable_asset_checks=True)

API Reference

Key classes and functions:
  • DbtCliResource: Resource for executing dbt CLI commands
  • DbtProject: Represents a dbt project with manifest compilation
  • @dbt_assets: Decorator to create Dagster assets from dbt models
  • DagsterDbtTranslator: Customize asset key and metadata mapping
  • build_schedule_from_dbt_selection: Create schedules for dbt subsets
  • DbtManifestAssetSelection: Select assets using dbt selection syntax

For complete API documentation, see the dagster-dbt API reference.
