Skip to main content
Incremental loading enables you to load only new or changed data instead of reloading everything. This reduces costs, improves performance, and enables real-time data pipelines.

Write Dispositions

dlt supports three write dispositions that control how data is loaded:
1

Replace (Full Load)

Replaces all data in the destination with new data from the source.
pipeline.run(source(), write_disposition="replace")
Use for small datasets or when you need a complete refresh.
2

Append (Insert Only)

Adds new data to the destination without modifying existing records.
pipeline.run(source(), write_disposition="append")
Use for immutable event data like logs, clicks, or transactions.
3

Merge (Upsert)

Updates existing records and inserts new ones based on primary/merge keys.
@dlt.resource(primary_key="id", write_disposition="merge")
def users():
    """Yield user records; merge upserts rows sharing the same ``id``."""
    record = {"id": 1, "name": "Alice", "status": "active"}
    yield [record]
Use for stateful data that changes over time like user profiles or product catalogs.

Cursor-Based Incremental Loading

Track changes using a cursor field (timestamp, ID, version number):
import dlt
from dlt.sources.helpers import requests

@dlt.resource(primary_key="id")
def repo_issues(
    repository: str,
    updated_at=dlt.sources.incremental(
        "updated_at",
        initial_value="1970-01-01T00:00:00Z"
    )
):
    """Incrementally pull GitHub issues for *repository*.

    Only issues changed since the previous run's high-water mark
    (``updated_at.start_value``) are requested from the API.
    """
    issues_url = f"https://api.github.com/repos/{repository}/issues"

    # 'since' is the cursor from the last run (or initial_value on first run)
    query = {
        "since": updated_at.start_value,
        "state": "all",
        "per_page": 100,
    }
    response = requests.get(issues_url, params=query)
    response.raise_for_status()

    yield response.json()

    # dlt advances last_value automatically as items are yielded
    print(f"Latest update: {updated_at.last_value}")

# Local DuckDB pipeline; incremental cursor state is persisted between runs
pipeline = dlt.pipeline(
    pipeline_name="github",
    destination="duckdb",
    dataset_name="github_data",
)

# First run: no saved cursor yet, so everything since initial_value (1970) loads
load_info = pipeline.run(repo_issues("dlt-hub/dlt"))

# Second run: start_value resumes from the saved cursor,
# so only issues updated since the first run are loaded
load_info = pipeline.run(repo_issues("dlt-hub/dlt"))

How It Works

# The incremental instance exposes four cursor attributes:

# initial_value: configured starting point (never changes after creation)
updated_at.initial_value  # "1970-01-01T00:00:00Z"

# start_value: max cursor value from the previous run, or initial_value on the first run
updated_at.start_value  # "2024-01-01T10:30:00Z"

# last_value: running max cursor value (updates as each item is processed)
updated_at.last_value  # "2024-01-15T14:22:00Z"

# end_value: optional upper bound, used for stateless backfills
updated_at.end_value  # "2024-02-01T00:00:00Z"

Automatic Deduplication

When an API doesn’t support server-side filtering, dlt filters and deduplicates the data client-side:
@dlt.resource(
    primary_key="id",
    table_name=lambda i: i["type"]  # Dynamic table names
)
def repo_events(
    repository: str,
    created_at=dlt.sources.incremental(
        "created_at",
        initial_value="1970-01-01T00:00:00Z",
        row_order="desc"  # Optimize: stop when out of range
    )
):
    """GitHub events API returns all events - dlt filters them"""
    events_url = f"https://api.github.com/repos/{repository}/events"

    # The endpoint has no 'since' filter, so fetch a page unfiltered.
    page = requests.get(events_url, params={"per_page": 100})
    page.raise_for_status()

    # dlt then takes over: it drops rows older than the created_at cursor,
    # deduplicates on primary_key (id), and - because row_order="desc" -
    # stops consuming once rows fall out of the cursor range.
    yield page.json()

Custom Last Value Function

Use custom logic to determine the “last” value:
# Track max value per table type
def by_event_type(event):
    """Track the maximum ``created_at`` cursor independently per event type.

    dlt calls this with ``(item,)`` for the first row and
    ``(item, previous_last_value)`` afterwards. Returns a dict mapping
    each event type to the newest timestamp seen for it; the incoming
    mapping is copied, never mutated.
    """
    item, *prior = event
    tracked = dict(prior[0]) if prior else {}

    kind = item["type"]
    baseline = tracked.get(kind, "1970-01-01T00:00:00Z")
    tracked[kind] = max(item["created_at"], baseline)
    return tracked

@dlt.resource(
    primary_key="id",
    table_name=lambda i: i["type"]
)
def events(
    created_at=dlt.sources.incremental(
        "$",  # Full item as cursor: the whole row is passed to last_value_func
        last_value_func=by_event_type
    )
):
    """Skeleton resource showing a custom last-value function that keeps
    an independent max cursor per event type (see by_event_type above)."""
    # Load data...
    pass

Backfill with Date Ranges

Load historical data in ranges without affecting incremental state:
@dlt.resource(primary_key="id")
def repo_issues(
    repository: str,
    updated_at=dlt.sources.incremental(
        "updated_at",
        initial_value="1970-01-01T00:00:00Z"
    )
):
    """Load issues bounded by the incremental start and (optional) end values."""
    issues_url = f"https://api.github.com/repos/{repository}/issues"

    # When end_value is set (backfill), the load is bounded on both sides
    window = {
        "since": updated_at.start_value,
        "until": updated_at.end_value,  # Optional end date
        "state": "all",
    }
    response = requests.get(issues_url, params=window)
    response.raise_for_status()
    yield response.json()

# Normal incremental load: uses and advances the saved cursor state
pipeline.run(repo_issues("dlt-hub/dlt"))

# Backfill specific ranges (stateless): passing an explicit incremental
# with an end_value overrides the resource's default cursor for this call
july_issues = repo_issues(
    "dlt-hub/dlt",
    updated_at=dlt.sources.incremental(
        initial_value="2024-07-01T00:00:00Z",
        end_value="2024-08-01T00:00:00Z"
    )
)

august_issues = repo_issues(
    "dlt-hub/dlt",
    updated_at=dlt.sources.incremental(
        initial_value="2024-08-01T00:00:00Z",
        end_value="2024-09-01T00:00:00Z"
    )
)

# Run backfills in parallel - bounded loads don't touch the saved cursor state
pipeline.run([july_issues, august_issues])
Backfills with end_value are stateless and can run in parallel without affecting the main incremental state.

Lag/Attribution Window

Handle late-arriving data by reprocessing recent records:
@dlt.resource(primary_key="id")
def events(
    created_at=dlt.sources.incremental(
        "created_at",
        initial_value="2024-01-01T00:00:00Z",
        lag=3600,  # Reprocess last hour (in seconds)
    )
):
    """Reprocess last hour of data to catch late arrivals"""
    # With lag set, start_value is shifted back 3600s from the saved cursor:
    # if last_value was 2024-01-15 14:00:00,
    # start_value will be 2024-01-15 13:00:00 (1 hour earlier).
    # primary_key="id" lets dedup/merge absorb the re-fetched overlap.
    yield fetch_events(since=created_at.start_value)

Row Order Optimization

Stop fetching data early when rows are ordered:
@dlt.resource(primary_key="id")
def events(
    created_at=dlt.sources.incremental(
        "created_at",
        initial_value="2024-01-01T00:00:00Z",
        row_order="desc",  # Newest first
    )
):
    """API returns newest events first"""
    # Because rows arrive in descending cursor order, dlt stops consuming
    # as soon as created_at < start_value - all remaining pages are older,
    # so skipping them saves API calls and processing time.
    for page in paginate_events():
        yield page

Full Refresh

Force a complete reload when needed:
pipeline = dlt.pipeline(
    pipeline_name="github",
    destination="duckdb",
    dataset_name="github_data",
)

# Normal incremental load
pipeline.run(repo_issues())

# Force full refresh - deletes and reloads
# (write_disposition passed to run() overrides the resource's own setting)
pipeline.run(repo_issues(), write_disposition="replace")

# Refresh just one table
# NOTE(review): with_resources() is normally a source-level method -
# confirm repo_issues() returns a source (not a bare resource) here
pipeline.run(
    repo_issues().with_resources("issues"),
    write_disposition="replace"
)

SQL Database Incremental Loading

import dlt
from dlt.sources.sql_database import sql_database

# Pipeline syncing two database tables into a local DuckDB dataset
pipeline = dlt.pipeline(
    pipeline_name="database_sync",
    destination="duckdb",
    dataset_name="db_data",
)

# Select only the tables we need from the public Rfam MySQL instance
source = sql_database(
    "mysql+pymysql://[email protected]:4497/Rfam"
).with_resources("family", "clan")

# Configure incremental on timestamp columns: subsequent runs fetch only
# rows whose 'updated' value exceeds the saved cursor
source.family.apply_hints(
    incremental=dlt.sources.incremental("updated")
)
source.clan.apply_hints(
    incremental=dlt.sources.incremental("updated")
)

# Merge changes into destination (upserts based on primary/merge keys)
load_info = pipeline.run(source, write_disposition="merge")
print(load_info)

REST API Incremental Loading

from dlt.sources.rest_api import rest_api_source
from dlt.common.pendulum import pendulum

# Declarative REST API source: one 'issues' resource with an incremental
# cursor read from the 'updated_at' field of each returned item
config = {
    "client": {
        "base_url": "https://api.github.com/repos/dlt-hub/dlt/",
    },
    "resources": [
        {
            "name": "issues",
            "endpoint": {
                "path": "issues",
                "params": {
                    # Placeholder resolved at request time to the cursor's start_value
                    "since": "{incremental.start_value}",
                    "state": "all",
                },
                "incremental": {
                    "cursor_path": "updated_at",
                    # Start 30 days back to bound the first load
                    "initial_value": pendulum.today().subtract(days=30).to_iso8601_string(),
                },
            },
        },
    ],
}

source = rest_api_source(config)
# NOTE(review): 'pipeline' is not created in this snippet - assumes a
# pipeline was defined earlier; verify before running standalone
pipeline.run(source)

Best Practices

  • Use updated_at for data that changes (users, orders)
  • Use created_at for append-only data (events, logs)
  • Ensure the cursor field is indexed in the source
  • Verify the cursor field is always populated
  • Always use UTC timestamps
  • Ensure source and destination use the same timezone
  • Use timezone-aware datetime objects
  • Avoid far-past initial values (e.g., “1970-01-01”) unless you truly need full history
  • Start from a reasonable date to limit initial load
  • Consider using end_value for bounded initial loads
# Check incremental state
pipeline = dlt.pipeline(
    pipeline_name="github",
    destination="duckdb",
)

# View current state (incremental cursors are tracked in pipeline state)
print(pipeline.state)

# Reset state if needed
# NOTE(review): drop() removes more than incremental state (it drops the
# pipeline's local working folder) - confirm this is the intent
pipeline.drop()

Next Steps

Schema Evolution

Handle changing schemas automatically

Data Contracts

Validate and enforce data quality

Build docs developers (and LLMs) love