Schema evolution enables dlt to automatically detect and handle changes in your data structure without breaking downstream systems. This includes new columns, type changes, and nested data modifications.
## How It Works
dlt automatically infers a schema on the first pipeline run and adapts it to changes over time:
### Initial Schema Inference

On the first run, dlt scans your data and generates a schema:

```python
import dlt

data = [{
    "organization": "Tech Innovations Inc.",
    "address": {
        "building": "r&d",
        "room": 7890,
    },
    "inventory": [
        {"name": "Plasma ray", "inventory_nr": 2411},
        {"name": "Self-aware Roomba", "inventory_nr": 268},
    ],
}]

pipeline = dlt.pipeline(
    pipeline_name="organizations",
    destination="duckdb",
    dataset_name="org_data",
)

# Creates tables: org, org__inventory
pipeline.run(data, table_name="org")
```
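The inference step can be pictured as a per-value type mapping. The sketch below is a simplified illustration of that idea, not dlt's actual implementation (the real mapping also covers decimals, dates, binary, and more):

```python
from datetime import datetime

# Simplified sketch of value-to-destination-type inference.
def infer_type(value):
    if isinstance(value, bool):  # bool must be checked before int
        return "bool"
    if isinstance(value, int):
        return "bigint"
    if isinstance(value, float):
        return "double"
    if isinstance(value, datetime):
        return "timestamp"
    return "text"

# The flat fields of the example record would map to:
record = {"organization": "Tech Innovations Inc.", "room": 7890}
types = {key: infer_type(value) for key, value in record.items()}
```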
### Automatic Evolution

On subsequent runs, dlt adapts to schema changes:

```python
# Updated data with schema changes
data = [{
    "organization": "Tech Innovations Inc.",
    "ceo": "Alice Smith",  # NEW: column added
    "address": {
        "main_block": "r&d",  # RENAMED: building -> main_block
        # REMOVED: room field
    },
    "inventory": [
        # TYPE CHANGE: inventory_nr from int to string
        {"name": "Plasma ray", "inventory_nr": "AR2411"},
    ],
}]

# dlt automatically handles all of these changes
pipeline.run(data, table_name="org")
```
### Schema Changes Applied

What happened:

- **Column added**: a new `ceo` column is created in the `org` table.
- **Column renamed**: dlt treats a rename as add-plus-remove; a new `address__main_block` column is created and the old `address__building` column stops receiving data.
- **Column removed**: `address__room` stops receiving data but remains in the destination.
- **Type changed**: a new variant column `inventory_nr__v_text` is created in `org__inventory` to hold the string values.
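Because a rename produces two columns, downstream queries often need to merge the old and new column. A minimal illustration using stdlib `sqlite3` as a stand-in for the real destination (table and column names follow the example above):

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("""
    CREATE TABLE org (
        organization TEXT,
        address__building TEXT,   -- old column, no longer receives data
        address__main_block TEXT  -- new column after the rename
    )
""")
con.execute("INSERT INTO org VALUES ('Old Corp', 'r&d', NULL)")
con.execute("INSERT INTO org VALUES ('Tech Innovations Inc.', NULL, 'r&d')")

# COALESCE presents the renamed pair as a single value to consumers
rows = con.execute(
    "SELECT organization, COALESCE(address__main_block, address__building) "
    "FROM org ORDER BY rowid"
).fetchall()
```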
## Nested Data Normalization
dlt flattens nested structures into relational tables:
```python
import dlt

data = [{
    "id": 1,
    "user": "alice",
    "profile": {
        "age": 30,
        "city": "NYC",
        "preferences": {
            "theme": "dark",
            "notifications": True,
        },
    },
    "orders": [
        {"order_id": 101, "amount": 50.0},
        {"order_id": 102, "amount": 75.5},
    ],
}]

pipeline = dlt.pipeline(
    pipeline_name="users",
    destination="duckdb",
    dataset_name="user_data",
)

pipeline.run(data, table_name="users")

# Creates tables:
# - users: id, user, profile__age, profile__city,
#          profile__preferences__theme, profile__preferences__notifications
# - users__orders: order_id, amount, _dlt_parent_id, _dlt_list_idx
```
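The flattening rule can be sketched in plain Python. This illustrates only the naming scheme, not dlt's actual normalizer (which also adds `_dlt_id`, parent keys, and list indexes):

```python
def flatten(record, prefix=""):
    """Nested dicts become double-underscore column paths;
    lists of objects are split out as child tables."""
    columns, child_tables = {}, {}
    for key, value in record.items():
        path = f"{prefix}__{key}" if prefix else key
        if isinstance(value, dict):
            sub_cols, sub_children = flatten(value, path)
            columns.update(sub_cols)
            child_tables.update(sub_children)
        elif isinstance(value, list):
            child_tables[path] = value
        else:
            columns[path] = value
    return columns, child_tables

cols, children = flatten({
    "id": 1,
    "profile": {"preferences": {"theme": "dark"}},
    "orders": [{"order_id": 101}],
})
```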
## Column Type Changes

### Variant Columns
When a column’s type changes, dlt creates variant columns:
```python
# First load
data_v1 = [
    {"id": 1, "score": 42},  # score is an integer
    {"id": 2, "score": 100},
]
pipeline.run(data_v1, table_name="results")
# Creates: results table with score (bigint)

# Second load - the type changed
data_v2 = [
    {"id": 3, "score": "high"},  # score is now a string
    {"id": 4, "score": "medium"},
]
pipeline.run(data_v2, table_name="results")
# Creates: variant column score__v_text (text)
# The original score column keeps receiving integer values
```
Variant columns are named using the pattern `{column_name}__v_{type}`.
### Supported Type Coercions
dlt automatically coerces compatible types:
```python
# These type changes are handled automatically:

# Integer -> Float (coerced)
{"value": 42}    # int
{"value": 42.5}  # float - coerced automatically

# String -> Text (coerced)
{"text": "short"}         # varchar(256)
{"text": "very long..."}  # text - upgraded automatically

# Incompatible types create variants:
{"field": 123}     # bigint
{"field": "text"}  # creates field__v_text
```
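The decision between coercing in place and creating a variant can be sketched as a lookup over allowed widenings. This is a simplified assumption about the rule set; dlt's actual coercion table is richer:

```python
# Widenings applied in place instead of creating a variant (simplified).
COERCIBLE = {
    ("bigint", "bigint"): "bigint",
    ("bigint", "double"): "double",  # int column widened to float
    ("double", "double"): "double",
    ("text", "text"): "text",
}

def resolve_column(name, existing_type, incoming_type):
    """Return (column_name, column_type) for an incoming value."""
    coerced = COERCIBLE.get((existing_type, incoming_type))
    if coerced:
        return name, coerced
    # Incompatible: route the value to a typed variant column
    return f"{name}__v_{incoming_type}", incoming_type
```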
## Controlling Schema Changes
Use hints to control how schemas evolve:
```python
import dlt

@dlt.resource
def users():
    yield [
        {"id": 1, "name": "Alice", "email": "alice@example.com"},
    ]

# Apply hints to control the schema
users.apply_hints(
    columns={
        "id": {"data_type": "bigint", "nullable": False},
        "email": {"data_type": "text", "unique": True},
        "name": {"data_type": "text"},
    }
)

pipeline = dlt.pipeline(
    pipeline_name="users",
    destination="duckdb",
    dataset_name="user_data",
)
pipeline.run(users())
```
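To see what hints like `nullable` and `unique` express, here is a small, hypothetical validator over rows and hint dicts. It is illustrative only; dlt itself enforces hints during normalization and loading, and schema contracts control what may change:

```python
def check_hints(rows, columns):
    """Check rows against simple column hints (nullable, unique).
    Hypothetical helper mirroring the hint dict shape shown above."""
    errors = []
    for name, hint in columns.items():
        values = [row.get(name) for row in rows]
        if hint.get("nullable") is False and any(v is None for v in values):
            errors.append(f"{name}: NULL not allowed")
        if hint.get("unique"):
            non_null = [v for v in values if v is not None]
            if len(non_null) != len(set(non_null)):
                errors.append(f"{name}: duplicate values")
    return errors

errors = check_hints(
    [{"id": 1, "email": "a@x"}, {"id": None, "email": "a@x"}],
    {"id": {"data_type": "bigint", "nullable": False},
     "email": {"data_type": "text", "unique": True}},
)
```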
## Schema Versioning
dlt tracks schema versions automatically:
```python
import dlt

pipeline = dlt.pipeline(
    pipeline_name="versioned",
    destination="duckdb",
    dataset_name="data",
)

# The default schema is available after the first run
schema = pipeline.default_schema

# View the schema version and content hash
print(f"Schema version: {schema.version}")
print(f"Schema hash: {schema.version_hash}")

# Export the schema to a file
# (to_pretty_yaml / to_pretty_json return strings)
with open("schema.yaml", "w") as f:
    f.write(schema.to_pretty_yaml())
with open("schema.json", "w") as f:
    f.write(schema.to_pretty_json())
```
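The version/hash pair behaves like content addressing: the hash is derived from the schema content, and the version increments whenever the content changes. A simplified sketch of that idea with `hashlib` (not dlt's exact hashing scheme):

```python
import hashlib
import json

def schema_hash(schema_dict):
    """Stable content hash over a schema dict - a simplified
    stand-in for how a version_hash can be computed."""
    canonical = json.dumps(schema_dict, sort_keys=True)
    return hashlib.sha256(canonical.encode()).hexdigest()

v1 = {"users": {"id": "bigint", "name": "text"}}
v2 = {"users": {"id": "bigint", "name": "text", "ceo": "text"}}

h1, h2 = schema_hash(v1), schema_hash(v2)
# Identical content always hashes the same; adding a column changes the hash
```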
## Tracking Schema Changes
Monitor schema evolution with lineage tracking:
```python
import dlt

pipeline = dlt.pipeline(
    pipeline_name="tracked",
    destination="duckdb",
    dataset_name="data",
)

data = [{"id": 1, "name": "Alice"}]

# Schema changes are reported in the load info of the run that made them
load_info = pipeline.run(data, table_name="users")
for package in load_info.load_packages:
    if package.schema_update:
        print("Schema was updated!")
        # schema_update maps table names to partial table schemas
        for table_name, table in package.schema_update.items():
            print(f"Table {table_name} changes:")
            for column_name, column in table["columns"].items():
                print(f"  - {column_name}: {column['data_type']}")
```
## Alerting on Schema Changes
Get notified when schema changes occur:
```python
import dlt

def check_schema_changes(load_info):
    """Alert on schema changes."""
    for package in load_info.load_packages:
        if package.schema_update:
            # Send an alert (email, Slack, etc.)
            print(f"⚠️ Schema changed in {package.schema_name}")
            for table_name, table in package.schema_update.items():
                print(f"Table: {table_name}")
                for column_name, column in table["columns"].items():
                    print(f"  New column: {column_name} ({column['data_type']})")

pipeline = dlt.pipeline(
    pipeline_name="monitored",
    destination="duckdb",
    dataset_name="data",
)

data = [{"id": 1, "name": "Alice"}]
load_info = pipeline.run(data, table_name="users")
check_schema_changes(load_info)
```
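The alert body can be assembled independently of the delivery channel. A sketch that formats a dlt-style `schema_update` mapping (table name to a partial table schema with a `columns` dict) into a plain-text message; `format_schema_alert` is a hypothetical helper:

```python
def format_schema_alert(schema_name, schema_update):
    """Render a schema_update mapping into an alert message.
    Assumes the shape {table: {"columns": {name: {"data_type": ...}}}}."""
    lines = [f"Schema changed: {schema_name}"]
    for table, table_schema in schema_update.items():
        for col, spec in table_schema.get("columns", {}).items():
            lines.append(f"  {table}.{col} ({spec.get('data_type')})")
    return "\n".join(lines)

msg = format_schema_alert(
    "users",
    {"users": {"columns": {"ceo": {"data_type": "text"}}}},
)
```

The resulting string can then be posted to whatever channel you use (email, a Slack webhook, a ticketing system).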
## Custom Normalizers
Customize how table and column names are derived by selecting a naming convention:

```python
import dlt

# A naming convention maps source paths to destination identifiers,
# e.g. "user.profile.name" -> "user__profile__name".
# Conventions are selected in configuration (snake_case is the default),
# for example in config.toml:
#
#   [schema]
#   naming = "sql_ci_v1"
#
# A custom convention can be supplied as an importable module path.

pipeline = dlt.pipeline(
    pipeline_name="custom",
    destination="duckdb",
    dataset_name="data",
)
```
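As an illustration of the kind of transformation a naming convention performs, here is a minimal snake_case converter. It is a hypothetical sketch, not dlt's implementation:

```python
import re

def to_snake_case(name):
    """Sketch of a normalization step a custom naming convention
    might perform: camelCase -> snake_case, path dots -> '__'."""
    name = name.replace(".", "__")
    # Insert an underscore before an uppercase letter that follows
    # a lowercase letter or digit, then lowercase everything.
    name = re.sub(r"(?<=[a-z0-9])([A-Z])", r"_\1", name)
    return name.lower()
```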
## Handling Removed Columns

Columns are never deleted from the destination:
```python
# First load
data_v1 = [{"id": 1, "name": "Alice", "age": 30}]
pipeline.run(data_v1, table_name="users")
# Creates: id, name, age columns

# Second load - 'age' removed from the source
data_v2 = [{"id": 2, "name": "Bob"}]
pipeline.run(data_v2, table_name="users")
# 'age' column remains in the destination with NULL for new rows
```
Removed columns remain in the destination with NULL values for new records. They are never dropped automatically.
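The downstream effect is easy to verify with stdlib `sqlite3` standing in for the destination:

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE users (id INTEGER, name TEXT, age INTEGER)")
# First load still carried 'age'
con.execute("INSERT INTO users VALUES (1, 'Alice', 30)")
# Second load no longer sends 'age'; the column stays, the value is NULL
con.execute("INSERT INTO users (id, name) VALUES (2, 'Bob')")

rows = con.execute("SELECT id, name, age FROM users ORDER BY id").fetchall()
```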
## Schema Export and Import
Export schemas for documentation or version control:
```python
import dlt
from dlt.common import json
from dlt.common.schema import Schema

pipeline = dlt.pipeline(
    pipeline_name="export_schema",
    destination="duckdb",
    dataset_name="data",
)

# Load data
data = [{"id": 1, "name": "Alice"}]
pipeline.run(data, table_name="users")

# Export the schema (to_pretty_yaml / to_pretty_json return strings)
schema = pipeline.default_schema
with open("schemas/users_schema.yaml", "w") as f:
    f.write(schema.to_pretty_yaml())
with open("schemas/users_schema.json", "w") as f:
    f.write(schema.to_pretty_json())

# Import a schema
with open("schemas/users_schema.json") as f:
    imported_schema = Schema.from_dict(json.loads(f.read()))
```
## Best Practices

- Design data models expecting change
- Use semantic versioning for major schema changes
- Document expected schema evolution
- Test schema changes in development first
- Set up alerts for schema modifications
- Review schema changes regularly
- Track variant column creation
- Document type changes for downstream consumers
- Use schema contracts to restrict unwanted changes
- Apply hints to enforce data types
- Validate critical columns
- Test with sample data first
- Notify downstream consumers of changes
- Maintain schema documentation
- Keep schema files in version control
- Use column lineage for tracking
## Next Steps

- **Data Contracts**: control and validate schema changes
- **Incremental Loading**: combine schema evolution with incremental loading