# Snowflake

Snowflake is a cloud-native data platform that provides data warehousing, data lake, data engineering, data science, and data application development capabilities.
## Install dlt with Snowflake

To use Snowflake as a destination, install dlt with the Snowflake extra:

```sh
pip install "dlt[snowflake]"
```
## Quick Start

Here's a simple example to get you started:

```python
import dlt

# Define your data source
@dlt.resource
def my_data():
    yield {"id": 1, "name": "Alice"}
    yield {"id": 2, "name": "Bob"}

# Create pipeline
pipeline = dlt.pipeline(
    pipeline_name="my_pipeline",
    destination="snowflake",
    dataset_name="my_dataset",
)

# Run the pipeline
info = pipeline.run(my_data())
print(info)
```
## Configuration

### Basic Configuration

- `database`: the Snowflake database name
- `warehouse`: the Snowflake warehouse to use for queries
- `role`: the Snowflake role to use
- `stage_name`: use an existing named stage instead of the default table stage
- `keep_staged_files`: whether to keep or delete staged files after `COPY INTO` succeeds

### Advanced Configuration

- `create_indexes`: whether `UNIQUE` or `PRIMARY KEY` constraints should be created
- `use_vectorized_scanner`: whether to use the vectorized scanner in `COPY INTO`
- `enable_atomic_swap`: when enabled together with the staging-optimized replace strategy, tables are atomically swapped using `ALTER TABLE ... SWAP WITH`
- `use_decfloat`: whether to use the `DECFLOAT` type for unbound decimals instead of `DECIMAL`
- `query_tag`: a tag with placeholders used to tag the sessions executing load jobs
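As a sketch, these options can be set on the destination in `.dlt/config.toml` (or `secrets.toml`). The option names here are assumptions matching the descriptions above; verify them against your installed dlt version:

```toml
[destination.snowflake]
# Option names are illustrative; check the dlt Snowflake reference for your version.
stage_name = "my_named_stage"    # use an existing named stage
keep_staged_files = false        # delete staged files after COPY INTO succeeds
create_indexes = false           # skip UNIQUE / PRIMARY KEY constraints
query_tag = "dlt_pipeline={pipeline_name}"
```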
## Authentication

### Username and Password

Create a `.dlt/secrets.toml` file with your credentials:

```toml
[destination.snowflake.credentials]
host = "your-account.snowflakecomputing.com"
username = "your-username"
password = "your-password"
database = "your-database"
warehouse = "your-warehouse"
role = "your-role"
```
### Get Your Account Identifier

Your Snowflake account identifier typically uses the `organization-account_name` format; legacy account locators with a region suffix, such as `abc12345.us-east-1`, also work.
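For illustration, the `host` value used in the credential examples is simply the account identifier plus the standard Snowflake domain. `snowflake_host` below is a hypothetical helper, not part of dlt:

```python
def snowflake_host(account_identifier: str) -> str:
    """Hypothetical helper: build the host name from an account identifier."""
    return f"{account_identifier}.snowflakecomputing.com"

print(snowflake_host("abc12345.us-east-1"))
# abc12345.us-east-1.snowflakecomputing.com
```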
### Create a Database

```sql
CREATE DATABASE my_dlt_database;
```
### Create a Warehouse

```sql
CREATE WAREHOUSE my_dlt_warehouse
  WITH WAREHOUSE_SIZE = 'XSMALL'
  AUTO_SUSPEND = 60
  AUTO_RESUME = TRUE;
```
### Create a User and Grant Permissions

```sql
CREATE USER dlt_user PASSWORD = 'your-secure-password';
GRANT USAGE ON DATABASE my_dlt_database TO ROLE PUBLIC;
GRANT USAGE ON WAREHOUSE my_dlt_warehouse TO ROLE PUBLIC;
GRANT CREATE SCHEMA ON DATABASE my_dlt_database TO ROLE PUBLIC;
```
### Private Key Authentication

For enhanced security, use private key authentication:

```toml
[destination.snowflake.credentials]
host = "your-account.snowflakecomputing.com"
username = "your-username"
database = "your-database"
warehouse = "your-warehouse"
role = "your-role"
private_key = "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----\n"
private_key_passphrase = "your-passphrase"  # Optional
```
Or reference a private key file:

```toml
[destination.snowflake.credentials]
host = "your-account.snowflakecomputing.com"
username = "your-username"
database = "your-database"
private_key_path = "/path/to/private_key.pem"
```
### OAuth Authentication

Snowflake supports OAuth authentication:

```toml
[destination.snowflake.credentials]
host = "your-account.snowflakecomputing.com"
username = "your-username"
database = "your-database"
authenticator = "oauth"
token = "your-oauth-token"
```
## Data Loading

dlt loads data into Snowflake using the `COPY INTO` command with staged files. Supported file formats:

- JSONL (default)
- Parquet
- CSV

```python
import dlt

# JSONL is the default file format
pipeline = dlt.pipeline(
    destination="snowflake",
    dataset_name="my_dataset",
)
```
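To make the default format concrete, this stdlib-only sketch shows what the rows from the Quick Start look like once serialized as JSONL for staging (illustrative only, not dlt's actual file writer):

```python
import json

rows = [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]

# JSONL: one JSON object per line
jsonl = "\n".join(json.dumps(row) for row in rows)
print(jsonl)
# {"id": 1, "name": "Alice"}
# {"id": 2, "name": "Bob"}
```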
## Staging

Snowflake uses stages to load data. By default, dlt uses table stages, but you can configure an existing named stage:

```python
import dlt
from dlt.destinations import snowflake

pipeline = dlt.pipeline(
    destination=snowflake(stage_name="my_named_stage"),
    dataset_name="my_dataset",
)
```
For external staging with S3:

```python
import dlt

pipeline = dlt.pipeline(
    destination="snowflake",
    staging="filesystem",  # Use S3 for staging
    dataset_name="my_dataset",
)
```
Configure the staging bucket:

```toml
[destination.filesystem]
bucket_url = "s3://my-bucket/staging"

[destination.filesystem.credentials]
aws_access_key_id = "your-access-key"
aws_secret_access_key = "your-secret-key"
```
## Write Dispositions

Snowflake supports all write dispositions (`append`, `replace`, `merge`):

```python
@dlt.resource(write_disposition="append")
def append_data():
    yield {"id": 1, "value": "new"}
```
## Atomic Table Swaps

With `enable_atomic_swap=True` and the staging-optimized replace strategy, Snowflake uses `ALTER TABLE ... SWAP WITH` for zero-downtime loading:

```python
import dlt
from dlt.destinations import snowflake

pipeline = dlt.pipeline(
    destination=snowflake(enable_atomic_swap=True),
    dataset_name="my_dataset",
)

@dlt.resource(write_disposition="replace")
def my_data():
    yield {"id": 1, "value": "data"}

pipeline.run(my_data())
```
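Conceptually, the swap corresponds to a statement like the following (table and schema names are illustrative; dlt generates its own staging names):

```sql
-- Load into a staging table, then atomically swap it with the live table
ALTER TABLE my_dataset.my_data SWAP WITH my_dataset_staging.my_data;
```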
## Advanced Features

### CSV Format

Customize CSV format options:

```python
import dlt
from dlt.destinations import snowflake
from dlt.common.destination.configuration import CsvFormatConfiguration

csv_config = CsvFormatConfiguration(
    delimiter="|",
    include_header=True,
)

pipeline = dlt.pipeline(
    destination=snowflake(csv_format=csv_config),
    dataset_name="my_dataset",
)
```
### Query Tagging

Tag your Snowflake queries for tracking:

```python
import dlt
from dlt.destinations import snowflake

pipeline = dlt.pipeline(
    destination=snowflake(
        query_tag="dlt_pipeline={pipeline_name},dataset={dataset_name}"
    ),
    dataset_name="my_dataset",
)
```
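The placeholders in the tag are filled in for each load job; this stdlib sketch shows the resulting tag for the pipeline above (illustrative; dlt performs the substitution internally):

```python
template = "dlt_pipeline={pipeline_name},dataset={dataset_name}"

# str.format-style substitution of the placeholders
tag = template.format(pipeline_name="my_pipeline", dataset_name="my_dataset")
print(tag)
# dlt_pipeline=my_pipeline,dataset=my_dataset
```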
### DECFLOAT for Decimals

Use the `DECFLOAT` type for unbound decimals:

```python
import dlt
from dlt.destinations import snowflake

pipeline = dlt.pipeline(
    destination=snowflake(use_decfloat=True),
    dataset_name="my_dataset",
)
```

Note: `DECFLOAT` only works with text-based file formats (JSONL, CSV), not Parquet.
### Vectorized Scanner

Enable the vectorized scanner for better performance:

```python
import dlt
from dlt.destinations import snowflake

pipeline = dlt.pipeline(
    destination=snowflake(use_vectorized_scanner=True),
    dataset_name="my_dataset",
)
```
## Data Types

Snowflake data type mapping:

| dlt type | Snowflake type |
| --- | --- |
| text | VARCHAR |
| double | FLOAT |
| bool | BOOLEAN |
| timestamp | TIMESTAMP_TZ |
| date | DATE |
| time | TIME |
| bigint | NUMBER(19,0) |
| binary | BINARY |
| decimal | NUMBER |
| json | VARIANT |
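The same mapping expressed as a plain dict for quick lookup (illustrative only, not dlt's internal type mapper):

```python
# dlt type -> Snowflake type, as listed in the table above
DLT_TO_SNOWFLAKE = {
    "text": "VARCHAR",
    "double": "FLOAT",
    "bool": "BOOLEAN",
    "timestamp": "TIMESTAMP_TZ",
    "date": "DATE",
    "time": "TIME",
    "bigint": "NUMBER(19,0)",
    "binary": "BINARY",
    "decimal": "NUMBER",
    "json": "VARIANT",
}

print(DLT_TO_SNOWFLAKE["json"])
# VARIANT
```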
### Timestamp Precision

Snowflake supports timestamps with fractional-second precision up to 9 digits (nanoseconds):

```python
import dlt

@dlt.resource
def precise_timestamps():
    yield {
        "timestamp_col": "2024-01-01T12:00:00.123456789Z"
    }

pipeline = dlt.pipeline(destination="snowflake", dataset_name="my_dataset")
pipeline.run(precise_timestamps())
```
## Case Sensitivity

Snowflake is case-sensitive but folds unquoted identifiers to uppercase by default. dlt uses the snake_case naming convention, which results in uppercase identifiers in Snowflake:

```toml
[destination.snowflake]
# No additional configuration needed for the default behavior
```

For case-insensitive identifier handling, use a different naming convention:

```toml
[sources]
naming = "sql_ci_v1"  # Case-insensitive naming
```
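To illustrate the uppercasing behavior described above, here is a rough model of how Snowflake resolves identifiers (simplified; real resolution also handles escaping and reserved words):

```python
def resolve_identifier(name: str) -> str:
    # Quoted identifiers keep their exact case; unquoted ones fold to uppercase.
    if len(name) >= 2 and name.startswith('"') and name.endswith('"'):
        return name[1:-1]
    return name.upper()

print(resolve_identifier("my_table"))    # MY_TABLE
print(resolve_identifier('"my_table"'))  # my_table
```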
## Connection String

You can also pass credentials as a connection string via the destination factory:

```python
import dlt
from dlt.destinations import snowflake

pipeline = dlt.pipeline(
    destination=snowflake("snowflake://user:password@account/database?warehouse=wh&role=role"),
    dataset_name="my_dataset",
)
```
## Additional Resources

- Snowflake Docs: official Snowflake documentation
- Staging Guide: learn about staging with the filesystem destination