
PostgreSQL

PostgreSQL is a powerful, open-source object-relational database system with a strong reputation for reliability, feature robustness, and performance.

Install dlt with PostgreSQL

To use PostgreSQL as a destination, install dlt with the PostgreSQL extra:
pip install "dlt[postgres]"

Quick Start

Here’s a simple example to get you started:
import dlt

# Define your data source
@dlt.resource
def my_data():
    yield {"id": 1, "name": "Alice"}
    yield {"id": 2, "name": "Bob"}

# Create pipeline
pipeline = dlt.pipeline(
    pipeline_name="my_pipeline",
    destination="postgres",
    dataset_name="my_dataset"
)

# Run the pipeline
info = pipeline.run(my_data())
print(info)

Configuration

Basic Configuration

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| host | string | required | The PostgreSQL server hostname or IP address |
| database | string | required | The database name |
| username | string | required | The database username |
| password | string | required | The database password |
| port | int | 5432 | The PostgreSQL port number |

Advanced Configuration

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| connect_timeout | int | 15 | Connection timeout in seconds |
| client_encoding | string | (none) | Client encoding for text data (e.g., "utf-8") |
| create_indexes | bool | true | Whether to create indexes on primary keys and unique columns |
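Advanced options can be set alongside the credentials; a sketch in secrets.toml (option placement follows dlt's usual destination config layout, with destination-level options under `[destination.postgres]` and connection options under `[destination.postgres.credentials]`):

```toml
[destination.postgres]
create_indexes = false

[destination.postgres.credentials]
host = "localhost"
database = "my_database"
username = "my_user"
password = "my_password"
connect_timeout = 30
client_encoding = "utf-8"
```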

Authentication

Username and Password

Create a .dlt/secrets.toml file with your credentials:
[destination.postgres.credentials]
host = "localhost"
port = 5432
database = "my_database"
username = "my_user"
password = "my_password"
1. Install PostgreSQL

   Download and install PostgreSQL from postgresql.org.

2. Create a database

   CREATE DATABASE my_dlt_database;

3. Create a user and grant permissions

   CREATE USER dlt_user WITH PASSWORD 'your-secure-password';
   GRANT ALL PRIVILEGES ON DATABASE my_dlt_database TO dlt_user;

   -- Connect to the database first
   \c my_dlt_database

   GRANT ALL ON SCHEMA public TO dlt_user;

Connection String

You can also use a connection string:
import dlt

pipeline = dlt.pipeline(
    destination="postgresql://user:password@localhost:5432/database",
    dataset_name="my_dataset"
)
Or in secrets.toml:
[destination.postgres]
credentials = "postgresql://user:password@localhost:5432/database"
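dlt can also read the same credentials from environment variables, where double underscores express section nesting:

```shell
# Equivalent to setting destination.postgres.credentials in secrets.toml
export DESTINATION__POSTGRES__CREDENTIALS="postgresql://user:password@localhost:5432/database"
```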

Data Loading

dlt loads data into PostgreSQL using INSERT statements by default; the csv file format uses the COPY command for faster bulk loads.

Supported File Formats

pipeline = dlt.pipeline(
    destination="postgres",
    dataset_name="my_dataset"
)

# insert_values is the default format; csv (loaded via COPY) is also supported
info = pipeline.run(my_data(), loader_file_format="csv")

CSV Format Configuration

Customize CSV loading options:
import dlt
from dlt.destinations import postgres
from dlt.common.destination.configuration import CsvFormatConfiguration

csv_config = CsvFormatConfiguration(
    delimiter="|",
    include_header=True
)

pipeline = dlt.pipeline(
    destination=postgres(csv_format=csv_config),
    dataset_name="my_dataset"
)

Write Dispositions

PostgreSQL supports all write dispositions: append, replace, and merge. For example:
@dlt.resource(write_disposition="append")
def append_data():
    yield {"id": 1, "value": "new"}

Data Types

PostgreSQL data type mapping:
| dlt type | PostgreSQL type |
| --- | --- |
| text | VARCHAR |
| double | DOUBLE PRECISION |
| bool | BOOLEAN |
| timestamp | TIMESTAMP WITH TIME ZONE |
| date | DATE |
| time | TIME WITHOUT TIME ZONE |
| bigint | BIGINT |
| binary | BYTEA |
| decimal | NUMERIC |
| json | JSONB |

JSONB Support

PostgreSQL’s JSONB type allows efficient storage and querying of JSON data:
import dlt

@dlt.resource
def json_data():
    yield {
        "id": 1,
        "metadata": {"nested": "value", "tags": ["a", "b"]}
    }

pipeline = dlt.pipeline(destination="postgres", dataset_name="my_dataset")
pipeline.run(json_data())
Query JSONB data:
SELECT metadata->>'nested' as nested_value
FROM json_data
WHERE metadata @> '{"tags": ["a"]}';
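If containment queries like the one above are frequent, a GIN index on the JSONB column speeds them up considerably (index, dataset, and table names are illustrative):

```sql
-- A GIN index accelerates @> containment and ? key-existence queries on JSONB
CREATE INDEX idx_json_data_metadata
    ON my_dataset.json_data USING GIN (metadata);
```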

Timestamp Precision

PostgreSQL stores timestamps with fractional-second precision of up to 6 digits (microseconds):
import dlt

@dlt.resource
def precise_timestamps():
    yield {
        "timestamp_col": "2024-01-01T12:00:00.123456Z"
    }

pipeline = dlt.pipeline(destination="postgres", dataset_name="my_dataset")
pipeline.run(precise_timestamps())
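As a sanity check of the precision claim, plain Python parses the same value with a six-digit microsecond component, which is exactly what PostgreSQL's maximum timestamp precision preserves:

```python
from datetime import datetime

# The fractional seconds in the payload above carry 6 digits (microseconds)
ts = datetime.fromisoformat("2024-01-01T12:00:00.123456+00:00")
print(ts.microsecond)  # 123456
```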

Indexes and Constraints

By default, dlt creates indexes on primary keys and unique columns:
import dlt

@dlt.resource(primary_key="id")
def indexed_data():
    yield {"id": 1, "value": "data"}

pipeline = dlt.pipeline(destination="postgres", dataset_name="my_dataset")
pipeline.run(indexed_data())
Disable index creation:
import dlt
from dlt.destinations import postgres

pipeline = dlt.pipeline(
    destination=postgres(create_indexes=False),
    dataset_name="my_dataset"
)

Advanced Features

Geometry Support

PostgreSQL with PostGIS extension supports geometry data:
import dlt
from dlt.destinations.adapters import postgres_adapter

@dlt.resource
def locations():
    yield {
        "id": 1,
        "location": "POINT(-122.4194 37.7749)"
    }

pipeline = dlt.pipeline(destination="postgres", dataset_name="my_dataset")
pipeline.run(
    postgres_adapter(
        locations(),
        columns={"location": {"geometry": "POINT", "srid": 4326}}
    )
)
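Once loaded, and assuming PostGIS is installed (CREATE EXTENSION postgis), the points can be queried with standard spatial functions; dataset and table names here are illustrative:

```sql
-- Find points within 1 km of downtown San Francisco
SELECT id, ST_AsText(location) AS wkt
FROM my_dataset.locations
WHERE ST_DWithin(
    location::geography,
    ST_SetSRID(ST_MakePoint(-122.4194, 37.7749), 4326)::geography,
    1000  -- meters
);
```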

Transaction Support

PostgreSQL supports DDL transactions, so schema changes are atomic:
import dlt

@dlt.resource
def my_data():
    yield {"id": 1, "value": "data"}

pipeline = dlt.pipeline(destination="postgres", dataset_name="my_dataset")
# All schema changes happen in a transaction
pipeline.run(my_data())

Client Encoding

Set the client encoding:
[destination.postgres.credentials]
host = "localhost"
database = "my_database"
username = "my_user"
password = "my_password"
client_encoding = "utf-8"

Case Sensitivity

PostgreSQL folds unquoted identifiers to lowercase and matches quoted identifiers case-sensitively. dlt's default snake_case naming convention produces lowercase identifiers, so no additional configuration is needed.
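A quick illustration of the folding rules (dataset, table, and column names are illustrative):

```sql
-- Unquoted identifiers fold to lowercase, so these resolve identically:
SELECT id FROM my_dataset.my_data;
SELECT ID FROM MY_DATASET.MY_DATA;

-- Quoted identifiers are matched case-sensitively:
SELECT "id" FROM my_dataset.my_data;   -- ok: dlt created the column lowercase
```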

Performance Optimization

Batch Size

For large datasets, yield rows from a generator; dlt buffers and writes them in batches rather than materializing everything in memory:
import dlt

@dlt.resource
def large_dataset():
    for i in range(1000000):
        yield {"id": i, "value": f"data_{i}"}

pipeline = dlt.pipeline(destination="postgres", dataset_name="my_dataset")
pipeline.run(large_dataset())
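How many rows are buffered before an intermediate file is flushed can be tuned; this sketch assumes dlt's data-writer `buffer_max_items` setting in config.toml:

```toml
[data_writer]
# Rows held in memory before flushing to an intermediate load file
buffer_max_items = 100000
```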

Connection Pooling

PostgreSQL itself does not pool connections; if many pipelines share one server, consider an external pooler such as PgBouncer. For slow networks, increase the connection timeout:
[destination.postgres.credentials]
host = "localhost"
database = "my_database"
username = "my_user"
password = "my_password"
connect_timeout = 30

Staging Support

For large datasets, use staging:
import dlt

pipeline = dlt.pipeline(
    destination="postgres",
    staging="filesystem",
    dataset_name="my_dataset"
)

Cloud PostgreSQL

AWS RDS

Connect to AWS RDS PostgreSQL:
[destination.postgres.credentials]
host = "mydb.abc123.us-west-2.rds.amazonaws.com"
port = 5432
database = "mydb"
username = "admin"
password = "your-password"

Google Cloud SQL

Connect to Cloud SQL PostgreSQL:
[destination.postgres.credentials]
host = "/cloudsql/project:region:instance"
database = "mydb"
username = "postgres"
password = "your-password"

Azure Database for PostgreSQL

Connect to Azure PostgreSQL:
[destination.postgres.credentials]
host = "myserver.postgres.database.azure.com"
port = 5432
database = "mydb"
username = "admin@myserver"
password = "your-password"
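Managed cloud instances usually require TLS. With a connection string, this is the standard libpq `sslmode` parameter, which dlt passes through to the driver (host and credentials below are illustrative):

```toml
[destination.postgres]
credentials = "postgresql://admin:your-password@myserver.postgres.database.azure.com:5432/mydb?sslmode=require"
```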

Additional Resources

- PostgreSQL Docs: Official PostgreSQL documentation
- PostGIS Extension: Spatial database extender for PostgreSQL
