# Amazon Redshift

Amazon Redshift is a fast, fully managed cloud data warehouse that makes it simple and cost-effective to analyze data using standard SQL and your existing business intelligence tools.
## Install dlt with Redshift

To use Redshift as a destination, install dlt with the Redshift extra:

```sh
pip install "dlt[redshift]"
```
## Quick Start

Here's a simple example to get you started:

```python
import dlt

# Define your data source
@dlt.resource
def my_data():
    yield {"id": 1, "name": "Alice"}
    yield {"id": 2, "name": "Bob"}

# Create the pipeline
pipeline = dlt.pipeline(
    pipeline_name="my_pipeline",
    destination="redshift",
    dataset_name="my_dataset",
)

# Run the pipeline
info = pipeline.run(my_data())
print(info)
```
## Configuration

### Basic Configuration

- `host`: the Redshift cluster endpoint (e.g., `my-cluster.abc123.us-west-2.redshift.amazonaws.com`)

### Advanced Configuration

- `staging_iam_role`: IAM role ARN used for the COPY command when using S3 staging
- `has_case_sensitive_identifiers`: whether identifiers are treated as case sensitive; Redshift uses case-insensitive identifiers by default
- Whether to create indexes on primary keys
## Authentication

### Username and Password

Create a `.dlt/secrets.toml` file with your credentials:

```toml
[destination.redshift.credentials]
host = "my-cluster.abc123.us-west-2.redshift.amazonaws.com"
port = 5439
database = "my_database"
username = "my_user"
password = "my_password"
```
### Setting Up the Cluster

1. **Create a Redshift cluster.**

2. **Configure security groups.** Ensure your security group allows inbound traffic on port 5439 from your IP address.

3. **Create a database.**

   ```sql
   CREATE DATABASE my_dlt_database;
   ```

4. **Create a user and grant permissions.**

   ```sql
   CREATE USER dlt_user WITH PASSWORD 'your-secure-password';
   GRANT CREATE ON DATABASE my_dlt_database TO dlt_user;
   GRANT ALL ON SCHEMA public TO dlt_user;
   ```
## Connection String

You can also pass credentials as a connection string through the destination factory:

```python
import dlt

pipeline = dlt.pipeline(
    destination=dlt.destinations.redshift(
        "redshift://user:password@host:5439/database"
    ),
    dataset_name="my_dataset",
)
```
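If the password contains URL-special characters such as `@`, `:`, or `/`, they must be percent-encoded in the connection string. A minimal helper for this (hypothetical, not part of dlt):

```python
from urllib.parse import quote

def redshift_dsn(user: str, password: str, host: str, database: str, port: int = 5439) -> str:
    """Build a redshift:// connection string, percent-encoding the user and password."""
    return (
        f"redshift://{quote(user, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{database}"
    )

print(redshift_dsn("my_user", "p@ss:word", "my-cluster.abc123.us-west-2.redshift.amazonaws.com", "my_database"))
# redshift://my_user:p%40ss%3Aword@my-cluster.abc123.us-west-2.redshift.amazonaws.com:5439/my_database
```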
## Data Loading

Redshift is based on PostgreSQL and uses the same loading mechanism, but with optimizations for columnar storage.

Supported file formats:

- JSONL (default)
- Parquet
- CSV

```python
pipeline = dlt.pipeline(
    destination="redshift",
    dataset_name="my_dataset",
)
# JSONL is the default format
```
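For context, JSONL ("JSON Lines") is simply one JSON object per line, which makes it a convenient default for row-oriented loads. A quick illustration in plain Python (not dlt's actual writer):

```python
import json

rows = [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]

# Serialize each row as one JSON object per line
jsonl = "\n".join(json.dumps(row) for row in rows)
print(jsonl)
# {"id": 1, "name": "Alice"}
# {"id": 2, "name": "Bob"}
```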
## Staging with S3

For optimal performance with large datasets, use S3 staging:

```python
import dlt

pipeline = dlt.pipeline(
    destination="redshift",
    staging="filesystem",  # Use S3 for staging
    dataset_name="my_dataset",
)
```

Configure S3 credentials in `.dlt/secrets.toml`:

```toml
[destination.filesystem]
bucket_url = "s3://my-bucket/staging"

[destination.filesystem.credentials]
aws_access_key_id = "your-access-key"
aws_secret_access_key = "your-secret-key"
region_name = "us-west-2"
```
### IAM Role for COPY

When using S3 staging, you can use an IAM role instead of credentials:

```toml
[destination.redshift]
staging_iam_role = "arn:aws:iam::123456789012:role/MyRedshiftRole"

[destination.filesystem]
bucket_url = "s3://my-bucket/staging"
# No credentials needed when using an IAM role
```
1. **Create an IAM role.** Create an IAM role with permissions to access your S3 bucket.

2. **Attach a policy to the role:**

   ```json
   {
     "Version": "2012-10-17",
     "Statement": [
       {
         "Effect": "Allow",
         "Action": ["s3:GetObject", "s3:ListBucket"],
         "Resource": [
           "arn:aws:s3:::my-bucket/*",
           "arn:aws:s3:::my-bucket"
         ]
       }
     ]
   }
   ```

3. **Attach the role to the cluster.** In the Redshift console, attach the IAM role to your cluster.

4. **Use the role ARN in configuration.** Copy the role ARN into your `secrets.toml` file.
## Write Dispositions

Redshift supports all write dispositions (`append`, `replace`, and `merge`):

```python
@dlt.resource(write_disposition="append")
def append_data():
    yield {"id": 1, "value": "new"}
```
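For intuition: `append` adds new rows, `replace` drops and reloads the table, and `merge` upserts on the primary key. A plain-Python sketch of merge semantics (illustrative only, not dlt's implementation):

```python
def merge_rows(existing, incoming, key="id"):
    """Upsert incoming rows into existing rows by primary key."""
    by_key = {row[key]: row for row in existing}
    for row in incoming:
        by_key[row[key]] = row  # same key: update; new key: insert
    return list(by_key.values())

existing = [{"id": 1, "value": "old"}]
incoming = [{"id": 1, "value": "new"}, {"id": 2, "value": "added"}]
print(merge_rows(existing, incoming))
# [{'id': 1, 'value': 'new'}, {'id': 2, 'value': 'added'}]
```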
## Data Types

Redshift data type mapping (based on PostgreSQL):

| dlt type  | Redshift type            |
|-----------|--------------------------|
| text      | VARCHAR                  |
| double    | DOUBLE PRECISION         |
| bool      | BOOLEAN                  |
| timestamp | TIMESTAMP WITH TIME ZONE |
| date      | DATE                     |
| time      | TIME                     |
| bigint    | BIGINT                   |
| binary    | VARBINARY                |
| decimal   | NUMERIC                  |
| json      | SUPER                    |
### SUPER Data Type

Redshift's SUPER type can store semi-structured data like JSON:

```python
import dlt

@dlt.resource
def json_data():
    yield {
        "id": 1,
        "metadata": {"nested": "value", "count": 42},
    }

pipeline = dlt.pipeline(destination="redshift", dataset_name="my_dataset")
pipeline.run(json_data())
```
## Distribution Keys

Optimize query performance by distributing data across nodes:

```python
import dlt

@dlt.resource
def distributed_data():
    yield {"user_id": 1, "value": "data"}

# Use primary_key to influence distribution
distributed_data.apply_hints(primary_key="user_id")
```
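Conceptually, a distribution key assigns each row to a node slice by hashing the key value, so rows with the same key land on the same node. A toy illustration (not Redshift's actual hash function):

```python
import zlib

def assign_slice(key_value, num_slices=4):
    """Toy distribution: hash the key value to pick a node slice (illustrative only)."""
    return zlib.crc32(str(key_value).encode()) % num_slices

# The same key always maps to the same slice, so joins on the
# distribution key can run without shuffling rows between nodes.
slices = [assign_slice(user_id) for user_id in range(8)]
print(slices)
```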
## Sort Keys

Improve query performance with sort keys:

```python
import dlt

@dlt.resource
def sorted_data():
    yield {"timestamp": "2024-01-01T00:00:00Z", "value": 100}

# Redshift uses primary keys as sort keys by default
sorted_data.apply_hints(primary_key="timestamp")
```
## Compression

Redshift automatically applies compression encoding. Use Parquet for better compression:

```python
import dlt

pipeline = dlt.pipeline(
    destination="redshift",
    staging="filesystem",
    dataset_name="my_dataset",
)
pipeline.run(my_data(), loader_file_format="parquet")
```
## Case Sensitivity

Redshift uses case-insensitive identifiers by default:

```toml
[destination.redshift]
has_case_sensitive_identifiers = false
```

All identifiers are automatically lowercased unless quoted.
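Because identifiers fold to lowercase, source columns that differ only in case will collide in Redshift. A quick illustration of the folding (plain Python, not dlt's actual identifier normalizer):

```python
source_columns = ["UserId", "userid", "USERID"]

# Case-insensitive identifiers: all three fold to the same name
folded = {name.lower() for name in source_columns}
print(folded)  # {'userid'}
```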
## Advanced Configuration

### Client Encoding

Set the client encoding for text data:

```toml
[destination.redshift.credentials]
host = "my-cluster.abc123.us-west-2.redshift.amazonaws.com"
database = "my_database"
username = "my_user"
password = "my_password"
client_encoding = "utf-8"
```
### Connection Timeout

Configure the connection timeout (in seconds):

```toml
[destination.redshift.credentials]
host = "my-cluster.abc123.us-west-2.redshift.amazonaws.com"
database = "my_database"
username = "my_user"
password = "my_password"
connect_timeout = 30
```
## Limitations

- Redshift tables are limited to a maximum of 1,600 columns
- VARCHAR columns are limited to 65,535 bytes
- Nested data structures require the SUPER type or flattening
- Case-insensitive identifiers may cause naming conflicts
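Note that the VARCHAR limit is measured in bytes, not characters, so multi-byte UTF-8 text reaches it sooner than ASCII. A quick check:

```python
text = "caf\u00e9" * 10000  # 40,000 characters; "é" is 2 bytes in UTF-8

char_len = len(text)                  # character count
byte_len = len(text.encode("utf-8"))  # what Redshift's VARCHAR limit counts
print(char_len, byte_len)  # 40000 50000
print(byte_len <= 65535)   # True
```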
## Additional Resources

- Redshift Docs: the official Amazon Redshift documentation
- PostgreSQL destination: Redshift is based on PostgreSQL