A schema describes the structure of your data - table names, column types, constraints, and relationships. In dlt, schemas are automatically inferred from your data but can be customized and controlled to match your requirements.

What is a Schema?

A schema in dlt:
  • Defines table structures including column names and data types
  • Manages relationships between parent and child tables
  • Controls schema evolution as your data changes over time
  • Handles normalization of nested data into relational tables
  • Enforces contracts about what changes are allowed
Think of a schema as the blueprint that describes how your data will be organized in the destination.

Schema Inference

dlt automatically infers schemas from your data:
import dlt

@dlt.resource
def users():
    yield {
        "id": 1,
        "email": "[email protected]",
        "created_at": "2024-01-01T00:00:00Z",
        "is_active": True,
        "metadata": {"source": "api", "version": 2}
    }

pipeline = dlt.pipeline(
    pipeline_name="user_pipeline",
    destination="duckdb",
    dataset_name="users_data"
)

# Schema automatically inferred from data structure
load_info = pipeline.run(users())
This creates a users table with columns:
  • id (bigint)
  • email (text)
  • created_at (timestamp)
  • is_active (bool)
  • metadata__source (text) and metadata__version (bigint), since nested dictionaries are flattened into columns by default
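The inference step can be approximated in plain Python. This is an illustrative sketch of the idea, not dlt's actual implementation (dlt's real inference handles many more cases, such as decimals, binary data, and coercion rules):

```python
from datetime import datetime

def infer_dlt_type(value):
    """Rough approximation of dlt-style type inference for one value."""
    # bool must be checked before int: bool is a subclass of int in Python
    if isinstance(value, bool):
        return "bool"
    if isinstance(value, int):
        return "bigint"
    if isinstance(value, float):
        return "double"
    if isinstance(value, str):
        # dlt also detects ISO timestamps inside strings
        try:
            datetime.fromisoformat(value.replace("Z", "+00:00"))
            return "timestamp"
        except ValueError:
            return "text"
    if isinstance(value, (dict, list)):
        return "json"
    return "text"

row = {"id": 1, "email": "[email protected]", "created_at": "2024-01-01T00:00:00Z", "is_active": True}
print({k: infer_dlt_type(v) for k, v in row.items()})
# {'id': 'bigint', 'email': 'text', 'created_at': 'timestamp', 'is_active': 'bool'}
```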

Explicit Schema Definition

Define schemas explicitly for more control:
import dlt
import pendulum
from dlt.common.schema import Schema

# Create explicit schema
my_schema = Schema("users_schema")

@dlt.resource(
    columns=[
        {"name": "id", "data_type": "bigint", "nullable": False},
        {"name": "email", "data_type": "text", "nullable": False, "unique": True},
        {"name": "created_at", "data_type": "timestamp", "nullable": False},
        {"name": "last_login", "data_type": "timestamp", "nullable": True},
        {"name": "profile", "data_type": "json"},
    ],
    primary_key="id"
)
def users():
    yield {"id": 1, "email": "[email protected]", "created_at": pendulum.now()}

pipeline = dlt.pipeline(
    pipeline_name="user_pipeline",
    destination="postgres",
    dataset_name="users_data"
)

load_info = pipeline.run(users(), schema=my_schema)

Schema Evolution

Schemas evolve as your data changes. Control evolution with schema contracts:

Evolve Mode (Default)

Automatically add new columns and tables:
import dlt

@dlt.resource(schema_contract="evolve")  # Default behavior
def api_data():
    # First run yields:
    yield {"id": 1, "name": "Alice"}
    
    # Later runs can add new fields:
    yield {"id": 2, "name": "Bob", "email": "[email protected]"}  # email column added

Freeze Mode

Reject any schema changes:
import dlt

@dlt.resource(
    schema_contract={
        "tables": "freeze",      # No new tables
        "columns": "freeze",     # No new columns
        "data_type": "freeze"    # No data type changes
    }
)
def strict_api_data():
    yield {"id": 1, "name": "Alice"}
    # This would fail if a new field appeared:
    # yield {"id": 2, "name": "Bob", "email": "[email protected]"}  # Error!

Discard Mode

Ignore new columns silently:
import dlt

@dlt.resource(
    schema_contract={
        "columns": "discard_value",  # Ignore new columns
        "tables": "evolve"           # Allow new tables
    }
)
def api_with_extras():
    yield {"id": 1, "name": "Alice", "extra_field": "ignored"}  # extra_field discarded

Granular Control

import dlt

@dlt.source(
    schema_contract={
        "tables": "evolve",       # Allow new tables
        "columns": "freeze",      # No new columns in existing tables  
        "data_type": "discard_row" # Drop rows with incompatible types
    }
)
def api_source():
    return [resource_a(), resource_b()]
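The column modes above can be sketched as a row filter. This is illustrative only: dlt enforces contracts inside its normalizer, and `ContractViolation` here is a hypothetical helper name, not a dlt exception:

```python
class ContractViolation(Exception):
    """Raised when a frozen contract is violated (hypothetical helper)."""

def apply_column_contract(row, known_columns, mode):
    """Apply a schema contract mode to one row of data."""
    new_keys = set(row) - set(known_columns)
    if not new_keys:
        return row
    if mode == "evolve":          # accept new columns; the schema grows
        known_columns.update(new_keys)
        return row
    if mode == "discard_value":   # silently drop unknown columns
        return {k: v for k, v in row.items() if k in known_columns}
    if mode == "freeze":          # reject the change outright
        raise ContractViolation(f"new columns not allowed: {sorted(new_keys)}")
    raise ValueError(f"unknown mode: {mode}")

print(apply_column_contract(
    {"id": 1, "name": "Alice", "email": "[email protected]"},
    {"id", "name"},
    "discard_value",
))
# {'id': 1, 'name': 'Alice'}
```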

Data Type Mapping

dlt maps Python types to destination types:
| Python Type | dlt Type | SQL Type (Postgres) |
|---|---|---|
| int | bigint | BIGINT |
| float | double | DOUBLE PRECISION |
| str | text | TEXT |
| bool | bool | BOOLEAN |
| datetime | timestamp | TIMESTAMP WITH TIME ZONE |
| date | date | DATE |
| time | time | TIME |
| Decimal | decimal | NUMERIC |
| bytes | binary | BYTEA |
| dict | json | JSONB |
| list | json | JSONB |
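For quick reference in code, the table can be written as a plain lookup. This mirrors the mapping above as a sketch; it is not an API that dlt exposes:

```python
# Python type -> (dlt type, Postgres type), mirroring the table above
TYPE_MAP = {
    int: ("bigint", "BIGINT"),
    float: ("double", "DOUBLE PRECISION"),
    str: ("text", "TEXT"),
    bool: ("bool", "BOOLEAN"),
    dict: ("json", "JSONB"),
    list: ("json", "JSONB"),
}

def lookup(value):
    # bool is a subclass of int, so check it first
    key = bool if isinstance(value, bool) else type(value)
    return TYPE_MAP[key]

print(lookup(42))    # ('bigint', 'BIGINT')
print(lookup(True))  # ('bool', 'BOOLEAN')
```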

Nested Data

dlt normalizes nested structures into separate tables:
import dlt

@dlt.resource
def orders():
    yield {
        "order_id": 1,
        "customer": "Alice",
        "items": [
            {"product": "Laptop", "price": 999},
            {"product": "Mouse", "price": 25}
        ]
    }

pipeline = dlt.pipeline(
    pipeline_name="orders_pipeline",
    destination="duckdb",
    dataset_name="sales"
)

load_info = pipeline.run(orders())
This creates two tables.

orders table:

| order_id | customer |
|---|---|
| 1 | Alice |

orders__items table:

| product | price | _dlt_parent_id |
|---|---|---|
| Laptop | 999 | hash_value |
| Mouse | 25 | hash_value |
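The normalization step can be sketched in plain Python. This is illustrative only: dlt derives `_dlt_id` and `_dlt_parent_id` from its own deterministic row hashing, which is approximated here with a simple MD5 over the parent row:

```python
import hashlib
import json

def normalize(order):
    """Split one order into a parent row and linked child rows, dlt-style."""
    order = dict(order)
    items = order.pop("items", [])
    # dlt links child rows to the parent via a deterministic hash;
    # this MD5 is only a stand-in for that mechanism
    parent_id = hashlib.md5(json.dumps(order, sort_keys=True).encode()).hexdigest()[:16]
    parent = {**order, "_dlt_id": parent_id}
    children = [{**item, "_dlt_parent_id": parent_id} for item in items]
    return parent, children

parent, children = normalize({
    "order_id": 1,
    "customer": "Alice",
    "items": [{"product": "Laptop", "price": 999}, {"product": "Mouse", "price": 25}],
})
print(parent["order_id"], len(children))  # 1 2
print(children[0]["_dlt_parent_id"] == parent["_dlt_id"])  # True
```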

Control Nesting Depth

import dlt

@dlt.resource(max_table_nesting=1)  # Limit normalization to 1 level of nesting
def deeply_nested():
    yield {
        "id": 1,
        "data": {
            "level1": {  # Within the limit: normalized
                "level2": {  # Beyond the limit: stored as JSON
                    "level3": "value"
                }
            }
        }
    }
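The effect of the nesting limit can be approximated like this. It is a sketch of the idea (flatten nested dicts into double-underscore columns up to the limit, serialize anything deeper), not dlt's actual normalizer:

```python
import json

def flatten(obj, max_nesting, level=0, prefix=""):
    """Flatten nested dicts into name__child columns up to max_nesting;
    anything deeper is kept as a JSON string."""
    out = {}
    for key, value in obj.items():
        name = f"{prefix}{key}"
        if isinstance(value, dict):
            if level < max_nesting:
                out.update(flatten(value, max_nesting, level + 1, f"{name}__"))
            else:
                out[name] = json.dumps(value)  # beyond the limit: keep as JSON
        else:
            out[name] = value
    return out

row = {"id": 1, "data": {"level1": {"level2": {"level3": "value"}}}}
print(flatten(row, max_nesting=1))
# {'id': 1, 'data__level1': '{"level2": {"level3": "value"}}'}
```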

Schema Access and Modification

Access Pipeline Schema

import dlt

pipeline = dlt.pipeline(
    pipeline_name="my_pipeline",
    destination="duckdb"
)

# Get default schema
schema = pipeline.default_schema

# List all tables
for table_name in schema.tables:
    print(f"Table: {table_name}")
    table = schema.get_table(table_name)
    print(f"  Columns: {list(table['columns'].keys())}")

Modify Schema

import dlt
from dlt.common.schema import Schema

schema = Schema("my_schema")

# Define a table
schema.update_table({
    "name": "users",
    "columns": {
        "id": {"name": "id", "data_type": "bigint", "nullable": False},
        "email": {"name": "email", "data_type": "text", "unique": True}
    }
})

# Use in pipeline
pipeline.run(my_data(), schema=schema)

Import/Export Schema

import dlt

pipeline = dlt.pipeline(
    pipeline_name="my_pipeline",
    destination="duckdb",
    import_schema_path="schemas/my_schema.yaml",  # Load schema from file
    export_schema_path="schemas/my_schema.yaml"   # Save schema to file
)

load_info = pipeline.run(my_source())

Schema YAML Format

Schemas can be defined in YAML:
version: 1
version_hash: abc123
engine_version: 10

tables:
  users:
    name: users
    write_disposition: append
    columns:
      id:
        name: id
        data_type: bigint
        nullable: false
      email:
        name: email
        data_type: text
        nullable: false
        unique: true
      created_at:
        name: created_at
        data_type: timestamp
        nullable: false
    
  orders:
    name: orders
    write_disposition: append
    parent: users
    columns:
      order_id:
        name: order_id
        data_type: bigint
        nullable: false
      user_id:
        name: user_id
        data_type: bigint
        nullable: false
      amount:
        name: amount
        data_type: decimal
        precision: 10
        scale: 2

Naming Conventions

Schemas use naming conventions to normalize identifiers:
import dlt
from dlt.common.schema import Schema

# Use snake_case (default)
schema = Schema(
    "my_schema",
    normalizers={"names": "snake_case"}  # camelCase -> snake_case
)

# Use direct (no transformation)
schema = Schema(
    "my_schema",
    normalizers={"names": "direct"}  # Keep original names
)
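A minimal version of the snake_case transformation looks like this. It is illustrative only; dlt's actual naming convention also handles special characters, identifier length limits, and collisions:

```python
import re

def to_snake_case(name):
    """Convert camelCase/PascalCase identifiers to snake_case."""
    # insert an underscore before an uppercase letter that follows
    # a lowercase letter or digit, then lowercase everything
    s = re.sub(r"(?<=[a-z0-9])([A-Z])", r"_\1", name)
    return s.lower()

print(to_snake_case("createdAt"))    # created_at
print(to_snake_case("UserProfile"))  # user_profile
```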

Type Signature

From dlt/common/schema/schema.py, lines 77-118:
class Schema:
    ENGINE_VERSION: ClassVar[int] = SCHEMA_ENGINE_VERSION
    
    naming: NamingConvention
    """Naming convention used by the schema to normalize identifiers"""
    
    data_item_normalizer: DataItemNormalizer[Any]
    """Data item normalizer used by the schema to create tables"""
    
    def __init__(self, name: str, normalizers: TNormalizersConfig = None) -> None:
        """Create a new schema with given name and normalizers"""
        ...
    
    @classmethod
    def from_dict(
        cls,
        d: DictStrAny,
        remove_processing_hints: bool = False,
        bump_version: bool = True,
        validate_schema: bool = True,
    ) -> "Schema":
        """Load schema from dictionary"""
        ...

Best Practices

1. Start with inference: let dlt infer schemas automatically, then refine as needed for production.
2. Use schema contracts in production: set contracts to "freeze" or "discard_value" to prevent unexpected changes.
3. Version your schemas: export schemas to YAML and track them in version control.
4. Define primary keys: always specify primary keys for proper relationships and deduplication.
5. Control nesting: use max_table_nesting to prevent excessive table creation from deeply nested data.
Schema Versioning: dlt automatically versions schemas and tracks changes. Each schema has a hash that changes when the structure changes.
Breaking Changes: Using “freeze” mode will cause pipeline failures if new fields appear. Use “discard_value” if you want to ignore unexpected fields gracefully.

Common Patterns

Strict Production Schema

import dlt

@dlt.source(
    schema_contract={
        "tables": "freeze",
        "columns": "freeze",
        "data_type": "freeze"
    }
)
def production_source():
    """No schema changes allowed in production"""
    return [users(), orders()]

Flexible Development Schema

import dlt

@dlt.source(
    schema_contract={
        "tables": "evolve",
        "columns": "evolve",
        "data_type": "evolve"
    }
)
def dev_source():
    """Allow all changes during development"""
    return [users(), orders()]

Partial Evolution

import dlt

@dlt.source(
    schema_contract={
        "tables": "evolve",          # Allow new tables
        "columns": "discard_value",  # Ignore new columns
        "data_type": "freeze"        # No type changes
    }
)
def semi_strict_source():
    """Balance between flexibility and control"""
    return [users(), orders()]
Related Concepts

  • Resource - Defines individual table schemas
  • Source - Groups tables under a single schema
  • Pipeline - Manages schema storage and evolution
  • Destination - Converts schemas to destination-specific formats
