Transform your loaded data using dbt, SQL queries, or Python dataframes. dlt provides multiple approaches to transform data depending on your use case and preferences.
The dbt runner can create a virtual environment for dbt on the fly, run dbt packages from Git or local files, and automatically pass credentials from your dlt pipeline.
Create an isolated virtual environment with dbt installed:
```python
# Create or restore venv for dbt
venv = dlt.dbt.get_venv(transform_pipeline, dbt_version="1.5.0")
```
If you already have dbt installed in your current environment, you can skip the venv creation and omit the `venv` parameter in the next step.
4. Run dbt transformations
Get a dbt runner and execute your transformations:
```python
# Get dbt runner from local package
dbt_runner = dlt.dbt.package(
    transform_pipeline,
    "pipedrive/dbt_pipedrive",  # Local path or Git URL
    venv=venv
)

# Run all dbt models
models = dbt_runner.run_all()

# Print results
for m in models:
    print(
        f"Model {m.model_name} materialized "
        f"in {m.time}s with status {m.status}"
    )
```
Use dlt’s SQL client to run custom transformations directly:
```python
import dlt

pipeline = dlt.pipeline(
    pipeline_name='analytics',
    destination='postgres',
    dataset_name='raw_data'
)

# Load data first
pipeline.run(my_source())

# Get SQL client
with pipeline.sql_client() as client:
    # Create a transformed table
    client.execute_sql("""
        CREATE TABLE analytics.user_summary AS
        SELECT
            user_id,
            COUNT(*) as event_count,
            MAX(created_at) as last_event_at,
            MIN(created_at) as first_event_at
        FROM raw_data.events
        GROUP BY user_id
    """)

    # Read results
    with client.execute_query("SELECT * FROM analytics.user_summary LIMIT 10") as cursor:
        for row in cursor.fetchall():
            print(row)
```