Schema Contracts
Data contracts give you control over schema evolution by defining rules for how dlt handles schema changes. You can enforce strict validation, allow flexibility, or find a balance between the two.

Quick Start

Control schema evolution at the resource level:
import dlt

@dlt.resource(
    schema_contract={
        "tables": "evolve",    # Allow new tables
        "columns": "freeze",   # Reject new columns
        "data_type": "freeze"  # Reject type changes
    }
)
def items():
    yield [{"id": 1, "name": "Alice"}]

pipeline = dlt.pipeline(
    pipeline_name="strict",
    destination="duckdb",
    dataset_name="data"
)

# This will fail if new columns appear
pipeline.run(items())

Schema Entities

Control three types of schema changes: `tables`, `columns`, and `data_type`.

tables

Controls when new tables are created (including nested tables and dynamic table names):
@dlt.resource(schema_contract={"tables": "freeze"})
def users():
    yield [
        {
            "id": 1,
            "name": "Alice",
            # This nested list would create a new table -> ERROR
            "orders": [{"order_id": 1}]  
        }
    ]

Contract Modes

evolve

Default mode - No constraints on schema changes
  • New tables are created
  • New columns are added
  • Type changes create variant columns
schema_contract="evolve"  # or
schema_contract={
    "tables": "evolve",
    "columns": "evolve",
    "data_type": "evolve"
}

freeze

Strict mode - Reject any schema changes
  • Raises exception on schema changes
  • No data is loaded
  • Best for production stability
schema_contract="freeze"  # or
schema_contract={
    "tables": "freeze",
    "columns": "freeze",
    "data_type": "freeze"
}

discard_row

Filter mode - Skip rows that don’t match schema
  • Discards non-conforming rows
  • Continues loading valid data
  • Useful for data quality
schema_contract="discard_row"

discard_value

Clean mode - Remove non-conforming values
  • Strips invalid fields from rows
  • Loads rows with valid fields
  • Maintains data flow
schema_contract="discard_value"

Setting Contracts

Resource Level

@dlt.resource(
    schema_contract={
        "tables": "evolve",
        "columns": "freeze",
        "data_type": "freeze"
    }
)
def strict_users():
    """New tables OK, but columns and types are frozen"""
    yield [{"id": 1, "name": "Alice"}]

Source Level

Apply to all resources in a source:
@dlt.source(schema_contract="freeze")
def api_source():
    """All resources inherit freeze contract"""
    
    @dlt.resource
    def users():
        yield [{"id": 1, "name": "Alice"}]
    
    @dlt.resource
    def orders():
        yield [{"order_id": 1, "amount": 50.0}]
    
    return users, orders

Pipeline Run Level

Override all existing contracts:
pipeline = dlt.pipeline(
    pipeline_name="api",
    destination="duckdb",
    dataset_name="data"
)

# Override any resource/source contracts
pipeline.run(
    api_source(),
    schema_contract="freeze"  # Strictest setting
)

Dynamic Contract Updates

Change contracts at runtime:
@dlt.resource
def users():
    yield [{"id": 1, "name": "Alice"}]

# Update contract on resource instance
users.apply_hints(
    schema_contract={
        "columns": "freeze",
        "data_type": "discard_row"
    }
)

# Update on source instance
source = api_source()
source.schema_contract = {"tables": "freeze"}

Pydantic Validation

Use Pydantic models for type-safe validation:
import dlt
from pydantic import BaseModel, EmailStr, Field
from typing import List, Optional

class Address(BaseModel):
    street: str
    city: str
    zipcode: str

class User(BaseModel):
    id: int
    name: str = Field(min_length=1, max_length=100)
    email: EmailStr
    age: Optional[int] = Field(None, ge=0, le=150)
    address: Address
    tags: List[str] = []

@dlt.resource(columns=User)
def users():
    """Validates against User model automatically"""
    yield [
        {
            "id": 1,
            "name": "Alice Smith",
            "email": "[email protected]",
            "age": 30,
            "address": {
                "street": "123 Main St",
                "city": "NYC",
                "zipcode": "10001"
            },
            "tags": ["premium", "verified"]
        }
    ]

pipeline = dlt.pipeline(
    pipeline_name="validated",
    destination="duckdb",
    dataset_name="user_data"
)

# Automatically validates all data against User model
pipeline.run(users())

Pydantic Contract Mapping

Pydantic models automatically set contracts:
# Using columns=Model sets:
schema_contract = {
    "tables": "evolve",        # New tables allowed
    "columns": "discard_value", # Extra fields ignored
    "data_type": "freeze"       # Invalid types raise error
}
Customize Pydantic behavior:
from pydantic import ConfigDict

class StrictUser(BaseModel):
    model_config = ConfigDict(
        extra="forbid"  # Maps to columns="freeze"
    )
    
    id: int
    name: str

class LenientUser(BaseModel):
    model_config = ConfigDict(
        extra="allow"  # Maps to columns="evolve"
    )
    
    id: int
    name: str

Pydantic Extra Modes

Mapping between contract modes and Pydantic:
| Contract Mode  | Pydantic `extra` |
|----------------|------------------|
| evolve         | allow            |
| freeze         | forbid           |
| discard_value  | ignore           |
| discard_row    | forbid*          |

*With ValidationError handling

Real-World Examples

Production API (Strict)

@dlt.resource(
    primary_key="id",
    schema_contract="freeze"
)
def production_users():
    """Strict validation - any schema change fails"""
    yield from fetch_users()

# Fails fast on unexpected changes
pipeline.run(production_users())

Development API (Flexible)

@dlt.resource(
    primary_key="id",
    schema_contract="evolve"
)
def dev_users():
    """Flexible - adapts to any changes"""
    yield from fetch_users()

# Adapts to schema changes automatically
pipeline.run(dev_users())

Data Quality (Filtering)

@dlt.resource(
    primary_key="id",
    schema_contract={
        "tables": "evolve",
        "columns": "discard_value",  # Strip unknown fields
        "data_type": "discard_row"    # Skip invalid rows
    }
)
def quality_checked_users():
    """Loads valid data, discards invalid"""
    yield from fetch_users()

load_info = pipeline.run(quality_checked_users())

# Check what was discarded
print(f"Rows discarded: {load_info.metrics['rows_discarded']}")

Mixed Strategy

@dlt.source
def mixed_source():
    # Critical table - frozen
    @dlt.resource(schema_contract="freeze")
    def transactions():
        yield from fetch_transactions()
    
    # Reference data - flexible
    @dlt.resource(schema_contract="evolve")
    def categories():
        yield from fetch_categories()
    
    # Event data - filter bad rows
    @dlt.resource(schema_contract="discard_row")
    def events():
        yield from fetch_events()
    
    return transactions, categories, events

pipeline.run(mixed_source())

Contract Inheritance

Contracts flow from general to specific:
# 1. Pipeline run (highest priority)
pipeline.run(source(), schema_contract="freeze")

# 2. Source level
@dlt.source(schema_contract="discard_row")
def source():
    # 3. Resource level (lowest priority)
    @dlt.resource(schema_contract="evolve")
    def data():
        yield [...]
    
    return data

# Final contract = "freeze" (from pipeline.run)

Handling Contract Violations

import dlt
from dlt.common.exceptions import PipelineStepFailed

pipeline = dlt.pipeline(
    pipeline_name="validated",
    destination="duckdb",
    dataset_name="data"
)

try:
    pipeline.run(
        strict_resource(),
        schema_contract="freeze"
    )
except PipelineStepFailed as e:
    print(f"Schema contract violation: {e}")
    # Handle error (alert, log, retry with different contract, etc.)

Best Practices

# Development: flexible
schema_contract="evolve"

# Staging: moderate
schema_contract={
    "tables": "freeze",
    "columns": "evolve",
    "data_type": "discard_row"
}

# Production: strict
schema_contract="freeze"
from pydantic import BaseModel, field_validator

class User(BaseModel):
    id: int
    email: str
    
    @field_validator('email')
    @classmethod
    def validate_email(cls, v):
        if '@' not in v:
            raise ValueError('Invalid email')
        return v

@dlt.resource(columns=User)
def users():
    yield from fetch_users()
load_info = pipeline.run(resource())

# Check for schema changes
for package in load_info.load_packages:
    if package.schema_update:
        send_alert(f"Schema changed: {package.schema_update}")
@dlt.resource(
    schema_contract="freeze"
)
def critical_data():
    """PRODUCTION RESOURCE - DO NOT MODIFY SCHEMA
    
    Contract: freeze
    - No new tables
    - No new columns  
    - No type changes
    
    Contact: [email protected] for changes
    """
    yield from fetch_data()

Next Steps

Schema Evolution

Learn about automatic schema evolution

Resources

Deep dive into resources and hints

Build docs developers (and LLMs) love