
Snowflake

Snowflake is a cloud-native data platform that provides data warehousing, data lake, data engineering, data science, and data application development capabilities.

Install dlt with Snowflake

To use Snowflake as a destination, install dlt with the Snowflake extra:
pip install "dlt[snowflake]"

Quick Start

Here’s a simple example to get you started:
import dlt

# Define your data source
@dlt.resource
def my_data():
    yield {"id": 1, "name": "Alice"}
    yield {"id": 2, "name": "Bob"}

# Create pipeline
pipeline = dlt.pipeline(
    pipeline_name="my_pipeline",
    destination="snowflake",
    dataset_name="my_dataset"
)

# Run the pipeline
info = pipeline.run(my_data())
print(info)

Configuration

Basic Configuration

database (string, required): The Snowflake database name
warehouse (string): The Snowflake warehouse to use for queries
role (string): The Snowflake role to use
stage_name (string): Use an existing named stage instead of the default table stage
keep_staged_files (bool, default "true"): Whether to keep or delete staged files after COPY INTO succeeds
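
These options can be set under the destination.snowflake section of .dlt/config.toml. A minimal sketch with illustrative values:

```toml
[destination.snowflake]
stage_name = "my_named_stage"   # use an existing named stage instead of the table stage
keep_staged_files = false       # delete staged files after a successful COPY INTO
```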

Advanced Configuration

create_indexes (bool, default "false"): Whether UNIQUE or PRIMARY KEY constraints should be created
use_vectorized_scanner (bool, default "false"): Whether to use the vectorized scanner in COPY INTO
enable_atomic_swap (bool, default "false"): When enabled together with the staging-optimized replace strategy, tables are atomically swapped using ALTER TABLE … SWAP WITH
use_decfloat (bool, default "false"): Whether to use the DECFLOAT type for unbound decimals instead of DECIMAL
query_tag (string): A tag with placeholders used to tag sessions executing load jobs
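
The advanced options can be set the same way in config.toml; the values below are illustrative:

```toml
[destination.snowflake]
create_indexes = true
use_vectorized_scanner = true
query_tag = "dlt_pipeline={pipeline_name}"
```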

Authentication

Username and Password

Create a .dlt/secrets.toml file with your credentials:
[destination.snowflake.credentials]
host = "your-account-identifier"  # the account identifier only, not a full snowflakecomputing.com URL
username = "your-username"
password = "your-password"
database = "your-database"
warehouse = "your-warehouse"
role = "your-role"

1. Get Your Account Identifier

Your Snowflake account identifier is typically in the format organization-account_name (legacy account locators look like abc12345.us-east-1).

2. Create a Database

CREATE DATABASE my_dlt_database;

3. Create a Warehouse

CREATE WAREHOUSE my_dlt_warehouse
WITH WAREHOUSE_SIZE = 'XSMALL'
AUTO_SUSPEND = 60
AUTO_RESUME = TRUE;

4. Create a User and Grant Permissions

CREATE USER dlt_user PASSWORD = 'your-secure-password';

-- PUBLIC is implicitly granted to every user, so dlt_user inherits these privileges
GRANT USAGE ON DATABASE my_dlt_database TO ROLE PUBLIC;
GRANT USAGE ON WAREHOUSE my_dlt_warehouse TO ROLE PUBLIC;
GRANT CREATE SCHEMA ON DATABASE my_dlt_database TO ROLE PUBLIC;
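
If you prefer not to rely on the built-in PUBLIC role, you can grant the same privileges through a dedicated role instead (dlt_loader_role is an illustrative name, not something dlt requires):

```sql
CREATE ROLE dlt_loader_role;
GRANT USAGE ON DATABASE my_dlt_database TO ROLE dlt_loader_role;
GRANT USAGE ON WAREHOUSE my_dlt_warehouse TO ROLE dlt_loader_role;
GRANT CREATE SCHEMA ON DATABASE my_dlt_database TO ROLE dlt_loader_role;
GRANT ROLE dlt_loader_role TO USER dlt_user;
```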

Private Key Authentication

For enhanced security, use private key authentication:
[destination.snowflake.credentials]
host = "your-account-identifier"  # the account identifier only, not a full snowflakecomputing.com URL
username = "your-username"
database = "your-database"
warehouse = "your-warehouse"
role = "your-role"
private_key = "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----\n"
private_key_passphrase = "your-passphrase"  # Optional
Or reference a private key file:
[destination.snowflake.credentials]
host = "your-account-identifier"  # the account identifier only, not a full snowflakecomputing.com URL
username = "your-username"
database = "your-database"
private_key_path = "/path/to/private_key.pem"

OAuth Authentication

Snowflake supports OAuth authentication:
[destination.snowflake.credentials]
host = "your-account-identifier"  # the account identifier only, not a full snowflakecomputing.com URL
username = "your-username"
database = "your-database"
authenticator = "oauth"
token = "your-oauth-token"

Data Loading

dlt loads data to Snowflake using the COPY INTO command with staged files.

Supported File Formats

dlt loads data to Snowflake as jsonl (the default), parquet, or csv. Select a format per run with loader_file_format:
pipeline = dlt.pipeline(
    destination="snowflake",
    dataset_name="my_dataset"
)
# jsonl is the default format; override per run:
# pipeline.run(data, loader_file_format="parquet")

Staging

Snowflake uses stages to load data. By default, dlt uses table stages, but you can configure external stages:
import dlt
from dlt.destinations import snowflake

pipeline = dlt.pipeline(
    destination=snowflake(stage_name="my_named_stage"),
    dataset_name="my_dataset"
)
For external staging with S3:
import dlt

pipeline = dlt.pipeline(
    destination="snowflake",
    staging="filesystem",  # Use S3 for staging
    dataset_name="my_dataset"
)
Configure the staging bucket:
[destination.filesystem]
bucket_url = "s3://my-bucket/staging"

[destination.filesystem.credentials]
aws_access_key_id = "your-access-key"
aws_secret_access_key = "your-secret-key"

Write Dispositions

Snowflake supports all write dispositions (append, replace, and merge):
@dlt.resource(write_disposition="append")
def append_data():
    yield {"id": 1, "value": "new"}

Atomic Table Swaps

With enable_atomic_swap=True and staging-optimized replace strategy, Snowflake uses ALTER TABLE ... SWAP WITH for zero-downtime loading:
import dlt
from dlt.destinations import snowflake

pipeline = dlt.pipeline(
    destination=snowflake(enable_atomic_swap=True),
    dataset_name="my_dataset"
)

@dlt.resource(write_disposition="replace")
def my_data():
    yield {"id": 1, "value": "data"}

pipeline.run(my_data())

Advanced Features

CSV Format Configuration

Customize CSV format options:
import dlt
from dlt.destinations import snowflake
from dlt.common.data_writers.configuration import CsvFormatConfiguration

csv_config = CsvFormatConfiguration(
    delimiter="|",
    include_header=True
)

pipeline = dlt.pipeline(
    destination=snowflake(csv_format=csv_config),
    dataset_name="my_dataset"
)

Query Tags

Tag your Snowflake queries for tracking:
import dlt
from dlt.destinations import snowflake

pipeline = dlt.pipeline(
    destination=snowflake(
        query_tag="dlt_pipeline={pipeline_name},dataset={dataset_name}"
    ),
    dataset_name="my_dataset"
)
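
dlt fills the placeholders per load job; illustratively, the substitution behaves like Python's str.format (this snippet is a plain-Python sketch, not dlt's internal code):

```python
# Illustrative only: dlt performs a similar substitution for each job
template = "dlt_pipeline={pipeline_name},dataset={dataset_name}"
tag = template.format(pipeline_name="my_pipeline", dataset_name="my_dataset")
print(tag)  # dlt_pipeline=my_pipeline,dataset=my_dataset
```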

DECFLOAT for Decimals

Use DECFLOAT type for unbound decimals:
import dlt
from dlt.destinations import snowflake

pipeline = dlt.pipeline(
    destination=snowflake(use_decfloat=True),
    dataset_name="my_dataset"
)
DECFLOAT only works with text-based formats (JSONL, CSV), not Parquet.

Vectorized Scanner

Enable the vectorized scanner for better performance:
import dlt
from dlt.destinations import snowflake

pipeline = dlt.pipeline(
    destination=snowflake(use_vectorized_scanner=True),
    dataset_name="my_dataset"
)

Data Types

Snowflake data type mapping:
dlt Type     Snowflake Type
text         VARCHAR
double       FLOAT
bool         BOOLEAN
timestamp    TIMESTAMP_TZ
date         DATE
time         TIME
bigint       NUMBER(19,0)
binary       BINARY
decimal      NUMBER
json         VARIANT

Timestamp Precision

Snowflake supports timestamps with precision up to 9:
import dlt

@dlt.resource
def precise_timestamps():
    yield {
        "timestamp_col": "2024-01-01T12:00:00.123456789Z"
    }

pipeline = dlt.pipeline(destination="snowflake", dataset_name="my_dataset")
pipeline.run(precise_timestamps())

Case Sensitivity

Snowflake supports case-sensitive identifiers but uppercases unquoted ones by default. dlt's default snake_case naming convention generates case-insensitive identifiers, which Snowflake stores in uppercase:
[destination.snowflake]
# No additional configuration needed for default behavior
For case-sensitive identifiers, switch to a case-sensitive naming convention:
[schema]
naming = "sql_cs_v1"  # Case-sensitive naming

Connection String

You can also pass credentials as a connection string to the destination factory:
import dlt
from dlt.destinations import snowflake

pipeline = dlt.pipeline(
    destination=snowflake("snowflake://user:password@account/database?warehouse=wh&role=role"),
    dataset_name="my_dataset"
)
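
The connection string is a standard URL, and its parts map onto the credential fields. A plain-Python illustration using the standard library:

```python
from urllib.parse import urlparse, parse_qs

url = urlparse("snowflake://user:password@account/database?warehouse=wh&role=role")
print(url.username, url.password)  # user password
print(url.hostname)                # account  (the account identifier)
print(url.path.lstrip("/"))        # database
print(parse_qs(url.query))         # {'warehouse': ['wh'], 'role': ['role']}
```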

Additional Resources

Snowflake Docs

Official Snowflake documentation

Staging Guide

Learn about staging with filesystem destination
