State is a persistent dictionary that dlt maintains across pipeline runs. It enables incremental loading by tracking what data has already been processed, allowing you to pick up where you left off.
What is State?
State in dlt:
- Persists across runs - Stored with your pipeline and restored on next run
- Tracks progress - Remembers what data has been processed
- Enables incremental loading - Load only new or changed data
- Scoped to sources and resources - Isolated state for different parts of your pipeline
- Loaded to destination - State is stored in a special table for reliability
Think of state as your pipeline’s memory - it remembers where it left off so you don’t reprocess data.
Why Use State?
Without state, you’d reload all data every time:
import dlt
@dlt.resource(write_disposition="replace")
def all_issues():
    """Fetch every issue on every run — a full reload with no state involved."""
    endpoint = "https://api.github.com/repos/dlt-hub/dlt/issues"
    resp = requests.get(endpoint)
    yield resp.json()
With state, load only what’s new:
import dlt
import pendulum
@dlt.resource(write_disposition="append")
def new_issues():
    """Incrementally load issues: only those created after the previous run."""
    # Resource-scoped state persists across pipeline runs.
    res_state = dlt.current.resource_state()
    last_created = res_state.get("last_created_at")
    endpoint = "https://api.github.com/repos/dlt-hub/dlt/issues"
    query = {}
    if last_created:
        query["since"] = last_created
        print(f"Loading issues since {last_created}")
    else:
        print("First run - loading all issues")
    payload = requests.get(endpoint, params=query).json()
    if payload:
        # Remember the newest created_at so the next run resumes from it.
        res_state["last_created_at"] = max(item["created_at"] for item in payload)
    yield payload
Resource State
Resource state is scoped to a specific resource:
import dlt
@dlt.resource(write_disposition="append")
def players_games(players: list[str]):
    """Yield game archives per player, skipping archives seen in earlier runs."""
    # This state dict is private to the resource; other resources cannot see it.
    seen = dlt.current.resource_state().setdefault("archives", [])
    for player in players:
        for archive_url in get_player_archives(player):
            if archive_url in seen:
                print(f"Skipping {archive_url}")
                continue
            print(f"Processing {archive_url}")
            # Mark as processed before yielding so it is not fetched again.
            seen.append(archive_url)
            resp = requests.get(archive_url)
            yield resp.json().get("games", [])
Access Resource State
From dlt/extract/state.py:77-128:
# Signature excerpt from dlt's extract/state module — implementation body elided.
def resource_state(resource_name: str = None) -> DictStrAny:
    """Returns a dictionary with resource-scoped state.

    The resource state is visible only to the resource requesting it.
    State is preserved across pipeline runs for incremental loads.
    """
Usage:
import dlt
@dlt.resource
def my_resource():
    """Demonstrate the read / process / write cycle of resource state."""
    res_state = dlt.current.resource_state()
    # The default covers the very first run, when no state exists yet.
    last_id = res_state.get("last_processed_id", 0)
    for item in fetch_items_after(last_id):
        yield item
        last_id = item["id"]
    # Written back here; dlt persists state after a successful run.
    res_state["last_processed_id"] = last_id
Source State
Source state is shared across all resources in a source:
import dlt
import pendulum
@dlt.source
def api_source():
    """Source whose resources coordinate through one shared source-level state."""

    @dlt.resource
    def users():
        # Source state is shared by every resource belonging to this source.
        shared = dlt.current.source_state()
        last_sync = shared.get("last_full_sync")
        if not last_sync:
            print("First sync - loading all users")
        else:
            print(f"Last sync: {last_sync}")
        yield from fetch_users()
        # Record when this sync happened for the other resources to read.
        shared["last_full_sync"] = pendulum.now().isoformat()

    @dlt.resource
    def events():
        # Reads the same key that users() wrote above.
        shared = dlt.current.source_state()
        last_sync = shared.get("last_full_sync")
        print(f"Using last_sync from source state: {last_sync}")
        yield from fetch_events_since(last_sync)

    return [users, events]
Access Source State
From dlt/extract/state.py:33-64:
# Signature excerpt from dlt's extract/state module — implementation body elided.
def source_state(source_state_key: Optional[str] = None) -> DictStrAny:
    """Returns a dictionary with source-scoped state.

    Source state may be shared across resources of a particular source.
    State is preserved across pipeline runs.
    """
Source State is Read-Only: In source-decorated functions, source state is read-only. Only resource functions can write to state.
Incremental Loading with dlt.sources.incremental
The recommended way to do incremental loading:
import dlt
from dlt.sources.helpers import requests
@dlt.resource(
    write_disposition="append",
    primary_key="id"
)
def github_issues(
    updated_since=dlt.sources.incremental("updated_at")
):
    """Incremental load driven by dlt.sources.incremental on updated_at."""
    endpoint = "https://api.github.com/repos/dlt-hub/dlt/issues"
    params = {"state": "all", "sort": "updated", "direction": "asc"}
    # last_value is restored from pipeline state automatically between runs.
    if updated_since.last_value:
        params["since"] = updated_since.last_value.isoformat()
        print(f"Loading issues since {params['since']}")
    payload = requests.get(endpoint, params=params).json()
    yield from payload
Incremental with Initial Value
import dlt
import pendulum
@dlt.resource(
    write_disposition="append",
    primary_key="id"
)
def transactions(
    created_after=dlt.sources.incremental(
        "created_at",
        initial_value=pendulum.parse("2024-01-01")
    )
):
    """Load transactions, starting from 2024-01-01 on the very first run."""
    # On the first run last_value is unset, so fall back to initial_value.
    start_date = created_after.last_value or created_after.initial_value
    yield from fetch_transactions_after(start_date)
State Structure
State is stored hierarchically:
{
"sources": {
"my_source": { # Source-level state
"last_full_sync": "2024-01-15T10:00:00Z",
"resources": { # Resource-level state
"users": {
"last_user_id": 12345,
"pages_processed": 42
},
"orders": {
"last_order_date": "2024-01-14",
"archives": ["url1", "url2"]
}
}
}
}
}
State Persistence
State is stored in two places:
- Local filesystem - In pipeline working directory
- Destination table - In the `_dlt_pipeline_state` table
import dlt
# State is restored from the destination before the run and written back
# to it after the run completes successfully.
pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
load_info = pipeline.run(my_source())
View State in Destination
import dlt
pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")

# The most recent state version lives in the _dlt_pipeline_state table.
with pipeline.sql_client() as client:
    rows = client.execute_sql(
        "SELECT * FROM _dlt_pipeline_state ORDER BY version DESC LIMIT 1"
    )
    for row in rows:
        print(row)
Reset State
Reset resource state when needed:
import dlt
from dlt.extract.state import reset_resource_state
@dlt.resource
def my_resource():
    """Show how to clear or trim resource state from inside the resource."""
    state = dlt.current.resource_state()
    if state.get("needs_reset"):
        # Wipe the entire state dict ...
        state.clear()
        # ... or drop a single key; the None default avoids a KeyError.
        state.pop("last_id", None)
    yield from process_data()
Reset via Pipeline Refresh
import dlt
pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")

# refresh="drop_sources": drop tables and state for all sources
load_info = pipeline.run(my_source(), refresh="drop_sources")

# refresh="drop_resources": drop tables and state for the selected resources
load_info = pipeline.run(my_source(), refresh="drop_resources")

# refresh="drop_data": truncate data and reset incremental/resource state,
# keeping the schema (the original comment claiming state is kept is wrong)
load_info = pipeline.run(my_source(), refresh="drop_data")
Advanced Patterns
Checkpoint Pattern
import dlt
@dlt.resource(write_disposition="append")
def api_with_checkpoints():
    """Page through a large dataset, checkpointing the next page in state."""
    state = dlt.current.resource_state()
    # Resume from the saved page (0 when starting fresh).
    page = state.get("last_page", 0)
    while True:
        batch = fetch_page(page)
        if not batch:
            # All pages consumed — clear the checkpoint for a fresh restart.
            state.pop("last_page", None)
            break
        yield batch
        # Checkpoint only after the page has been yielded downstream.
        page += 1
        state["last_page"] = page
Multi-Source State Management
import dlt
@dlt.source
def combined_source():
    """Two resources coordinating through shared source-level state."""

    @dlt.resource
    def primary_data():
        shared = dlt.current.source_state()
        last_sync = shared.get("sync_timestamp")
        yield from fetch_primary(last_sync)
        # Publish the sync time so dependent_data can pick it up.
        import pendulum
        shared["sync_timestamp"] = pendulum.now().isoformat()

    @dlt.resource
    def dependent_data():
        # Reads the timestamp written by primary_data.
        shared = dlt.current.source_state()
        last_sync = shared.get("sync_timestamp")
        yield from fetch_dependent(last_sync)

    return [primary_data, dependent_data]
Best Practices
Use resource state for resource-specific tracking
Keep state isolated to resources unless sharing is required
Use dlt.sources.incremental() for timestamp-based incrementals
This is the simplest and most reliable incremental loading pattern
Store minimal state
Only store what you need - IDs, timestamps, URLs. Don’t store large data structures
Handle missing state gracefully
Always use .get() with defaults for first-run scenarios
Update state after successful processing
Only update state after data is yielded to ensure consistency
State Synchronization: State is automatically synced to the destination after each successful pipeline run. If a run fails, state changes are not persisted.
State Size: Keep state small. Don’t store lists of thousands of IDs - use incremental timestamps or high-water marks instead.
Common Patterns
Track Last Modified Timestamp
import dlt
import pendulum
@dlt.resource(write_disposition="merge", primary_key="id")
def updated_records(
    updated_at=dlt.sources.incremental("updated_at")
):
    """Standard timestamp-based incremental pattern with merge semantics."""
    # dlt restores last_value from pipeline state before each run.
    yield from fetch_updated_since(updated_at.last_value)
Track Processed IDs
import dlt
@dlt.resource(write_disposition="append")
def process_items():
    """Skip items whose ids were already processed in earlier runs."""
    state = dlt.current.resource_state()
    # A set gives O(1) membership checks while processing ...
    seen = set(state.get("processed", []))
    for item in fetch_all_items():
        if item["id"] in seen:
            continue
        yield item
        seen.add(item["id"])
    # ... but state must be JSON-serializable, so store it back as a list.
    state["processed"] = list(seen)
Track High-Water Mark
import dlt
@dlt.resource(write_disposition="append")
def events():
    """High-water-mark pattern: track the largest event id seen so far."""
    state = dlt.current.resource_state()
    last_id = state.get("max_id", 0)
    for event in fetch_events_after_id(last_id):
        yield event
        # Keep a running maximum as the new high-water mark.
        if event["id"] > last_id:
            last_id = event["id"]
    state["max_id"] = last_id
- Resource - Uses state for incremental loading
- Pipeline - Manages state persistence
- Source - Provides source-level state
- Destination - Stores state in _dlt_pipeline_state table