Snowflake Integration
The dagster-snowflake library enables you to:
- Execute queries against Snowflake
- Build assets backed by Snowflake tables and views
- Use Snowflake as a data warehouse in your pipeline
- Manage Snowflake connections as Dagster resources
- Track data lineage across Snowflake objects
Installation
pip install dagster-snowflake
# With Pandas support
pip install dagster-snowflake-pandas
# With PySpark support
pip install dagster-snowflake-pyspark
Quick Start
Connect to Snowflake and execute queries:
from dagster import asset, Definitions
from dagster_snowflake import SnowflakeResource


@asset
def snowflake_query(snowflake: SnowflakeResource):
    """Rebuild analytics.customer_summary and report its row count."""
    with snowflake.get_connection() as conn:
        cursor = conn.cursor()
        cursor.execute("""
            CREATE OR REPLACE TABLE analytics.customer_summary AS
            SELECT
                customer_id,
                COUNT(*) as order_count,
                SUM(amount) as total_spent
            FROM raw.orders
            GROUP BY customer_id
        """)
        # CREATE TABLE ... AS returns a status message (e.g. "Table ...
        # successfully created."), not a row count — query it explicitly.
        cursor.execute("SELECT COUNT(*) FROM analytics.customer_summary")
        result = cursor.fetchone()
        return {"rows_inserted": result[0] if result else 0}


defs = Definitions(
    assets=[snowflake_query],
    resources={
        "snowflake": SnowflakeResource(
            account="my-account",
            user="my-user",
            password={"env": "SNOWFLAKE_PASSWORD"},
            database="analytics",
            schema="public",
            warehouse="compute_wh",
        )
    },
)
Snowflake Resource
The SnowflakeResource manages connections to Snowflake:
from dagster_snowflake import SnowflakeResource

# A single resource instance supplies connection defaults for every asset/op
# that requests it.
snowflake = SnowflakeResource(
    account="my-account",  # Snowflake account identifier
    user="my-user",
    password={"env": "SNOWFLAKE_PASSWORD"},  # resolved from the environment
    database="analytics",  # default database
    schema="public",  # default schema
    warehouse="compute_wh",  # default virtual warehouse
    role="analyst_role",  # optional: role assumed after connecting
)
Configuration options:
account: Snowflake account identifier (e.g., xy12345.us-east-1)
user: Username for authentication
password: Password (use EnvVar for security)
database: Default database name
schema: Default schema name
warehouse: Virtual warehouse for query execution
role: Role to assume after connection
authenticator: Authentication method (default: snowflake, or externalbrowser)
private_key: Private key for key-pair authentication
private_key_password: Password for encrypted private key
Key-Pair Authentication
For enhanced security, use key-pair authentication:
# Key-pair auth: the private key (and its passphrase, if the key is encrypted)
# come from the environment instead of a password.
snowflake = SnowflakeResource(
    account="my-account",
    user="my-user",
    private_key={"env": "SNOWFLAKE_PRIVATE_KEY"},  # Base64 encoded
    private_key_password={"env": "PRIVATE_KEY_PASSWORD"},
    database="analytics",
    warehouse="compute_wh",
)
SSO Authentication
# SSO: "externalbrowser" opens a browser window for the identity provider,
# so no password is configured here.
snowflake = SnowflakeResource(
    account="my-account",
    user="my-user",
    authenticator="externalbrowser",
    database="analytics",
    warehouse="compute_wh",
)
Executing Queries
Basic Query Execution
from dagster import asset, OpExecutionContext
from dagster_snowflake import SnowflakeResource


@asset
def create_table(context: OpExecutionContext, snowflake: SnowflakeResource):
    """Rebuild analytics.daily_metrics for yesterday's events."""
    query = """
        CREATE OR REPLACE TABLE analytics.daily_metrics AS
        SELECT
            DATE(timestamp) as date,
            COUNT(DISTINCT user_id) as active_users,
            COUNT(*) as total_events
        FROM raw.events
        WHERE DATE(timestamp) = CURRENT_DATE - 1
        GROUP BY DATE(timestamp)
    """
    with snowflake.get_connection() as conn:
        cursor = conn.cursor()
        cursor.execute(query)
        # After a CTAS, cursor.rowcount reflects the status row (1), not the
        # rows written — count the table explicitly for an accurate figure.
        cursor.execute("SELECT COUNT(*) FROM analytics.daily_metrics")
        row = cursor.fetchone()
        rows_affected = row[0] if row else 0
        context.log.info(f"Inserted {rows_affected} rows")
        return {"rows_inserted": rows_affected}
Query with Parameters
@asset
def parameterized_query(snowflake: SnowflakeResource):
    """Count active US customers via a parameterized (injection-safe) query."""
    filters = ("US", "active")
    with snowflake.get_connection() as conn:
        cur = conn.cursor()
        cur.execute(
            "SELECT * FROM customers WHERE country = %s AND status = %s",
            filters,
        )
        rows = cur.fetchall()
    return {"row_count": len(rows)}
Using snowflake_op_for_query
Create reusable ops for common queries:
from dagster import Definitions, job
from dagster_snowflake import snowflake_op_for_query, SnowflakeResource

# Create an op from a SQL query
update_metrics = snowflake_op_for_query(
    sql="""
        UPDATE analytics.metrics
        SET processed = TRUE
        WHERE date = CURRENT_DATE
    """,
    name="update_metrics",
)


# snowflake_op_for_query returns an *op*, not an asset, so it must be invoked
# inside a job rather than passed to Definitions(assets=...).
@job
def update_metrics_job():
    update_metrics()


defs = Definitions(
    jobs=[update_metrics_job],
    resources={
        "snowflake": SnowflakeResource(
            account="my-account",
            user="my-user",
            password={"env": "SNOWFLAKE_PASSWORD"},
            database="analytics",
            warehouse="compute_wh",
        )
    },
)
Snowflake IO Manager
Store and retrieve Pandas DataFrames in Snowflake tables:
from dagster import asset, Definitions
from dagster_snowflake_pandas import SnowflakePandasIOManager
from dagster_snowflake import SnowflakeResource
import pandas as pd


@asset
def raw_data() -> pd.DataFrame:
    """Produce a small customer-revenue frame; persisted as a Snowflake table."""
    return pd.DataFrame(
        {
            "customer_id": [1, 2, 3],
            "revenue": [100, 200, 150],
        }
    )


@asset
def processed_data(raw_data: pd.DataFrame) -> pd.DataFrame:
    """Keep only the customers whose revenue exceeds 100."""
    return raw_data[raw_data["revenue"] > 100]


defs = Definitions(
    assets=[raw_data, processed_data],
    resources={
        # The IO manager writes each asset's DataFrame to a table and reads it
        # back for downstream assets.
        "io_manager": SnowflakePandasIOManager(
            account="my-account",
            user="my-user",
            password={"env": "SNOWFLAKE_PASSWORD"},
            database="analytics",
            schema="dagster_io",
            warehouse="compute_wh",
        )
    },
)
The Snowflake IO Manager automatically creates tables for asset outputs and loads them as inputs to downstream assets.
Custom Table Names
Control table naming:
from dagster import asset


# The Snowflake IO manager derives the *table* name from the asset key and the
# *schema* from the "schema" metadata entry (or the IO manager config) — a
# "table" metadata key is not honored, so name the asset after the table.
@asset(metadata={"schema": "analytics"})
def customer_data() -> pd.DataFrame:
    """Stored as the analytics.customer_data table by the Snowflake IO manager."""
    return pd.DataFrame({"id": [1, 2], "name": ["Alice", "Bob"]})
Integration with dbt
Combine Dagster, dbt, and Snowflake:
from dagster import asset, AssetExecutionContext, Definitions
from dagster_dbt import DbtCliResource, dbt_assets, DbtProject
from dagster_snowflake import SnowflakeResource
from snowflake.connector.pandas_tools import write_pandas
import pandas as pd


# Load raw data to Snowflake
@asset
def raw_customers(snowflake: SnowflakeResource) -> None:
    """Load customers.csv into RAW.CUSTOMERS."""
    df = pd.read_csv("customers.csv")
    with snowflake.get_connection() as conn:
        # pandas.DataFrame.to_sql requires a SQLAlchemy connectable; with the
        # raw Snowflake connector connection, use write_pandas instead.
        write_pandas(
            conn,
            df,
            table_name="CUSTOMERS",
            schema="RAW",
            auto_create_table=True,
            overwrite=True,
        )


# dbt models run transformations in Snowflake
dbt_project = DbtProject(project_dir="path/to/dbt")
dbt_project.prepare_if_dev()


@dbt_assets(manifest=dbt_project.manifest_path)
def dbt_models(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()


# Downstream Python asset
@asset
def customer_analysis(snowflake: SnowflakeResource):
    """Summarize the dbt-built dim_customers table."""
    with snowflake.get_connection() as conn:
        df = pd.read_sql(
            "SELECT * FROM analytics.dim_customers",
            con=conn,
        )
    return {"customer_count": len(df)}


defs = Definitions(
    assets=[raw_customers, dbt_models, customer_analysis],
    resources={
        "snowflake": SnowflakeResource(
            account="my-account",
            user="my-user",
            password={"env": "SNOWFLAKE_PASSWORD"},
            database="analytics",
            warehouse="transform_wh",
        ),
        "dbt": DbtCliResource(project_dir=dbt_project),
    },
)
Asset Metadata
Track Snowflake table metadata on materializations:
from dagster import asset, OpExecutionContext, MetadataValue
from dagster_snowflake import SnowflakeResource


@asset
def table_with_metadata(
    context: OpExecutionContext,
    snowflake: SnowflakeResource,
):
    """Create analytics.summary and attach table statistics as asset metadata."""
    with snowflake.get_connection() as conn:
        cursor = conn.cursor()
        # Create table
        cursor.execute("""
            CREATE OR REPLACE TABLE analytics.summary AS
            SELECT * FROM raw.data
        """)
        # Get table statistics
        cursor.execute("""
            SELECT
                COUNT(*) as row_count,
                COUNT(DISTINCT customer_id) as unique_customers
            FROM analytics.summary
        """)
        stats = cursor.fetchone()
        # Report metadata so it appears on the materialization in the UI.
        context.add_output_metadata(
            {
                "row_count": stats[0],
                "unique_customers": stats[1],
                "table_name": MetadataValue.text("analytics.summary"),
                "snowflake_query_id": cursor.sfqid,
            }
        )
        return {"status": "created"}
Fetch Last Updated Timestamps
Track when Snowflake tables were last modified:
from datetime import datetime

from dagster import sensor, SensorEvaluationContext, RunRequest
from dagster_snowflake import (
    SnowflakeResource,
    fetch_last_updated_timestamps,
)


@sensor(job_name="process_updated_tables")
def snowflake_table_sensor(
    context: SensorEvaluationContext,
    snowflake: SnowflakeResource,
):
    """Request runs for raw tables modified since the stored cursor."""
    # Fetch last updated times for tables
    timestamps = fetch_last_updated_timestamps(
        snowflake=snowflake,
        schema="raw",
        tables=["orders", "customers"],
    )
    # The cursor is persisted as a string; parse it back to a datetime so it
    # can be compared with the datetime values Snowflake returns.
    # NOTE(review): assumes the returned timestamps are naive — confirm, and
    # use a timezone-aware default if they are aware.
    cursor = datetime.fromisoformat(context.cursor or "2024-01-01")
    latest = cursor
    for table, timestamp in timestamps.items():
        if timestamp > cursor:
            yield RunRequest(
                run_key=f"{table}_{timestamp}",
                run_config={"ops": {"config": {"table": table}}},
            )
            latest = max(latest, timestamp)
    # Advance the cursor once, to the newest timestamp seen this tick —
    # updating inside the loop would let a smaller, later table overwrite it.
    if latest > cursor:
        context.update_cursor(latest.isoformat())
Best Practices
- Use warehouses efficiently: Assign appropriate warehouse sizes for workload
- Connection management: Open and close connections safely with the
get_connection() context manager
- Secure credentials: Always use environment variables for passwords
- Schema organization: Use separate schemas for raw, staging, and production data
- Query optimization: Use Snowflake query profiling to optimize expensive queries
- Cost monitoring: Track warehouse usage and query costs
Performance Tips
Use Query Tags
with snowflake.get_connection() as conn:
    cur = conn.cursor()
    # Tag the session so these queries are attributable in Snowflake's
    # query history.
    cur.execute("ALTER SESSION SET QUERY_TAG = 'dagster_pipeline'")
    cur.execute("SELECT * FROM large_table")
Enable Result Caching
with snowflake.get_connection() as conn:
    # Cached results are served without spinning up warehouse compute.
    conn.cursor().execute("ALTER SESSION SET USE_CACHED_RESULT = TRUE")
Optimize Large Loads
@asset
def bulk_load(snowflake: SnowflakeResource):
    """Bulk-load staged CSV data via COPY INTO (faster than row-wise inserts)."""
    with snowflake.get_connection() as conn:
        conn.cursor().execute("""
            COPY INTO analytics.large_table
            FROM @my_stage/data.csv
            FILE_FORMAT = (TYPE = 'CSV' FIELD_DELIMITER = ',')
        """)
Troubleshooting
Connection timeout
Increase timeout in connection parameters:
# Raise the connection timeout when the default is too short for your network.
snowflake = SnowflakeResource(
    account="my-account",
    user="my-user",
    password={"env": "SNOWFLAKE_PASSWORD"},
    database="analytics",
    warehouse="compute_wh",
    connect_timeout=30,  # seconds
)
Warehouse suspended
Ensure warehouse is running:
ALTER WAREHOUSE compute_wh RESUME IF SUSPENDED;
Authentication errors
Verify account identifier format:
- Include region:
xy12345.us-east-1
- For AWS:
account.region
- For Azure:
account.region.azure
- For GCP:
account.region.gcp
API Reference
Key components:
SnowflakeResource: Connection resource for Snowflake
snowflake_op_for_query: Create ops from SQL queries
SnowflakePandasIOManager: Store DataFrames as Snowflake tables
SnowflakePySparkIOManager: Store Spark DataFrames in Snowflake
fetch_last_updated_timestamps: Query table modification times
build_snowflake_io_manager: Build custom IO managers
For complete documentation, see dagster-snowflake API reference.
Next Steps