A schema describes the structure of your data - table names, column types, constraints, and relationships. In dlt, schemas are automatically inferred from your data but can be customized and controlled to match your requirements.
## What is a Schema?

A schema in dlt:

- Defines table structures including column names and data types
- Manages relationships between parent and child tables
- Controls schema evolution as your data changes over time
- Handles normalization of nested data into relational tables
- Enforces contracts about what changes are allowed

Think of a schema as the blueprint that describes how your data will be organized in the destination.
## Schema Inference

dlt automatically infers schemas from your data:

```python
import dlt

@dlt.resource
def users():
    yield {
        "id": 1,
        "email": "[email protected]",
        "created_at": "2024-01-01T00:00:00Z",
        "is_active": True,
        "metadata": {"source": "api", "version": 2},
    }

pipeline = dlt.pipeline(
    pipeline_name="user_pipeline",
    destination="duckdb",
    dataset_name="users_data",
)

# Schema automatically inferred from the data structure
load_info = pipeline.run(users())
```
This creates a table with the following columns:

- `id` (bigint)
- `email` (text)
- `created_at` (timestamp)
- `is_active` (bool)
- `metadata` (json, or a struct type on destinations that support it)
## Explicit Schema Definition

Define schemas explicitly for more control:

```python
import dlt
import pendulum
from dlt.common.schema import Schema

# Create an explicit schema
my_schema = Schema("users_schema")

@dlt.resource(
    columns=[
        {"name": "id", "data_type": "bigint", "nullable": False},
        {"name": "email", "data_type": "text", "nullable": False, "unique": True},
        {"name": "created_at", "data_type": "timestamp", "nullable": False},
        {"name": "last_login", "data_type": "timestamp", "nullable": True},
        {"name": "profile", "data_type": "json"},
    ],
    primary_key="id",
)
def users():
    yield {"id": 1, "email": "[email protected]", "created_at": pendulum.now()}

pipeline = dlt.pipeline(
    pipeline_name="user_pipeline",
    destination="postgres",
    dataset_name="users_data",
)

load_info = pipeline.run(users(), schema=my_schema)
```
## Schema Evolution

Schemas evolve as your data changes. Control evolution with schema contracts:

### Evolve Mode (Default)

Automatically add new columns and tables:

```python
import dlt

@dlt.resource(schema_contract="evolve")  # Default behavior
def api_data():
    # First run yields:
    yield {"id": 1, "name": "Alice"}
    # Later runs can add new fields:
    yield {"id": 2, "name": "Bob", "email": "[email protected]"}  # email column added
```
### Freeze Mode

Reject any schema changes:

```python
import dlt

@dlt.resource(
    schema_contract={
        "tables": "freeze",     # No new tables
        "columns": "freeze",    # No new columns
        "data_type": "freeze",  # No data type changes
    }
)
def strict_api_data():
    yield {"id": 1, "name": "Alice"}
    # This would fail if a new field appeared:
    # yield {"id": 2, "name": "Bob", "email": "[email protected]"}  # Error!
```
### Discard Mode

Silently ignore new columns:

```python
import dlt

@dlt.resource(
    schema_contract={
        "columns": "discard_value",  # Ignore values for new columns
        "tables": "evolve",          # Allow new tables
    }
)
def api_with_extras():
    yield {"id": 1, "name": "Alice", "extra_field": "ignored"}  # extra_field discarded
```
### Granular Control

Mix contract modes per schema entity:

```python
import dlt

@dlt.source(
    schema_contract={
        "tables": "evolve",          # Allow new tables
        "columns": "freeze",         # No new columns in existing tables
        "data_type": "discard_row",  # Drop rows with incompatible types
    }
)
def api_source():
    return [resource_a(), resource_b()]
```
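The semantics of the contract modes can be sketched as a filter over incoming rows. This is a simplified illustration, not dlt's actual implementation; `enforce_contract` and its application to plain dicts are hypothetical:

```python
from typing import Optional

# Simplified sketch of how a "columns" contract mode treats a new column.
# This is NOT dlt's implementation -- enforce_contract is a hypothetical
# helper that only illustrates the semantics of each mode.
known_columns = {"id", "name"}

def enforce_contract(row: dict, mode: str) -> Optional[dict]:
    new_cols = set(row) - known_columns
    if not new_cols:
        return row
    if mode == "evolve":
        # Accept the row; the new columns join the schema
        known_columns.update(new_cols)
        return row
    if mode == "discard_value":
        # Keep the row but drop values for unknown columns
        return {k: v for k, v in row.items() if k in known_columns}
    if mode == "discard_row":
        # Drop the whole row
        return None
    if mode == "freeze":
        # Reject loudly -- this is what fails the pipeline
        raise ValueError(f"schema frozen; unexpected columns: {new_cols}")
    raise ValueError(f"unknown mode: {mode}")

print(enforce_contract({"id": 2, "name": "Bob", "email": "[email protected]"}, "discard_value"))
# {'id': 2, 'name': 'Bob'}
```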
## Data Type Mapping

dlt maps Python types to destination types:

| Python Type | dlt Type | SQL Type (Postgres) |
|---|---|---|
| `int` | `bigint` | `BIGINT` |
| `float` | `double` | `DOUBLE PRECISION` |
| `str` | `text` | `TEXT` |
| `bool` | `bool` | `BOOLEAN` |
| `datetime` | `timestamp` | `TIMESTAMP WITH TIME ZONE` |
| `date` | `date` | `DATE` |
| `time` | `time` | `TIME` |
| `Decimal` | `decimal` | `NUMERIC` |
| `bytes` | `binary` | `BYTEA` |
| `dict` | `json` | `JSONB` |
| `list` | `json` | `JSONB` |
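The mapping above can be mimicked with a small lookup. This is a rough sketch; `infer_dlt_type` is a hypothetical helper, and dlt's real inference handles many more cases (timestamp strings, precision and scale, coercions):

```python
from datetime import datetime, date, time
from decimal import Decimal

# Hypothetical helper mirroring the mapping table above.
# Order matters: bool is a subclass of int, datetime of date.
_TYPE_MAP = [
    (bool, "bool"),
    (int, "bigint"),
    (float, "double"),
    (Decimal, "decimal"),
    (datetime, "timestamp"),
    (date, "date"),
    (time, "time"),
    (bytes, "binary"),
    (str, "text"),
    ((dict, list), "json"),
]

def infer_dlt_type(value):
    for py_type, dlt_type in _TYPE_MAP:
        if isinstance(value, py_type):
            return dlt_type
    return "text"  # fallback for unknown types

print(infer_dlt_type(42))            # bigint
print(infer_dlt_type({"a": 1}))      # json
print(infer_dlt_type(Decimal("1")))  # decimal
```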
## Nested Data

dlt normalizes nested structures into separate tables:

```python
import dlt

@dlt.resource
def orders():
    yield {
        "order_id": 1,
        "customer": "Alice",
        "items": [
            {"product": "Laptop", "price": 999},
            {"product": "Mouse", "price": 25},
        ],
    }

pipeline = dlt.pipeline(
    pipeline_name="orders_pipeline",
    destination="duckdb",
    dataset_name="sales",
)

load_info = pipeline.run(orders())
```

This creates two tables.

`orders` table:

| order_id | customer | _dlt_id |
|---|---|---|
| 1 | Alice | hash_value |

`orders__items` table:

| product | price | _dlt_parent_id |
|---|---|---|
| Laptop | 999 | hash_value |
| Mouse | 25 | hash_value |
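The parent/child split can be sketched in plain Python. This is a simplified illustration of what dlt's normalizer does; `normalize` and the row-hash scheme are hypothetical, and real dlt also adds `_dlt_list_idx`, load ids, and more:

```python
import hashlib
import json

def _row_hash(row):
    """Stable id for a row (hypothetical; dlt uses its own deterministic ids)."""
    return hashlib.md5(json.dumps(row, sort_keys=True).encode()).hexdigest()[:16]

def normalize(record, table, tables=None):
    """Split nested lists of dicts into child tables linked by _dlt_parent_id."""
    tables = tables if tables is not None else {}
    # Keep scalar fields in the parent row
    flat = {k: v for k, v in record.items()
            if not (isinstance(v, list) and v and isinstance(v[0], dict))}
    flat["_dlt_id"] = _row_hash(flat)
    tables.setdefault(table, []).append(flat)
    # Each list of dicts becomes a child table named parent__field
    for key, value in record.items():
        if isinstance(value, list) and value and isinstance(value[0], dict):
            for item in value:
                child = dict(item, _dlt_parent_id=flat["_dlt_id"])
                tables.setdefault(f"{table}__{key}", []).append(child)
    return tables

tables = normalize(
    {"order_id": 1, "customer": "Alice",
     "items": [{"product": "Laptop", "price": 999},
               {"product": "Mouse", "price": 25}]},
    "orders",
)
print(sorted(tables))  # ['orders', 'orders__items']
```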
### Control Nesting Depth

```python
import dlt

@dlt.resource(max_table_nesting=1)  # Normalize one level, then keep JSON
def deeply_nested():
    yield {
        "id": 1,
        "data": {
            "level1": {        # Normalized at level 1
                "level2": {    # Beyond the limit: stored as JSON
                    "level3": "value"
                }
            }
        }
    }
```
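Depth-capped flattening can be sketched like this. It is a simplified illustration; `flatten` is a hypothetical helper, and dlt's actual rules also distinguish lists (which become child tables) from dicts (which become columns):

```python
def flatten(record, max_nesting, _depth=0, _prefix=""):
    """Flatten nested dicts into prefixed columns up to max_nesting,
    keeping anything deeper as a raw (JSON-like) value."""
    out = {}
    for key, value in record.items():
        name = f"{_prefix}{key}"
        if isinstance(value, dict) and _depth < max_nesting:
            out.update(flatten(value, max_nesting, _depth + 1, f"{name}__"))
        else:
            out[name] = value  # beyond the cap: stored as-is (JSON in dlt)
    return out

row = {"id": 1, "data": {"level1": {"level2": {"level3": "value"}}}}
print(flatten(row, max_nesting=1))
# {'id': 1, 'data__level1': {'level2': {'level3': 'value'}}}
```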
## Schema Access and Modification

### Access Pipeline Schema

```python
import dlt

pipeline = dlt.pipeline(
    pipeline_name="my_pipeline",
    destination="duckdb",
)

# Get the default schema
schema = pipeline.default_schema

# List all tables
for table_name in schema.tables:
    print(f"Table: {table_name}")
    table = schema.get_table(table_name)
    print(f"  Columns: {list(table['columns'].keys())}")
```
### Modify Schema

```python
import dlt
from dlt.common.schema import Schema

schema = Schema("my_schema")

# Define a table
schema.update_table({
    "name": "users",
    "columns": {
        "id": {"name": "id", "data_type": "bigint", "nullable": False},
        "email": {"name": "email", "data_type": "text", "unique": True},
    }
})

# Use in a pipeline
pipeline.run(my_data(), schema=schema)
```
### Import/Export Schema

```python
import dlt

pipeline = dlt.pipeline(
    pipeline_name="my_pipeline",
    destination="duckdb",
    import_schema_path="schemas/import",  # Load schemas from this folder
    export_schema_path="schemas/export",  # Save schemas to this folder
)

load_info = pipeline.run(my_source())
```
Schemas can be defined in YAML:

```yaml
version: 1
version_hash: abc123
engine_version: 10
tables:
  users:
    name: users
    write_disposition: append
    columns:
      id:
        name: id
        data_type: bigint
        nullable: false
      email:
        name: email
        data_type: text
        nullable: false
        unique: true
      created_at:
        name: created_at
        data_type: timestamp
        nullable: false
  orders:
    name: orders
    write_disposition: append
    parent: users
    columns:
      order_id:
        name: order_id
        data_type: bigint
        nullable: false
      user_id:
        name: user_id
        data_type: bigint
        nullable: false
      amount:
        name: amount
        data_type: decimal
        precision: 10
        scale: 2
```
## Naming Conventions

Schemas use naming conventions to normalize identifiers:

```python
import dlt
from dlt.common.schema import Schema

# Use snake_case (the default): camelCase -> snake_case
schema = Schema(
    "my_schema",
    normalizers={"names": "snake_case"},
)

# Use direct (no transformation): keep original names
schema = Schema(
    "my_schema",
    normalizers={"names": "direct"},
)
```
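What snake_case normalization does to identifiers can be sketched as follows. This is a simplified illustration; `to_snake_case` is a hypothetical helper, and dlt's real convention also handles special characters, reserved prefixes, and identifier length limits:

```python
import re

def to_snake_case(name: str) -> str:
    """Rough sketch of camelCase/PascalCase -> snake_case normalization."""
    # Insert an underscore at each lower/digit -> Upper boundary
    s = re.sub(r"(?<=[a-z0-9])([A-Z])", r"_\1", name)
    # Replace runs of non-identifier characters with a single underscore
    s = re.sub(r"[^a-zA-Z0-9_]+", "_", s)
    return s.lower()

print(to_snake_case("createdAt"))    # created_at
print(to_snake_case("UserProfile"))  # user_profile
```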
## Type Signature

From `dlt/common/schema/schema.py:77-118`:

```python
class Schema:
    ENGINE_VERSION: ClassVar[int] = SCHEMA_ENGINE_VERSION

    naming: NamingConvention
    """Naming convention used by the schema to normalize identifiers"""

    data_item_normalizer: DataItemNormalizer[Any]
    """Data item normalizer used by the schema to create tables"""

    def __init__(self, name: str, normalizers: TNormalizersConfig = None) -> None:
        """Create a new schema with given name and normalizers"""
        ...

    @classmethod
    def from_dict(
        cls,
        d: DictStrAny,
        remove_processing_hints: bool = False,
        bump_version: bool = True,
        validate_schema: bool = True,
    ) -> "Schema":
        """Load schema from dictionary"""
        ...
```
## Best Practices

- **Start with inference**: let dlt infer schemas automatically, then refine as needed for production.
- **Use schema contracts in production**: set contracts to `freeze` or `discard_value` to prevent unexpected changes.
- **Version your schemas**: export schemas to YAML and track them in version control.
- **Define primary keys**: always specify primary keys for proper relationships and deduplication.
- **Control nesting**: use `max_table_nesting` to prevent excessive table creation from deeply nested data.

**Schema Versioning**: dlt automatically versions schemas and tracks changes. Each schema has a hash that changes when the structure changes.

**Breaking Changes**: `freeze` mode will cause pipeline failures if new fields appear. Use `discard_value` to ignore unexpected fields gracefully.
## Common Patterns

### Strict Production Schema

```python
import dlt

@dlt.source(
    schema_contract={
        "tables": "freeze",
        "columns": "freeze",
        "data_type": "freeze",
    }
)
def production_source():
    """No schema changes allowed in production"""
    return [users(), orders()]
```
### Flexible Development Schema

```python
import dlt

@dlt.source(
    schema_contract={
        "tables": "evolve",
        "columns": "evolve",
        "data_type": "evolve",
    }
)
def dev_source():
    """Allow all changes during development"""
    return [users(), orders()]
```
### Partial Evolution

```python
import dlt

@dlt.source(
    schema_contract={
        "tables": "evolve",          # Allow new tables
        "columns": "discard_value",  # Ignore new columns
        "data_type": "freeze",       # No type changes
    }
)
def semi_strict_source():
    """Balance between flexibility and control"""
    return [users(), orders()]
```
- Resource - Defines individual table schemas
- Source - Groups tables under a single schema
- Pipeline - Manages schema storage and evolution
- Destination - Converts schemas to destination-specific formats