
Data Transformations

Transform your loaded data using dbt, SQL queries, or Python dataframes. dlt supports all of these approaches, so you can pick the one that fits your use case and preferences.

dbt Integration

dlt has native integration with dbt, allowing you to run dbt transformations directly from your pipeline code.

Why use dbt with dlt?

  • End-to-end cross-database compatibility - dlt handles credentials and connection setup
  • Version isolation - Run dbt in a separate virtual environment to avoid dependency conflicts
  • Simplified workflows - No need to manage dbt profiles or credentials separately
  • Testing support - Run dbt tests and validate data quality automatically

Basic dbt Runner Usage

The dbt runner can create a virtual environment for dbt on the fly, run dbt packages from Git or local files, and automatically pass credentials from your dlt pipeline.
Step 1: Run your pipeline

First, run your dlt pipeline to load data:
import dlt
from pipedrive import pipedrive_source

# Load raw data
pipeline = dlt.pipeline(
    pipeline_name='pipedrive',
    destination='bigquery',
    dataset_name='pipedrive_raw'
)

load_info = pipeline.run(pipedrive_source())
print(load_info)
Step 2: Set up dbt transformations

Create a new pipeline instance pointing to your transformed dataset:
# Create transformation pipeline
transform_pipeline = dlt.pipeline(
    pipeline_name='pipedrive',
    destination='bigquery',
    dataset_name='pipedrive_transformed'
)
Step 3: Create dbt virtual environment

Create an isolated virtual environment with dbt installed:
# Create or restore venv for dbt
venv = dlt.dbt.get_venv(transform_pipeline, dbt_version="1.5.0")
If you already have dbt installed in your current environment, you can skip the venv creation and omit the venv parameter in the next step.
Step 4: Run dbt transformations

Get a dbt runner and execute your transformations:
# Get dbt runner from local package
dbt_runner = dlt.dbt.package(
    transform_pipeline,
    "pipedrive/dbt_pipedrive",  # Local path or Git URL
    venv=venv
)

# Run all dbt models
models = dbt_runner.run_all()

# Print results
for m in models:
    print(
        f"Model {m.model_name} materialized "
        f"in {m.time}s with status {m.status}"
    )

Running dbt from Git Repository

You can run dbt packages directly from Git repositories:
import dlt

pipeline = dlt.pipeline(
    pipeline_name='jaffle_shop',
    destination='duckdb',
    dataset_name='jaffle'
)

# Run dbt from GitHub
venv = dlt.dbt.get_venv(pipeline)
dbt = dlt.dbt.package(
    pipeline,
    "https://github.com/dbt-labs/jaffle_shop.git",
    venv=venv
)

# Run all steps: deps, seed, run, test
models = dbt.run_all()
print(f"Executed {len(models)} models")

# Run tests separately
tests = dbt.test()
print(f"Passed {len(tests)} tests")

Advanced dbt Configuration

You can run only a subset of your models by passing selection and exclusion patterns:
# Run only specific models
models = dbt_runner.run(
    models=["model_1", "model_2"],
    exclude=["staging.*"]
)

Running dbt Without a Pipeline

You can use the dbt runner standalone without a dlt pipeline:
from dlt.helpers.dbt import package_runner, Venv
from dlt.common.destination.client import DestinationClientDwhConfiguration

# Create a simple configuration
class DuckDBConfig(DestinationClientDwhConfiguration):
    destination_type = "duckdb"
    credentials = "duckdb://my_database.duckdb"
    dataset_name = "analytics"

# Use current environment (assumes dbt is installed)
venv = Venv.restore_current()

# Run dbt package
dbt = package_runner(
    venv=venv,
    destination_config=DuckDBConfig(),
    working_dir=".",
    package_location="https://github.com/dbt-labs/jaffle_shop.git"
)

models = dbt.run_all()
print(f"Completed {len(models)} transformations")

SQL Transformations

Use dlt’s SQL client to run custom transformations directly:
import dlt

pipeline = dlt.pipeline(
    pipeline_name='analytics',
    destination='postgres',
    dataset_name='raw_data'
)

# Load data first
pipeline.run(my_source())

# Get SQL client
with pipeline.sql_client() as client:
    # Create a transformed table
    client.execute_sql("""
        CREATE TABLE analytics.user_summary AS
        SELECT 
            user_id,
            COUNT(*) as event_count,
            MAX(created_at) as last_event_at,
            MIN(created_at) as first_event_at
        FROM raw_data.events
        GROUP BY user_id
    """)
    
    # Read results
    with client.execute_query("SELECT * FROM analytics.user_summary LIMIT 10") as cursor:
        for row in cursor.fetchall():
            print(row)
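To illustrate the aggregation itself independently of dlt, here is the same user_summary query run against an in-memory SQLite database (stdlib only; the table and values are made up for the example):

```python
import sqlite3

# Toy events table standing in for raw_data.events (hypothetical sample data)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (user_id INTEGER, created_at TEXT)")
conn.executemany(
    "INSERT INTO events VALUES (?, ?)",
    [(1, "2024-01-01"), (1, "2024-01-03"), (2, "2024-01-02")],
)

# Same aggregation as the user_summary query above
rows = conn.execute("""
    SELECT
        user_id,
        COUNT(*) AS event_count,
        MAX(created_at) AS last_event_at,
        MIN(created_at) AS first_event_at
    FROM events
    GROUP BY user_id
    ORDER BY user_id
""").fetchall()

print(rows)
# [(1, 2, '2024-01-03', '2024-01-01'), (2, 1, '2024-01-02', '2024-01-02')]
```

The SQL is portable: the same GROUP BY aggregation works unchanged on Postgres or any other destination the dlt SQL client connects to.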

Python Transformations

Transform data using pandas or other Python libraries:
import dlt
import pandas as pd

pipeline = dlt.pipeline(
    pipeline_name='analytics',
    destination='duckdb',
    dataset_name='transformed'
)

# Load raw data
pipeline.run(raw_source())

# Read data as DataFrame
with pipeline.sql_client() as client:
    with client.execute_query(
        "SELECT * FROM raw_data.events"
    ) as cursor:
        df = cursor.df()

# Transform with pandas
df['event_date'] = pd.to_datetime(df['created_at']).dt.date
summary = df.groupby(['user_id', 'event_date']).agg({
    'event_id': 'count',
    'revenue': 'sum'
}).reset_index()

# Load transformed data back
pipeline.run(
    summary,
    table_name='daily_user_metrics',
    write_disposition='replace'
)
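The group-and-aggregate step can also be sketched with only the standard library, mirroring the pandas groupby/agg above on a few hypothetical event rows:

```python
from collections import defaultdict
from datetime import datetime

# Hypothetical raw event rows standing in for raw_data.events
events = [
    {"user_id": 1, "created_at": "2024-01-01T09:00:00", "revenue": 5.0},
    {"user_id": 1, "created_at": "2024-01-01T17:30:00", "revenue": 2.5},
    {"user_id": 2, "created_at": "2024-01-02T08:00:00", "revenue": 0.0},
]

# Group by (user_id, event_date), counting events and summing revenue,
# just like the pandas groupby/agg above
metrics = defaultdict(lambda: {"event_count": 0, "revenue": 0.0})
for e in events:
    day = datetime.fromisoformat(e["created_at"]).date()
    m = metrics[(e["user_id"], day)]
    m["event_count"] += 1
    m["revenue"] += e["revenue"]

for (user_id, day), m in sorted(metrics.items()):
    print(user_id, day, m["event_count"], m["revenue"])
```

For small tables either approach works; pandas pays off once you need joins, window-style operations, or ML feature engineering.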

Inline Transformations

Transform data during extraction using @dlt.transformer:
import dlt
from datetime import datetime

@dlt.resource
def users():
    """Load raw user data"""
    yield from fetch_users_from_api()

@dlt.transformer(data_from=users)
def active_users(user):
    """Filter and transform users"""
    if user['status'] == 'active':
        # Add computed fields
        user['full_name'] = f"{user['first_name']} {user['last_name']}"
        user['days_active'] = (datetime.now() - user['created_at']).days
        yield user

@dlt.source
def user_source():
    return active_users

# Only active users with computed fields will be loaded
pipeline = dlt.pipeline(
    pipeline_name='users',
    destination='snowflake',
    dataset_name='analytics'
)

pipeline.run(user_source())
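Because the transformer's filter-and-enrich step is plain Python, you can factor it into a function and unit-test it without running a pipeline. A sketch (field names follow the example above; the function returns the enriched record, or None for inactive users, instead of yielding):

```python
from datetime import datetime, timezone

def enrich_active_user(user, now=None):
    """Return the user with computed fields if active, otherwise None."""
    if user["status"] != "active":
        return None
    now = now or datetime.now(timezone.utc)
    return {
        **user,
        "full_name": f"{user['first_name']} {user['last_name']}",
        "days_active": (now - user["created_at"]).days,
    }

# Hypothetical user record for testing the logic in isolation
sample = {
    "status": "active",
    "first_name": "Ada",
    "last_name": "Lovelace",
    "created_at": datetime(2024, 1, 1, tzinfo=timezone.utc),
}
out = enrich_active_user(sample, now=datetime(2024, 1, 11, tzinfo=timezone.utc))
print(out["full_name"], out["days_active"])  # Ada Lovelace 10
```

Inside the @dlt.transformer you would then simply `yield` the non-None results, keeping the testable logic separate from the pipeline wiring.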

Best Practices

Choose the right transformation approach:
  • Use dbt for complex, version-controlled SQL transformations
  • Use SQL client for simple ad-hoc transformations
  • Use Python when you need advanced data manipulation or ML features
  • Use inline transformations for filtering or enriching data during extraction
When using dbt with dlt, make sure your dbt models reference the correct source datasets that dlt loads into. Update your dbt sources.yml accordingly.
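For instance, if your pipeline loads into a pipedrive_raw dataset as in the example above, a matching sources.yml entry might look like this (the table names here are hypothetical):

```yaml
version: 2

sources:
  - name: pipedrive_raw      # matches dataset_name in the dlt pipeline
    schema: pipedrive_raw
    tables:
      - name: deals          # hypothetical tables loaded by dlt
      - name: persons
```

Your dbt models can then reference the loaded data with {{ source('pipedrive_raw', 'deals') }}.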
