dbt Integration
The dagster-dbt integration allows you to represent dbt models, seeds, snapshots, and tests as Dagster assets. This enables you to:
- Automatically generate Dagster assets from your dbt project
- Track lineage between dbt models and other data assets
- Run dbt commands through Dagster
- Schedule dbt runs alongside other data pipelines
- Monitor dbt test results as asset checks
Installation
Install the dbt integration alongside your dbt adapter:
pip install dagster-dbt dbt-core
# Install your database adapter
pip install dbt-snowflake # or dbt-bigquery, dbt-redshift, etc.
dagster-dbt supports dbt Core versions 1.7 through 1.11. It requires Python 3.10 or higher.
Quick Start
Create Dagster assets from your dbt project:
from pathlib import Path
from dagster import AssetExecutionContext, Definitions
from dagster_dbt import DbtCliResource, DbtProject, dbt_assets

# Point to your dbt project directory
dbt_project_dir = Path(__file__).parent / "my_dbt_project"
dbt_project = DbtProject(project_dir=dbt_project_dir)

# Prepare the dbt project (compiles the manifest)
dbt_project.prepare_if_dev()

# Create a dbt resource
dbt_resource = DbtCliResource(project_dir=dbt_project)

# Define dbt assets
@dbt_assets(manifest=dbt_project.manifest_path)
def dbt_models(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()

# Create definitions
defs = Definitions(
    assets=[dbt_models],
    resources={"dbt": dbt_resource},
)
Core Components
DbtCliResource
The DbtCliResource executes dbt CLI commands and streams events back to Dagster:
from dagster_dbt import DbtCliResource
dbt = DbtCliResource(
    project_dir="/path/to/dbt/project",
    profiles_dir="/path/to/profiles",  # Optional
    profile="production",  # Optional
    target="prod",  # Optional
)
Configuration options:
project_dir: Path to dbt project containing dbt_project.yml
profiles_dir: Path to directory containing profiles.yml (defaults to project_dir)
profile: Profile name from profiles.yml to use
target: Target name within the profile to use
global_config_flags: List of global dbt CLI flags
state_path: Path to directory with dbt state for incremental operations
@dbt_assets Decorator
The @dbt_assets decorator transforms a function into a set of Dagster assets representing your dbt models:
from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets
@dbt_assets(
    manifest="path/to/target/manifest.json",
    project=dbt_project,
)
def jaffle_shop_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    # Run the dbt build command
    yield from dbt.cli(["build"], context=context).stream()
Key parameters:
manifest: Path to dbt manifest.json file (generated by dbt parse)
project: DbtProject object for your dbt project
select: dbt selection string to subset models
exclude: dbt exclusion string to exclude models
dagster_dbt_translator: Custom translator for asset metadata
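The manifest.json these parameters reference is a plain JSON artifact describing every node in the project. As an illustrative sketch (standard library only; in practice dagster-dbt parses the manifest for you, and the helper names here are invented for this example), model names and their dependencies can be read like this:

```python
import json
from pathlib import Path

def list_models(manifest):
    """Map each dbt model name to the unique_ids it depends on."""
    models = {}
    for node in manifest.get("nodes", {}).values():
        if node.get("resource_type") == "model":
            models[node["name"]] = node.get("depends_on", {}).get("nodes", [])
    return models

def load_manifest(path):
    """Read a manifest.json produced by `dbt parse`."""
    return json.loads(Path(path).read_text())
```

dagster-dbt uses this same node and dependency information to build the asset graph.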
Running dbt Commands
Execute any dbt CLI command through the resource:
@dbt_assets(manifest=dbt_project.manifest_path)
def dbt_models(context: AssetExecutionContext, dbt: DbtCliResource):
    # Run a specific dbt command
    yield from dbt.cli(["run"], context=context).stream()

    # Or run with a selection
    yield from dbt.cli(["run", "--select", "tag:daily"], context=context).stream()

    # Or run with vars
    yield from dbt.cli(
        ["build", "--vars", '{"start_date": "2024-01-01"}'],
        context=context,
    ).stream()
Always use .stream() to emit Dagster events as dbt executes. This provides real-time progress updates in the UI.
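The value of .stream() is that events are yielded as each dbt node finishes, instead of after the whole invocation completes. The plain-Python generator below illustrates that pattern; dbt_events and run_node are invented stand-ins for this sketch, not part of the dagster-dbt API:

```python
def run_node(name):
    """Stand-in for executing one dbt node."""
    return f"{name}: success"

def dbt_events(nodes):
    """Yield one result per node as soon as it is ready,
    analogous to consuming DbtCliInvocation.stream()."""
    for node in nodes:
        yield run_node(node)

# Consuming the generator surfaces progress incrementally,
# rather than only after every node has run.
events = list(dbt_events(["stg_customers", "customers"]))
```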
Asset Checks from dbt Tests
Dagster automatically converts dbt tests into asset checks:
from dagster_dbt import DagsterDbtTranslator, DagsterDbtTranslatorSettings
# Enable asset checks (enabled by default in recent versions)
@dbt_assets(
    manifest=dbt_project.manifest_path,
    dagster_dbt_translator=DagsterDbtTranslator(
        settings=DagsterDbtTranslatorSettings(enable_asset_checks=True)
    ),
)
def dbt_models(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()
When you run dbt build or dbt test, test results appear as asset check results in Dagster.
Custom Translators
Customize how dbt models map to Dagster assets by subclassing DagsterDbtTranslator:
from dagster import AssetKey
from dagster_dbt import DagsterDbtTranslator
class CustomDbtTranslator(DagsterDbtTranslator):
    def get_asset_key(self, dbt_resource_props):
        # Add a prefix to all asset keys
        resource_type = dbt_resource_props["resource_type"]
        name = dbt_resource_props["name"]
        if resource_type == "source":
            return AssetKey(["raw", name])
        else:
            return AssetKey(["analytics", name])

    def get_group_name(self, dbt_resource_props):
        # Group assets by dbt schema
        return dbt_resource_props.get("schema")

@dbt_assets(
    manifest=dbt_project.manifest_path,
    dagster_dbt_translator=CustomDbtTranslator(),
)
def dbt_models(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()
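The branching inside CustomDbtTranslator.get_asset_key can be checked in isolation. The standalone function below replicates that logic as a plain function returning key paths as lists (for illustration only; real code returns AssetKey objects from the translator subclass shown above):

```python
def asset_key_for(dbt_resource_props):
    """Replicates CustomDbtTranslator.get_asset_key as a plain function
    returning the key path as a list."""
    name = dbt_resource_props["name"]
    if dbt_resource_props["resource_type"] == "source":
        return ["raw", name]        # sources land under the "raw" prefix
    return ["analytics", name]      # models, seeds, snapshots under "analytics"

assert asset_key_for({"resource_type": "source", "name": "customers"}) == ["raw", "customers"]
assert asset_key_for({"resource_type": "model", "name": "orders"}) == ["analytics", "orders"]
```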
Scheduling dbt Runs
Schedule your dbt assets with Dagster schedules:
from dagster import ScheduleDefinition, Definitions
# Daily schedule for all dbt assets
daily_dbt_schedule = ScheduleDefinition(
    name="daily_dbt_models",
    target="*",  # Materialize all assets
    cron_schedule="0 2 * * *",  # 2 AM daily
)

defs = Definitions(
    assets=[dbt_models],
    schedules=[daily_dbt_schedule],
    resources={"dbt": dbt_resource},
)
Or create a schedule from dbt selection syntax:
from dagster_dbt import build_schedule_from_dbt_selection
# Schedule only models with tag:daily
daily_schedule = build_schedule_from_dbt_selection(
    dbt_assets=[dbt_models],
    job_name="daily_dbt_job",
    cron_schedule="0 2 * * *",
    dbt_select="tag:daily",
)

defs = Definitions(
    assets=[dbt_models],
    schedules=[daily_schedule],
    resources={"dbt": dbt_resource},
)
Upstream and Downstream Dependencies
Connect dbt models to other Dagster assets:
from dagster import asset, AssetKey

# Upstream Python asset, keyed to match the dbt source "raw"."customers"
@asset(key=AssetKey(["raw", "customers"]))
def raw_customers():
    # Fetch data and load it into the table the dbt source reads from
    return {"status": "loaded"}

# dbt models that select from source("raw", "customers") depend on the
# asset above automatically, because the asset keys match
@dbt_assets(manifest=dbt_project.manifest_path)
def dbt_models(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()

# Downstream asset reading the output of the dbt model "customers"
@asset(deps=[AssetKey(["customers"])])
def customer_analysis():
    # Read the dbt model output and perform analysis
    return {"total_customers": 1000}
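For such a dependency to connect, the Python asset's key must match the key dagster-dbt derives from the manifest. With the default translator, a model maps to a key equal to its name, and a source maps to [source_name, table_name]; the sketch below captures that default as a plain function (an illustration of the behavior, not the library's actual implementation):

```python
def default_asset_key(dbt_resource_props):
    """Sketch of the default key mapping: sources become
    [source_name, table_name]; other nodes become [name]."""
    if dbt_resource_props["resource_type"] == "source":
        return [dbt_resource_props["source_name"], dbt_resource_props["name"]]
    return [dbt_resource_props["name"]]
```

Under this default, the source "raw"."customers" resolves to the key path ["raw", "customers"], so a Python asset keyed AssetKey(["raw", "customers"]) is wired in as its upstream automatically.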
dbt Cloud Integration
For dbt Cloud users, load assets from Cloud jobs:
from dagster import Definitions, EnvVar
from dagster_dbt import (
    DbtCloudClientResource,
    dbt_cloud_assets,
    DbtCloudWorkspace,
)

dbt_cloud = DbtCloudClientResource(
    auth_token=EnvVar("DBT_CLOUD_API_TOKEN"),
    account_id=12345,
)

@dbt_cloud_assets(
    workspace=DbtCloudWorkspace(
        dbt_cloud=dbt_cloud,
        project_id="12345",
        job_id="67890",
    )
)
def dbt_cloud_models():
    # Automatically triggers the dbt Cloud job
    pass

defs = Definitions(
    assets=[dbt_cloud_models],
    resources={"dbt_cloud": dbt_cloud},
)
Incremental Models and State
Use dbt state for incremental runs:
dbt_resource = DbtCliResource(
    project_dir=dbt_project,
    state_path="target",  # Path relative to project_dir
)

@dbt_assets(manifest=dbt_project.manifest_path)
def dbt_models(context: AssetExecutionContext, dbt: DbtCliResource):
    # Only run models modified since the saved state, plus their descendants
    yield from dbt.cli(
        ["build", "--select", "state:modified+"],
        context=context,
    ).stream()
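state:modified+ selects models whose definition changed relative to the saved state, plus everything downstream of them. A minimal stdlib sketch of that selection over a dependency graph (simplified to checksum comparison; dbt's real state comparison considers more than checksums, and the function name here is invented):

```python
def modified_plus(old_checksums, new_checksums, children):
    """Select models changed since `old_checksums`, plus all descendants.

    children maps each model to its direct downstream models.
    """
    modified = {m for m, c in new_checksums.items() if old_checksums.get(m) != c}
    selected = set()
    stack = list(modified)
    while stack:
        node = stack.pop()
        if node in selected:
            continue
        selected.add(node)
        stack.extend(children.get(node, []))  # walk downstream edges
    return selected
```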
Subsetting dbt Models
Run only a subset of dbt models:
# Select by tag
@dbt_assets(
    manifest=dbt_project.manifest_path,
    select="tag:hourly",
)
def hourly_dbt_models(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()

# Select by path
@dbt_assets(
    manifest=dbt_project.manifest_path,
    select="marts.finance",
)
def finance_dbt_models(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()

# Exclude models
@dbt_assets(
    manifest=dbt_project.manifest_path,
    exclude="tag:deprecated",
)
def active_dbt_models(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()
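Selection and exclusion compose: the resolved set is select minus exclude, evaluated against node metadata from the manifest. A simplified stdlib sketch of tag-based filtering (real dbt selection also supports paths, graph operators, and set unions; the function here is invented for illustration):

```python
def filter_by_tags(models, select_tag=None, exclude_tag=None):
    """models maps each model name to its list of tags.
    Returns the names matching select_tag minus those matching exclude_tag."""
    names = set(models)
    if select_tag is not None:
        names = {n for n in names if select_tag in models[n]}
    if exclude_tag is not None:
        names = {n for n in names if exclude_tag not in models[n]}
    return names
```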
Best Practices
- Use prepare_if_dev(): call this on your DbtProject so the manifest is compiled automatically during development
- Stream events: always use .stream() on dbt CLI invocations for real-time progress
- Enable asset checks: convert dbt tests to Dagster asset checks for better observability
- Version-control the manifest: commit manifest.json to git or generate it in CI for consistent deploys
- Use profiles.yml: keep credentials in profiles.yml or in environment variables, never hardcoded in code
Troubleshooting
Manifest not found
Ensure you’ve run dbt parse or called dbt_project.prepare_if_dev() before loading assets.
Import errors
Verify you have both dagster-dbt and your dbt adapter installed:
pip install dagster-dbt dbt-core dbt-snowflake
Test failures not appearing
Enable asset checks in your translator settings:
from dagster_dbt import DagsterDbtTranslatorSettings
settings = DagsterDbtTranslatorSettings(enable_asset_checks=True)
API Reference
Key classes and functions:
DbtCliResource: Resource for executing dbt CLI commands
DbtProject: Represents a dbt project with manifest compilation
@dbt_assets: Decorator to create Dagster assets from dbt models
DagsterDbtTranslator: Customize asset key and metadata mapping
build_schedule_from_dbt_selection: Create schedules for dbt subsets
DbtManifestAssetSelection: Select assets using dbt selection syntax
For complete API documentation, see the dagster-dbt API reference.
Next Steps