State is a persistent dictionary that dlt maintains across pipeline runs. It enables incremental loading by tracking what data has already been processed, allowing you to pick up where you left off.

What is State?

State in dlt:
  • Persists across runs - Stored with your pipeline and restored on next run
  • Tracks progress - Remembers what data has been processed
  • Enables incremental loading - Load only new or changed data
  • Scoped to sources and resources - Isolated state for different parts of your pipeline
  • Loaded to destination - State is stored in a special table for reliability
Think of state as your pipeline’s memory - it remembers where it left off so you don’t reprocess data.
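As a loose analogy only (this is a toy sketch, not dlt's actual storage mechanism), a "persistent dictionary" can be pictured as a plain dict that is saved to disk after each run and restored on the next:

```python
import json
import tempfile
from pathlib import Path

# Hypothetical toy, NOT dlt's real mechanism: a dict that survives
# "runs" by round-tripping through a JSON file in a fresh temp directory.
state_file = Path(tempfile.mkdtemp()) / "toy_state.json"

def load_state() -> dict:
    """Restore state from disk, or start empty on the first run."""
    if state_file.exists():
        return json.loads(state_file.read_text())
    return {}

def save_state(state: dict) -> None:
    """Persist state so the next run can pick up where this one left off."""
    state_file.write_text(json.dumps(state))

# "Run 1": nothing remembered yet.
state = load_state()
print(state.get("last_id"))  # None

state["last_id"] = 42
save_state(state)

# "Run 2": the previous run's value is restored.
state = load_state()
print(state.get("last_id"))  # 42
```

dlt does this bookkeeping for you: the state dictionary is written alongside your pipeline data and restored before the next run begins.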

Why Use State?

Without state, you’d reload all data every time:
import dlt
from dlt.sources.helpers import requests

@dlt.resource(write_disposition="replace")
def all_issues():
    """Loads ALL issues every run - inefficient!"""
    url = "https://api.github.com/repos/dlt-hub/dlt/issues"
    response = requests.get(url)
    yield response.json()
With state, load only what’s new:
import dlt
from dlt.sources.helpers import requests

@dlt.resource(write_disposition="append")
def new_issues():
    """Loads only issues created since last run"""
    # Get resource state
    state = dlt.current.resource_state()
    last_created = state.get("last_created_at")
    
    url = "https://api.github.com/repos/dlt-hub/dlt/issues"
    params = {}
    
    if last_created:
        params["since"] = last_created
        print(f"Loading issues since {last_created}")
    else:
        print("First run - loading all issues")
    
    response = requests.get(url, params=params)
    issues = response.json()
    
    if issues:
        # Track the latest created_at timestamp
        latest = max(issue["created_at"] for issue in issues)
        state["last_created_at"] = latest
    
    yield issues
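The `max(...)` call above works because ISO-8601 timestamps sort lexicographically in chronological order, so comparing the raw strings is enough. A quick check with sample (illustrative) issue payloads:

```python
# Sample issue payloads with ISO-8601 "created_at" values (illustrative data).
issues = [
    {"id": 1, "created_at": "2024-01-14T09:30:00Z"},
    {"id": 2, "created_at": "2024-01-15T18:05:00Z"},
    {"id": 3, "created_at": "2024-01-15T07:45:00Z"},
]

# ISO-8601 strings compare correctly as plain strings, so max() over
# the raw values yields the newest timestamp without any date parsing.
latest = max(issue["created_at"] for issue in issues)
print(latest)  # 2024-01-15T18:05:00Z
```

This only holds when all timestamps share the same format and timezone offset; mixed formats should be parsed (e.g. with pendulum) before comparing.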

Resource State

Resource state is scoped to a specific resource:
import dlt
from dlt.sources.helpers import requests

@dlt.resource(write_disposition="append")
def players_games(players: list[str]):
    """Track processed archives per resource"""
    # Each resource has its own state
    state = dlt.current.resource_state()
    checked_archives = state.setdefault("archives", [])
    
    for player in players:
        archives = get_player_archives(player)
        
        for archive_url in archives:
            if archive_url in checked_archives:
                print(f"Skipping {archive_url}")
                continue
            
            print(f"Processing {archive_url}")
            checked_archives.append(archive_url)
            
            response = requests.get(archive_url)
            yield response.json().get("games", [])

Access Resource State

From dlt/extract/state.py:
def resource_state(resource_name: str = None) -> DictStrAny:
    """Returns a dictionary with resource-scoped state.
    
    The resource state is visible only to the resource requesting it.
    State is preserved across pipeline runs for incremental loads.
    """
Usage:
import dlt

@dlt.resource
def my_resource():
    # Get state for current resource
    state = dlt.current.resource_state()
    
    # Read from state
    last_id = state.get("last_processed_id", 0)
    
    # Process data
    for item in fetch_items_after(last_id):
        yield item
        last_id = item["id"]
    
    # Write to state
    state["last_processed_id"] = last_id

Source State

Source state is shared across all resources in a source:
import dlt
import pendulum

@dlt.source
def api_source():
    """All resources share source-level state"""
    
    @dlt.resource
    def users():
        # Access shared source state
        state = dlt.current.source_state()
        last_sync = state.get("last_full_sync")
        
        if not last_sync:
            print("First sync - loading all users")
        else:
            print(f"Last sync: {last_sync}")
        
        for user in fetch_users():
            yield user
        
        # Update shared state
        state["last_full_sync"] = pendulum.now().isoformat()
    
    @dlt.resource
    def events():
        # Same source state accessible here
        state = dlt.current.source_state()
        last_sync = state.get("last_full_sync")
        
        print(f"Using last_sync from source state: {last_sync}")
        
        for event in fetch_events_since(last_sync):
            yield event
    
    return [users, events]

Access Source State

From dlt/extract/state.py:
def source_state(source_state_key: Optional[str] = None) -> DictStrAny:
    """Returns a dictionary with source-scoped state.
    
    Source state may be shared across resources of a particular source.
    State is preserved across pipeline runs.
    """
Source State is Read-Only: In source-decorated functions, source state is read-only. Only resource functions can write to state.

Incremental Loading with dlt.sources.incremental

The recommended way to do incremental loading:
import dlt
from dlt.sources.helpers import requests

@dlt.resource(
    write_disposition="append",
    primary_key="id"
)
def github_issues(
    updated_since=dlt.sources.incremental("updated_at")
):
    """Load only issues updated since last run"""
    url = "https://api.github.com/repos/dlt-hub/dlt/issues"
    params = {"state": "all", "sort": "updated", "direction": "asc"}
    
    # Use the cursor value from the previous run
    if updated_since.last_value:
        # last_value is the raw "updated_at" string tracked on the last run
        params["since"] = updated_since.last_value
        print(f"Loading issues since {params['since']}")
    
    response = requests.get(url, params=params)
    issues = response.json()
    
    for issue in issues:
        yield issue

Incremental with Initial Value

import dlt
import pendulum

@dlt.resource(
    write_disposition="append",
    primary_key="id"
)
def transactions(
    created_after=dlt.sources.incremental(
        "created_at",
        initial_value=pendulum.parse("2024-01-01")
    )
):
    """Load transactions, starting from 2024-01-01 on first run"""
    # On the first run, last_value falls back to initial_value automatically
    start_date = created_after.last_value
    
    for transaction in fetch_transactions_after(start_date):
        yield transaction

State Structure

State is stored hierarchically:
{
    "sources": {
        "my_source": {                    # Source-level state
            "last_full_sync": "2024-01-15T10:00:00Z",
            "resources": {                # Resource-level state
                "users": {
                    "last_user_id": 12345,
                    "pages_processed": 42
                },
                "orders": {
                    "last_order_date": "2024-01-14",
                    "archives": ["url1", "url2"]
                }
            }
        }
    }
}
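Given this shape, reading state is just a nested dictionary lookup (the values below are the illustrative ones from the snippet above):

```python
# Illustrative copy of the hierarchical state shown above.
state = {
    "sources": {
        "my_source": {
            "last_full_sync": "2024-01-15T10:00:00Z",
            "resources": {
                "users": {"last_user_id": 12345, "pages_processed": 42},
                "orders": {"last_order_date": "2024-01-14", "archives": ["url1", "url2"]},
            },
        }
    }
}

# Source-level state lives directly under the source name ...
source_state = state["sources"]["my_source"]
print(source_state["last_full_sync"])  # 2024-01-15T10:00:00Z

# ... while resource-level state is nested one level deeper.
users_state = source_state["resources"]["users"]
print(users_state["last_user_id"])  # 12345
```

In practice you never walk this structure by hand: `dlt.current.source_state()` and `dlt.current.resource_state()` hand you the correct sub-dictionary for the current scope.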

State Persistence

State is stored in two places:
  1. Local filesystem - In pipeline working directory
  2. Destination table - In _dlt_pipeline_state table
import dlt

pipeline = dlt.pipeline(
    pipeline_name="my_pipeline",
    destination="duckdb"
)

# State loaded from destination on first run
load_info = pipeline.run(my_source())

# State saved to destination after successful run

View State in Destination

import dlt

pipeline = dlt.pipeline(
    pipeline_name="my_pipeline",
    destination="duckdb"
)

# Query the state table
with pipeline.sql_client() as client:
    result = client.execute_sql(
        "SELECT * FROM _dlt_pipeline_state ORDER BY version DESC LIMIT 1"
    )
    for row in result:
        print(row)

Reset State

Reset resource state when needed:
import dlt

@dlt.resource
def my_resource():
    state = dlt.current.resource_state()
    
    # Check if we need to reset
    if state.get("needs_reset"):
        # Clear all state
        state.clear()
        # Or reset specific keys
        state.pop("last_id", None)
    
    # Process data...
    yield from process_data()

Reset via Pipeline Refresh

import dlt

pipeline = dlt.pipeline(
    pipeline_name="my_pipeline",
    destination="duckdb"
)

# Drop tables and state for all sources
load_info = pipeline.run(my_source(), refresh="drop_sources")

# Drop tables and state for the resources selected in this run
load_info = pipeline.run(my_source(), refresh="drop_resources")

# Truncate tables and reset resource state, keeping the schema
load_info = pipeline.run(my_source(), refresh="drop_data")

Advanced Patterns

Checkpoint Pattern

import dlt

@dlt.resource(write_disposition="append")
def api_with_checkpoints():
    """Process large dataset with checkpoints"""
    state = dlt.current.resource_state()
    page = state.get("last_page", 0)
    
    while True:
        data = fetch_page(page)
        
        if not data:
            # Finished - reset checkpoint
            state.pop("last_page", None)
            break
        
        yield data
        
        # Save checkpoint after each page
        page += 1
        state["last_page"] = page

Multi-Source State Management

import dlt

@dlt.source
def combined_source():
    """Multiple sources sharing state via source_state"""
    
    @dlt.resource
    def primary_data():
        state = dlt.current.source_state()
        last_sync = state.get("sync_timestamp")
        
        # Process primary data
        for item in fetch_primary(last_sync):
            yield item
        
        # Update shared timestamp
        import pendulum
        state["sync_timestamp"] = pendulum.now().isoformat()
    
    @dlt.resource
    def dependent_data():
        # Uses the same sync_timestamp
        state = dlt.current.source_state()
        last_sync = state.get("sync_timestamp")
        
        for item in fetch_dependent(last_sync):
            yield item
    
    return [primary_data, dependent_data]

Best Practices

1. Use resource state for resource-specific tracking - keep state isolated to resources unless sharing is required.
2. Use dlt.sources.incremental() for timestamp-based incremental loading - this is the simplest and most reliable pattern.
3. Store minimal state - only store what you need (IDs, timestamps, URLs); don't store large data structures.
4. Handle missing state gracefully - always use .get() with defaults for first-run scenarios.
5. Update state after successful processing - only update state after data is yielded to ensure consistency.
State Synchronization: State is automatically synced to the destination after each successful pipeline run. If a run fails, state changes are not persisted.
State Size: Keep state small. Don’t store lists of thousands of IDs - use incremental timestamps or high-water marks instead.
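"Handle missing state gracefully" comes down to two plain dict idioms used throughout this page; here is how they differ on a fresh (first-run) state:

```python
# A fresh state dict, as on the very first pipeline run.
state = {}

# .get() with a default reads safely without mutating state.
last_id = state.get("last_processed_id", 0)
print(last_id)                        # 0
print("last_processed_id" in state)   # False - nothing was written

# setdefault() both reads and initializes the key, which is handy for
# mutable values you append to (as in the checked_archives example above).
archives = state.setdefault("archives", [])
archives.append("url1")
print(state["archives"])  # ['url1']
```

Prefer `.get()` for scalar checkpoints you overwrite and `setdefault()` for lists or dicts you mutate in place.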

Common Patterns

Track Last Modified Timestamp

import dlt
import pendulum

@dlt.resource(write_disposition="merge", primary_key="id")
def updated_records(
    updated_at=dlt.sources.incremental("updated_at")
):
    """Standard incremental pattern with timestamp"""
    last_update = updated_at.last_value
    
    for record in fetch_updated_since(last_update):
        yield record

Track Processed IDs

import dlt

@dlt.resource(write_disposition="append")
def process_items():
    """Track which items have been processed"""
    state = dlt.current.resource_state()
    processed_ids = set(state.get("processed", []))
    
    for item in fetch_all_items():
        if item["id"] in processed_ids:
            continue
        
        yield item
        processed_ids.add(item["id"])
    
    # Store as list (sets aren't JSON serializable)
    state["processed"] = list(processed_ids)
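The comment about sets is easy to verify with the standard json module: state values must be JSON-serializable, and sets are not:

```python
import json

processed_ids = {101, 102, 103}

# Sets are not JSON serializable - json.dumps raises TypeError.
try:
    json.dumps(processed_ids)
    serializable = True
except TypeError:
    serializable = False
print(serializable)  # False

# Converting to a sorted list makes the value safe to store in state.
print(json.dumps(sorted(processed_ids)))  # [101, 102, 103]
```

The same constraint applies to other non-JSON types such as datetime objects: convert them (e.g. to ISO strings) before writing them into state.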

Track High-Water Mark

import dlt

@dlt.resource(write_disposition="append")
def events():
    """Use highest ID seen as checkpoint"""
    state = dlt.current.resource_state()
    last_id = state.get("max_id", 0)
    
    events = fetch_events_after_id(last_id)
    
    for event in events:
        yield event
        if event["id"] > last_id:
            last_id = event["id"]
    
    state["max_id"] = last_id

Related Concepts

  • Resource - Uses state for incremental loading
  • Pipeline - Manages state persistence
  • Source - Provides source-level state
  • Destination - Stores state in _dlt_pipeline_state table
