
Snowflake Integration

The dagster-snowflake library enables you to:
  • Execute queries against Snowflake
  • Build assets backed by Snowflake tables and views
  • Use Snowflake as a data warehouse in your pipeline
  • Manage Snowflake connections as Dagster resources
  • Track data lineage across Snowflake objects

Installation

pip install dagster-snowflake

# With Pandas support
pip install dagster-snowflake-pandas

# With PySpark support
pip install dagster-snowflake-pyspark

Quick Start

Connect to Snowflake and execute queries:
from dagster import asset, Definitions, EnvVar
from dagster_snowflake import SnowflakeResource

@asset
def snowflake_query(snowflake: SnowflakeResource):
    with snowflake.get_connection() as conn:
        cursor = conn.cursor()
        cursor.execute("""
            CREATE OR REPLACE TABLE analytics.customer_summary AS
            SELECT
                customer_id,
                COUNT(*) as order_count,
                SUM(amount) as total_spent
            FROM raw.orders
            GROUP BY customer_id
        """)

        # CREATE TABLE AS returns a status row, so count the rows explicitly
        cursor.execute("SELECT COUNT(*) FROM analytics.customer_summary")
        result = cursor.fetchone()
        return {"rows_inserted": result[0] if result else 0}

defs = Definitions(
    assets=[snowflake_query],
    resources={
        "snowflake": SnowflakeResource(
            account="my-account",
            user="my-user",
            password=EnvVar("SNOWFLAKE_PASSWORD"),
            database="analytics",
            schema="public",
            warehouse="compute_wh",
        )
    },
)

Snowflake Resource

The SnowflakeResource manages connections to Snowflake:
from dagster import EnvVar
from dagster_snowflake import SnowflakeResource

snowflake = SnowflakeResource(
    account="my-account",       # Snowflake account identifier
    user="my-user",
    password=EnvVar("SNOWFLAKE_PASSWORD"),
    database="analytics",       # Default database
    schema="public",            # Default schema
    warehouse="compute_wh",     # Default warehouse
    role="analyst_role",        # Optional: default role
)
Configuration options:
  • account: Snowflake account identifier (e.g., xy12345.us-east-1)
  • user: Username for authentication
  • password: Password (use EnvVar for security)
  • database: Default database name
  • schema: Default schema name
  • warehouse: Virtual warehouse for query execution
  • role: Role to assume after connection
  • authenticator: Authentication method (default: snowflake, or externalbrowser)
  • private_key: Private key for key-pair authentication
  • private_key_password: Password for encrypted private key

Key-Pair Authentication

For enhanced security, use key-pair authentication:
from dagster import EnvVar

snowflake = SnowflakeResource(
    account="my-account",
    user="my-user",
    private_key=EnvVar("SNOWFLAKE_PRIVATE_KEY"),  # Base64-encoded PKCS#8 key
    private_key_password=EnvVar("PRIVATE_KEY_PASSWORD"),
    database="analytics",
    warehouse="compute_wh",
)
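The private key is typically a PKCS#8 PEM, base64-encoded into a single-line environment variable. A minimal stdlib-only sketch of preparing that value (the PEM content below is a placeholder, not a real key):

```python
import base64
import os

# Placeholder PEM body; a real key comes from e.g.
# `openssl genrsa 2048 | openssl pkcs8 -topk8 -nocrypt`
pem = b"-----BEGIN PRIVATE KEY-----\nMIIB...placeholder...\n-----END PRIVATE KEY-----\n"

# Base64-encode the whole PEM so it survives as a single-line env var
os.environ["SNOWFLAKE_PRIVATE_KEY"] = base64.b64encode(pem).decode("ascii")
```

Decoding the variable recovers the original PEM bytes, which is what the resource needs at connection time.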

SSO Authentication

snowflake = SnowflakeResource(
    account="my-account",
    user="my-user",
    authenticator="externalbrowser",  # Opens browser for SSO
    database="analytics",
    warehouse="compute_wh",
)

Executing Queries

Basic Query Execution

from dagster import asset, AssetExecutionContext
from dagster_snowflake import SnowflakeResource

@asset
def create_table(context: AssetExecutionContext, snowflake: SnowflakeResource):
    query = """
        CREATE OR REPLACE TABLE analytics.daily_metrics AS
        SELECT 
            DATE(timestamp) as date,
            COUNT(DISTINCT user_id) as active_users,
            COUNT(*) as total_events
        FROM raw.events
        WHERE DATE(timestamp) = CURRENT_DATE - 1
        GROUP BY DATE(timestamp)
    """
    
    with snowflake.get_connection() as conn:
        cursor = conn.cursor()
        cursor.execute(query)

        # CREATE TABLE AS returns a status row, so count the rows explicitly
        cursor.execute("SELECT COUNT(*) FROM analytics.daily_metrics")
        rows_affected = cursor.fetchone()[0]
        context.log.info(f"Inserted {rows_affected} rows")

        return {"rows_inserted": rows_affected}

Query with Parameters

@asset
def parameterized_query(snowflake: SnowflakeResource):
    with snowflake.get_connection() as conn:
        cursor = conn.cursor()
        cursor.execute(
            "SELECT * FROM customers WHERE country = %s AND status = %s",
            ("US", "active"),
        )
        results = cursor.fetchall()
        
        return {"row_count": len(results)}
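Placeholder counts must match the parameter tuple exactly. For variable-length IN lists, a small helper (our own, not part of dagster-snowflake) can generate the placeholders:

```python
def in_clause(column: str, values: list) -> tuple:
    """Build a parameterized `column IN (...)` predicate with one %s per value."""
    placeholders = ", ".join(["%s"] * len(values))
    return f"{column} IN ({placeholders})", tuple(values)

sql_fragment, params = in_clause("country", ["US", "CA", "MX"])
# e.g. cursor.execute(f"SELECT * FROM customers WHERE {sql_fragment}", params)
```

Generating placeholders keeps the values out of the SQL string, so they are still bound safely by the connector.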

Using snowflake_op_for_query

Create reusable ops for common queries:
from dagster import Definitions, EnvVar, job
from dagster_snowflake import snowflake_op_for_query, SnowflakeResource

# Create an op from a SQL query
update_metrics = snowflake_op_for_query(
    sql="""
        UPDATE analytics.metrics
        SET processed = TRUE
        WHERE date = CURRENT_DATE
    """,
    name="update_metrics",
)

# Ops run inside jobs rather than as assets
@job
def update_metrics_job():
    update_metrics()

defs = Definitions(
    jobs=[update_metrics_job],
    resources={
        "snowflake": SnowflakeResource(
            account="my-account",
            user="my-user",
            password=EnvVar("SNOWFLAKE_PASSWORD"),
            database="analytics",
            warehouse="compute_wh",
        )
    },
)

Snowflake IO Manager

Store and retrieve Pandas DataFrames in Snowflake tables:
from dagster import asset, Definitions, EnvVar
from dagster_snowflake_pandas import SnowflakePandasIOManager
import pandas as pd

@asset
def raw_data() -> pd.DataFrame:
    # Generate or fetch data
    return pd.DataFrame({
        "customer_id": [1, 2, 3],
        "revenue": [100, 200, 150],
    })

@asset
def processed_data(raw_data: pd.DataFrame) -> pd.DataFrame:
    # Transform data
    return raw_data[raw_data["revenue"] > 100]

defs = Definitions(
    assets=[raw_data, processed_data],
    resources={
        "io_manager": SnowflakePandasIOManager(
            account="my-account",
            user="my-user",
            password=EnvVar("SNOWFLAKE_PASSWORD"),
            database="analytics",
            schema="dagster_io",
            warehouse="compute_wh",
        )
    },
)
The Snowflake IO Manager automatically creates tables for asset outputs and loads them as inputs to downstream assets.
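By default the table name is derived from the asset key: the last component names the table, and a key prefix (if any) selects the schema, falling back to the configured schema. A rough illustration of that convention, written as a standalone helper rather than the IO manager's actual internals:

```python
def snowflake_table_for_asset(key_path: list, default_schema: str) -> str:
    """Illustrative mapping from a Dagster asset key to a Snowflake table."""
    table = key_path[-1]
    schema = key_path[-2] if len(key_path) > 1 else default_schema
    return f"{schema}.{table}"

# raw_data and processed_data above land in the configured "dagster_io" schema
print(snowflake_table_for_asset(["raw_data"], "dagster_io"))
```

An asset defined with `key_prefix=["analytics"]` would instead map to a table in the `analytics` schema.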

Custom Table Names

Control table naming:
from dagster import asset

# The IO manager names the table after the asset, so this asset is stored
# as analytics.customer_data; the "schema" metadata overrides the default schema
@asset(
    metadata={"schema": "analytics"},
)
def customer_data() -> pd.DataFrame:
    return pd.DataFrame({"id": [1, 2], "name": ["Alice", "Bob"]})

Integration with dbt

Combine Dagster, dbt, and Snowflake:
from dagster import asset, AssetExecutionContext, Definitions, EnvVar
from dagster_dbt import DbtCliResource, dbt_assets, DbtProject
from dagster_snowflake import SnowflakeResource
from snowflake.connector.pandas_tools import write_pandas
import pandas as pd

# Load raw data to Snowflake
@asset
def raw_customers(snowflake: SnowflakeResource) -> None:
    df = pd.read_csv("customers.csv")

    with snowflake.get_connection() as conn:
        # pandas.DataFrame.to_sql requires a SQLAlchemy engine, so use the
        # connector's write_pandas helper with the raw connection instead
        write_pandas(
            conn,
            df,
            table_name="CUSTOMERS",
            schema="RAW",
            auto_create_table=True,
            overwrite=True,
        )

# dbt models run transformations in Snowflake
dbt_project = DbtProject(project_dir="path/to/dbt")
dbt_project.prepare_if_dev()

@dbt_assets(manifest=dbt_project.manifest_path)
def dbt_models(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()

# Downstream Python asset
@asset
def customer_analysis(snowflake: SnowflakeResource):
    with snowflake.get_connection() as conn:
        # Read dbt-generated table
        df = pd.read_sql(
            "SELECT * FROM analytics.dim_customers",
            con=conn,
        )

        return {"customer_count": len(df)}

defs = Definitions(
    assets=[raw_customers, dbt_models, customer_analysis],
    resources={
        "snowflake": SnowflakeResource(
            account="my-account",
            user="my-user",
            password=EnvVar("SNOWFLAKE_PASSWORD"),
            database="analytics",
            warehouse="transform_wh",
        ),
        "dbt": DbtCliResource(project_dir=dbt_project),
    },
)

Table Metadata

Track Snowflake table metadata:
from dagster import asset, AssetExecutionContext, MetadataValue
from dagster_snowflake import SnowflakeResource

@asset
def table_with_metadata(
    context: AssetExecutionContext,
    snowflake: SnowflakeResource,
):
    with snowflake.get_connection() as conn:
        cursor = conn.cursor()
        
        # Create table
        cursor.execute("""
            CREATE OR REPLACE TABLE analytics.summary AS
            SELECT * FROM raw.data
        """)
        
        # Get table statistics
        cursor.execute("""
            SELECT 
                COUNT(*) as row_count,
                COUNT(DISTINCT customer_id) as unique_customers
            FROM analytics.summary
        """)
        stats = cursor.fetchone()
        
        # Report metadata
        context.add_output_metadata(
            {
                "row_count": stats[0],
                "unique_customers": stats[1],
                "table_name": MetadataValue.text("analytics.summary"),
                "snowflake_query_id": cursor.sfqid,
            }
        )
        
        return {"status": "created"}

Fetch Last Updated Timestamps

Track when Snowflake tables were last modified:
from datetime import datetime

from dagster import sensor, SensorEvaluationContext, RunRequest
from dagster_snowflake import (
    SnowflakeResource,
    fetch_last_updated_timestamps,
)

@sensor(job_name="process_updated_tables")
def snowflake_table_sensor(
    context: SensorEvaluationContext,
    snowflake: SnowflakeResource,
):
    # fetch_last_updated_timestamps expects a live connection
    with snowflake.get_connection() as conn:
        timestamps = fetch_last_updated_timestamps(
            snowflake_connection=conn,
            schema="raw",
            tables=["orders", "customers"],
        )

    # The cursor is stored as an ISO-8601 string; parse it before comparing
    start = datetime.fromisoformat(context.cursor) if context.cursor else None
    latest = start

    # Trigger runs for tables updated since the stored cursor
    for table, timestamp in timestamps.items():
        if start is None or timestamp > start:
            yield RunRequest(
                run_key=f"{table}_{timestamp.isoformat()}",
                run_config={"ops": {"config": {"table": table}}},
            )
            if latest is None or timestamp > latest:
                latest = timestamp

    if latest is not None:
        context.update_cursor(latest.isoformat())
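Because the sensor cursor is persisted as a string, timestamp comparisons should parse it back into a datetime rather than compare strings. A small stdlib-only helper illustrating the check (our own, not a library API):

```python
from datetime import datetime
from typing import Optional

def newer_than_cursor(ts: datetime, cursor: Optional[str]) -> bool:
    """True when ts is strictly after the stored ISO-8601 cursor (or none exists)."""
    if not cursor:
        return True
    return ts > datetime.fromisoformat(cursor)
```

Storing the cursor with `timestamp.isoformat()` keeps it round-trippable through `datetime.fromisoformat`.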

Best Practices

  1. Use warehouses efficiently: match warehouse size to each workload
  2. Connection management: open and release connections with the get_connection() context manager
  3. Secure credentials: always supply passwords via environment variables (EnvVar)
  4. Schema organization: use separate schemas for raw, staging, and production data
  5. Query optimization: use Snowflake's query profile to tune expensive queries
  6. Cost monitoring: track warehouse usage and query costs

Performance Tips

Use Query Tags

with snowflake.get_connection() as conn:
    cursor = conn.cursor()
    cursor.execute("ALTER SESSION SET QUERY_TAG = 'dagster_pipeline'")
    cursor.execute("SELECT * FROM large_table")
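Query tags are searchable in Snowflake's QUERY_HISTORY view. One convention (ours, purely illustrative) is to embed the Dagster run ID so a query can be traced back to the run that issued it:

```python
def dagster_query_tag(run_id: str, asset_name: str) -> str:
    """Build a QUERY_TAG value that is easy to filter in QUERY_HISTORY."""
    return f"dagster:{asset_name}:{run_id[:8]}"

# e.g. inside an asset, before running queries:
# cursor.execute(f"ALTER SESSION SET QUERY_TAG = '{dagster_query_tag(context.run_id, 'daily_metrics')}'")
```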

Enable Result Caching

with snowflake.get_connection() as conn:
    cursor = conn.cursor()
    cursor.execute("ALTER SESSION SET USE_CACHED_RESULT = TRUE")

Optimize Large Loads

@asset
def bulk_load(snowflake: SnowflakeResource):
    # Use COPY INTO for large data loads
    with snowflake.get_connection() as conn:
        cursor = conn.cursor()
        cursor.execute("""
            COPY INTO analytics.large_table
            FROM @my_stage/data.csv
            FILE_FORMAT = (TYPE = 'CSV' FIELD_DELIMITER = ',')
        """)

Troubleshooting

Connection timeout

Increase the login timeout in the connection parameters:
from dagster import EnvVar

snowflake = SnowflakeResource(
    account="my-account",
    user="my-user",
    password=EnvVar("SNOWFLAKE_PASSWORD"),
    database="analytics",
    warehouse="compute_wh",
    login_timeout=30,  # seconds to wait for login before timing out
)

Warehouse suspended

Ensure warehouse is running:
ALTER WAREHOUSE compute_wh RESUME IF SUSPENDED;

Authentication errors

Verify account identifier format:
  • Legacy locators include the region: xy12345.us-east-1
  • For AWS: account_locator.region
  • For Azure: account_locator.region.azure
  • For GCP: account_locator.region.gcp
  • Newer accounts can use the orgname-accountname format
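A quick shape check can catch obvious typos before a connection attempt; this regex is illustrative only, not an official validation of Snowflake identifiers:

```python
import re

# Matches e.g. "xy12345", "xy12345.us-east-1", "xy12345.us-east-1.gcp",
# and hyphenated orgname-accountname identifiers
ACCOUNT_RE = re.compile(r"^[A-Za-z0-9_-]+(\.[A-Za-z0-9-]+){0,2}$")

def looks_like_account(identifier: str) -> bool:
    """Rough shape check for a Snowflake account identifier."""
    return ACCOUNT_RE.fullmatch(identifier) is not None
```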

API Reference

Key components:
  • SnowflakeResource: Connection resource for Snowflake
  • snowflake_op_for_query: Create ops from SQL queries
  • SnowflakePandasIOManager: Store DataFrames as Snowflake tables
  • SnowflakePySparkIOManager: Store Spark DataFrames in Snowflake
  • fetch_last_updated_timestamps: Query table modification times
  • build_snowflake_io_manager: Build custom IO managers
For complete documentation, see dagster-snowflake API reference.
