Skip to main content
Dagster integrates seamlessly with popular data tools to build end-to-end data pipelines. These integrations provide asset definitions, resources, and I/O managers tailored to each tool.

dbt Integration

dbt is one of the most popular integrations. Dagster can load your dbt project and represent each model as an asset.

Basic dbt Integration

from dagster_dbt import DbtCliResource, dbt_assets
import dagster as dg

@dbt_assets(manifest="/path/to/manifest.json")
def dbt_models(context: dg.AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()

defs = dg.Definitions(
    assets=[dbt_models],
    resources={
        "dbt": DbtCliResource(project_dir="/path/to/dbt/project")
    },
)

Upstream Dependencies

Combine dbt models with Python assets:
import pandas as pd
import dagster as dg
from dagster_dbt import DbtCliResource, dbt_assets

@dg.asset
def raw_customers():
    """Extract raw customer data."""
    df = pd.read_csv("s3://bucket/raw_customers.csv")
    df.to_csv("data/raw_customers.csv", index=False)
    return df

@dbt_assets(
    manifest="/path/to/manifest.json",
    io_manager_key="db_io_manager"
)
def dbt_models(context: dg.AssetExecutionContext, dbt: DbtCliResource):
    """Transform data with dbt."""
    yield from dbt.cli(["build"], context=context).stream()

@dg.asset(deps=[dbt_models])
def customer_analysis(context: dg.AssetExecutionContext):
    """Analyze transformed customer data."""
    df = pd.read_sql("SELECT * FROM customers", con=get_connection())
    return df.describe()
The @dbt_assets decorator automatically creates an asset for each dbt model in your project, maintaining dependencies between models.

Airbyte Integration

Airbyte syncs data from external sources into your warehouse. Dagster can orchestrate these syncs:
from dagster_airbyte import build_airbyte_assets, AirbyteResource
import dagster as dg

# Build assets from Airbyte connection
airbyte_assets = build_airbyte_assets(
    connection_id="your-connection-id",
    destination_tables=["orders", "users"],
    asset_key_prefix=["postgres_replica"],
)

defs = dg.Definitions(
    assets=[airbyte_assets],
    resources={
        "airbyte": AirbyteResource(
            host="localhost",
            port="8000",
        )
    },
)

Snowflake Integration

Connect to Snowflake for reading and writing data:
from dagster_snowflake import SnowflakeResource
import dagster as dg
import pandas as pd

class SnowflakeConfig(dg.ConfigurableResource):
    account: str
    user: str
    password: str
    database: str
    warehouse: str
    schema_: str

@dg.asset
def snowflake_table(snowflake: SnowflakeResource) -> pd.DataFrame:
    with snowflake.get_connection() as conn:
        df = pd.read_sql(
            "SELECT * FROM my_table LIMIT 1000",
            conn
        )
    return df

defs = dg.Definitions(
    assets=[snowflake_table],
    resources={
        "snowflake": SnowflakeResource(
            account=dg.EnvVar("SNOWFLAKE_ACCOUNT"),
            user=dg.EnvVar("SNOWFLAKE_USER"),
            password=dg.EnvVar("SNOWFLAKE_PASSWORD"),
            database="ANALYTICS",
            warehouse="COMPUTE_WH",
            schema="PUBLIC",
        )
    },
)

Snowflake I/O Manager

Use the I/O manager to automatically store assets in Snowflake:
from dagster_snowflake_pandas import SnowflakePandasIOManager
import dagster as dg
import pandas as pd

@dg.asset(io_manager_key="snowflake_io")
def customers() -> pd.DataFrame:
    return pd.DataFrame({
        "customer_id": [1, 2, 3],
        "name": ["Alice", "Bob", "Charlie"]
    })

@dg.asset(io_manager_key="snowflake_io")
def orders(customers: pd.DataFrame) -> pd.DataFrame:
    # Data is automatically loaded from Snowflake
    return pd.DataFrame({
        "order_id": [101, 102],
        "customer_id": [1, 2]
    })

defs = dg.Definitions(
    assets=[customers, orders],
    resources={
        "snowflake_io": SnowflakePandasIOManager(
            account=dg.EnvVar("SNOWFLAKE_ACCOUNT"),
            user=dg.EnvVar("SNOWFLAKE_USER"),
            password=dg.EnvVar("SNOWFLAKE_PASSWORD"),
            database="ANALYTICS",
            warehouse="COMPUTE_WH",
            schema="PUBLIC",
        )
    },
)

OpenAI Integration

Build AI-powered pipelines with the OpenAI integration:
from dagster_openai import OpenAIResource
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import FAISS
import dagster as dg

@dg.asset(compute_kind="OpenAI")
def search_index(
    context: dg.AssetExecutionContext,
    openai: OpenAIResource,
    source_docs: list
) -> bytes:
    """Create a search index from documents."""
    with openai.get_client(context) as client:
        search_index = FAISS.from_documents(
            source_docs,
            OpenAIEmbeddings(client=client.embeddings)
        )
    return search_index.serialize_to_bytes()

class OpenAIConfig(dg.Config):
    model: str
    question: str

@dg.asset(compute_kind="OpenAI")
def completion(
    context: dg.AssetExecutionContext,
    openai: OpenAIResource,
    config: OpenAIConfig,
    search_index: bytes
):
    """Generate completion using the search index."""
    index = FAISS.deserialize_from_bytes(
        search_index,
        OpenAIEmbeddings(),
        allow_dangerous_deserialization=True
    )
    
    with openai.get_client(context) as client:
        model = ChatOpenAI(
            client=client.chat.completions,
            model=config.model,
            temperature=0
        )
        docs = index.similarity_search(config.question, k=4)
        # Generate completion with retrieved docs
        response = model.invoke(format_prompt(docs, config.question))
        context.log.info(response)

Databricks Integration

Execute Spark jobs on Databricks:
from dagster_databricks import DatabricksClientResource
import dagster as dg

@dg.asset
def databricks_job(
    databricks: DatabricksClientResource,
    context: dg.AssetExecutionContext
):
    """Run a Databricks job."""
    run_id = databricks.run_now(
        job_id="12345",
        notebook_params={"date": context.partition_key}
    )
    
    context.log.info(f"Started Databricks run {run_id}")
    databricks.wait_for_run_completion(run_id)
    
    return run_id

defs = dg.Definitions(
    assets=[databricks_job],
    resources={
        "databricks": DatabricksClientResource(
            host=dg.EnvVar("DATABRICKS_HOST"),
            token=dg.EnvVar("DATABRICKS_TOKEN"),
        )
    },
)

BigQuery Integration

Work with Google BigQuery:
from dagster_gcp import BigQueryResource
import dagster as dg
import pandas as pd

@dg.asset
def bigquery_data(bigquery: BigQueryResource) -> pd.DataFrame:
    query = """
        SELECT
            date,
            SUM(revenue) as total_revenue
        FROM `project.dataset.sales`
        WHERE date >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)
        GROUP BY date
        ORDER BY date
    """
    
    with bigquery.get_client() as client:
        df = client.query(query).to_dataframe()
    
    return df

defs = dg.Definitions(
    assets=[bigquery_data],
    resources={
        "bigquery": BigQueryResource(
            project=dg.EnvVar("GCP_PROJECT"),
        )
    },
)

Fivetran Integration

Orchestrate Fivetran syncs:
from dagster_fivetran import FivetranResource, build_fivetran_assets
import dagster as dg

fivetran_assets = build_fivetran_assets(
    connector_id="your-connector-id",
    destination_tables=["schema.table1", "schema.table2"],
)

defs = dg.Definitions(
    assets=[fivetran_assets],
    resources={
        "fivetran": FivetranResource(
            api_key=dg.EnvVar("FIVETRAN_API_KEY"),
            api_secret=dg.EnvVar("FIVETRAN_API_SECRET"),
        )
    },
)

DuckDB Integration

Use DuckDB for fast analytics:
from dagster_duckdb import DuckDBResource
import dagster as dg
import pandas as pd

@dg.asset
def duckdb_analysis(duckdb: DuckDBResource) -> pd.DataFrame:
    query = """
        SELECT
            category,
            COUNT(*) as count,
            AVG(price) as avg_price
        FROM products
        GROUP BY category
    """
    
    with duckdb.get_connection() as conn:
        df = conn.execute(query).df()
    
    return df

defs = dg.Definitions(
    assets=[duckdb_analysis],
    resources={
        "duckdb": DuckDBResource(
            database="analytics.duckdb"
        )
    },
)

Slack Integration

Send notifications to Slack:
from dagster_slack import SlackResource
import dagster as dg

@dg.asset
def important_metric() -> float:
    return calculate_kpi()

@dg.sensor(job=my_job)
def metric_alert_sensor(
    context: dg.SensorEvaluationContext,
    slack: SlackResource
):
    metric_value = fetch_latest_metric()
    
    if metric_value < THRESHOLD:
        slack.get_client().chat_postMessage(
            channel="#alerts",
            text=f"⚠️ Metric dropped to {metric_value}"
        )
        yield dg.RunRequest(run_key=str(context.cursor))
    else:
        yield dg.SkipReason(f"Metric is healthy: {metric_value}")

defs = dg.Definitions(
    assets=[important_metric],
    sensors=[metric_alert_sensor],
    resources={
        "slack": SlackResource(
            token=dg.EnvVar("SLACK_TOKEN")
        )
    },
)

PySpark Integration

Run Spark transformations:
from dagster_pyspark import PySparkResource
import dagster as dg
from pyspark.sql import DataFrame

@dg.asset
def spark_transform(pyspark: PySparkResource) -> DataFrame:
    spark = pyspark.spark_session
    
    df = spark.read.parquet("s3://bucket/input/")
    
    transformed = df.filter(df.value > 100) \
                    .groupBy("category") \
                    .agg({"value": "sum"})
    
    transformed.write.parquet("s3://bucket/output/")
    
    return transformed

defs = dg.Definitions(
    assets=[spark_transform],
    resources={
        "pyspark": PySparkResource(
            spark_conf={
                "spark.executor.memory": "4g",
                "spark.executor.cores": "2",
            }
        )
    },
)

Sling Integration

Use Sling for data replication:
from dagster_sling import SlingResource, sling_assets
import dagster as dg

replication_config = """
source: MY_POSTGRES
target: MY_SNOWFLAKE

streams:
  public.users:
    sql: SELECT * FROM public.users WHERE updated_at > '{last_value}'
    primary_key: [id]
    update_key: updated_at
    
  public.orders:
    sql: SELECT * FROM public.orders
    primary_key: [order_id]
"""

@sling_assets(replication_config=replication_config)
def my_sling_assets(context: dg.AssetExecutionContext, sling: SlingResource):
    yield from sling.replicate(context=context).stream()

defs = dg.Definitions(
    assets=[my_sling_assets],
    resources={
        "sling": SlingResource(
            connections=[
                {"name": "MY_POSTGRES", "type": "postgres", ...},
                {"name": "MY_SNOWFLAKE", "type": "snowflake", ...},
            ]
        )
    },
)
1
Install the integration library
2
Run pip install dagster-{integration} for your tool.
3
Configure the resource
4
Set up credentials and connection details.
5
Define assets
6
Use integration-specific decorators or builders.
7
Add to Definitions
8
Include assets and resources in your Definitions.

Modern Data Stack Example

Combine multiple integrations into a complete pipeline:
from dagster_airbyte import build_airbyte_assets
from dagster_dbt import DbtCliResource, dbt_assets
from dagster import AssetExecutionContext, asset
import numpy as np
import pandas as pd
from scipy import optimize

# Extract: Airbyte syncs data from source to warehouse
airbyte_assets = build_airbyte_assets(
    connection_id="abc123",
    destination_tables=["orders", "users"],
    asset_key_prefix=["postgres_replica"],
)

# Transform: dbt models transform raw data
@dbt_assets(
    manifest="/path/to/manifest.json",
    io_manager_key="db_io_manager",
)
def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()

# Model: Python asset for ML forecasting
def model_func(x, a, b):
    return a * np.exp(b * (x / 10**18 - 1.6095))

@asset(compute_kind="python")
def order_forecast_model(daily_order_summary: pd.DataFrame) -> np.ndarray:
    """Model parameters that best fit the observed data."""
    train_set = daily_order_summary.to_numpy()
    return optimize.curve_fit(
        f=model_func,
        xdata=train_set[:, 0],
        ydata=train_set[:, 2],
        p0=[10, 100]
    )[0]

@asset(compute_kind="python", io_manager_key="db_io_manager")
def predicted_orders(
    daily_order_summary: pd.DataFrame,
    order_forecast_model: np.ndarray
) -> pd.DataFrame:
    """Predicted orders for the next 30 days."""
    a, b = tuple(order_forecast_model)
    start_date = daily_order_summary.order_date.max()
    future_dates = pd.date_range(
        start=start_date,
        end=start_date + pd.DateOffset(days=30)
    )
    predicted_data = model_func(x=future_dates.astype(np.int64), a=a, b=b)
    return pd.DataFrame({
        "order_date": future_dates,
        "num_orders": predicted_data
    })
Ensure you have network connectivity and proper credentials when using external integrations. Use environment variables for sensitive configuration.

Best Practices

Use I/O Managers

Let I/O managers handle data storage and retrieval:
@asset(io_manager_key="snowflake_io")
def my_table() -> pd.DataFrame:
    # Automatically stored in Snowflake
    return pd.DataFrame({...})

Partition for Performance

Partition large datasets for incremental processing:
@dbt_assets(
    manifest="/path/to/manifest.json",
    partitions_def=daily_partitions
)
def dbt_models(context, dbt):
    dbt_vars = {"partition_date": context.partition_key}
    yield from dbt.cli(["build", "--vars", json.dumps(dbt_vars)]).stream()

Handle Credentials Securely

Use environment variables:
resources={
    "snowflake": SnowflakeResource(
        account=dg.EnvVar("SNOWFLAKE_ACCOUNT"),
        password=dg.EnvVar("SNOWFLAKE_PASSWORD"),
    )
}

Next Steps

Build docs developers (and LLMs) love