Load only new or changed data to reduce costs and latency
Incremental loading enables you to load only new or changed data instead of reloading everything. This reduces costs, improves performance, and enables real-time data pipelines.
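Conceptually, dlt stores the highest cursor value seen so far and, on the next run, keeps only rows above it. A minimal pure-Python sketch of that bookkeeping (an illustration, not dlt's actual implementation):

```python
def incremental_load(rows, cursor_field, last_value=None):
    """Keep only rows with a cursor value greater than last_value,
    and return the new high-water mark for the next run."""
    fresh = [r for r in rows if last_value is None or r[cursor_field] > last_value]
    new_last = max((r[cursor_field] for r in fresh), default=last_value)
    return fresh, new_last

# First run: no stored state, so everything is new
rows = [
    {"id": 1, "updated_at": "2024-01-01"},
    {"id": 2, "updated_at": "2024-01-02"},
]
fresh, last = incremental_load(rows, "updated_at")
# last == "2024-01-02"

# Second run: only rows newer than the stored last_value are loaded
rows2 = rows + [{"id": 3, "updated_at": "2024-01-03"}]
fresh2, last2 = incremental_load(rows2, "updated_at", last_value=last)
```

With dlt, this filtering and state persistence happen automatically whenever a resource declares a `dlt.sources.incremental` argument.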
```python
# The incremental instance provides:

# initial_value: starting point (never changes)
updated_at.initial_value  # "1970-01-01T00:00:00Z"

# start_value: max value from the previous run, or initial_value
updated_at.start_value  # "2024-01-01T10:30:00Z"

# last_value: real-time max value (updates with each item)
updated_at.last_value  # "2024-01-15T14:22:00Z"

# end_value: optional end point for backfills
updated_at.end_value  # "2024-02-01T00:00:00Z"
```
When an API doesn’t support server-side filtering by cursor value, dlt filters the records client-side:
```python
import dlt
import requests

@dlt.resource(
    primary_key="id",
    table_name=lambda i: i["type"]  # Dynamic table names
)
def repo_events(
    repository: str,
    created_at=dlt.sources.incremental(
        "created_at",
        initial_value="1970-01-01T00:00:00Z",
        row_order="desc"  # Optimize: stop when out of range
    )
):
    """GitHub events API returns all events - dlt filters them"""
    url = f"https://api.github.com/repos/{repository}/events"

    # API doesn't support 'since' parameter
    response = requests.get(url, params={"per_page": 100})
    response.raise_for_status()

    # Yield all data - dlt automatically:
    # 1. Filters out old events using created_at
    # 2. Deduplicates using primary_key (id)
    # 3. Stops pagination when row_order="desc" and out of range
    yield response.json()
```
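The row_order="desc" optimization can be pictured with a plain generator: once a stream sorted newest-first drops to or below start_value, nothing later can be in range, so iteration stops early (a simplified sketch, not dlt's internals):

```python
def take_while_in_range(rows_desc, cursor_field, start_value):
    """Consume a stream sorted newest-first and stop at the first
    row at or below the stored start_value."""
    for row in rows_desc:
        if row[cursor_field] <= start_value:
            break  # everything after this is older - stop paginating
        yield row

events = [
    {"id": 3, "created_at": "2024-01-03"},
    {"id": 2, "created_at": "2024-01-02"},
    {"id": 1, "created_at": "2024-01-01"},
]
fresh = list(take_while_in_range(events, "created_at", "2024-01-01"))
# Only ids 3 and 2 are consumed; the rest of the stream is skipped
```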
Handle late-arriving data by reprocessing recent records:
```python
@dlt.resource(primary_key="id")
def events(
    created_at=dlt.sources.incremental(
        "created_at",
        initial_value="2024-01-01T00:00:00Z",
        lag=3600,  # Reprocess last hour (in seconds)
    )
):
    """Reprocess last hour of data to catch late arrivals"""
    # If last_value was 2024-01-15 14:00:00,
    # start_value will be 2024-01-15 13:00:00 (1 hour earlier)
    yield fetch_events(since=created_at.start_value)
```
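The lag window is simple arithmetic: the stored cursor is shifted back by lag seconds before filtering. Assuming ISO 8601 UTC timestamps like those in the examples above, the shift looks like:

```python
from datetime import datetime, timedelta

def apply_lag(last_value: str, lag_seconds: int) -> str:
    """Shift an ISO 8601 UTC cursor back by lag_seconds."""
    ts = datetime.fromisoformat(last_value.replace("Z", "+00:00"))
    return (ts - timedelta(seconds=lag_seconds)).strftime("%Y-%m-%dT%H:%M:%SZ")

start = apply_lag("2024-01-15T14:00:00Z", 3600)
# start == "2024-01-15T13:00:00Z" - matches the example above
```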
```python
pipeline = dlt.pipeline(
    pipeline_name="github",
    destination="duckdb",
    dataset_name="github_data",
)

# Normal incremental load
pipeline.run(repo_issues())

# Force full refresh - deletes and reloads
pipeline.run(repo_issues(), write_disposition="replace")

# Refresh just one table
pipeline.run(
    repo_issues().with_resources("issues"),
    write_disposition="replace"
)
```
Choose the Right Cursor Field
Use updated_at for data that changes (users, orders)
Use created_at for append-only data (events, logs)
Ensure the cursor field is indexed in the source
Verify the cursor field is always populated
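The choice matters when rows mutate: a created_at cursor never re-reads a row that was edited after the cutoff, while updated_at does. A small illustration with hypothetical order records:

```python
def newer_than(rows, field, cutoff):
    """Return rows whose cursor field is past the cutoff."""
    return [r for r in rows if r[field] > cutoff]

orders = [
    # Created before the cutoff, but edited after it:
    {"id": 1, "created_at": "2024-01-01", "updated_at": "2024-01-10"},
    {"id": 2, "created_at": "2024-01-09", "updated_at": "2024-01-09"},
]
cutoff = "2024-01-05"

by_created = newer_than(orders, "created_at", cutoff)  # misses the edit to id 1
by_updated = newer_than(orders, "updated_at", cutoff)  # picks up both rows
```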
Handle Time Zones
Always use UTC timestamps
Ensure source and destination use the same timezone
Use timezone-aware datetime objects
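A timezone-aware UTC initial_value can be built with the standard library; assuming dlt accepts ISO 8601 strings as in the examples above:

```python
from datetime import datetime, timezone

# Build a timezone-aware UTC datetime rather than a naive one
start = datetime(2024, 1, 1, tzinfo=timezone.utc)
initial_value = start.strftime("%Y-%m-%dT%H:%M:%SZ")
# initial_value == "2024-01-01T00:00:00Z"
```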
Set Appropriate Initial Values
Don’t use far-past dates (e.g., “1970-01-01”) unless you actually need all history
Start from a reasonable date to limit initial load
Consider using end_value for bounded initial loads
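A bounded initial load can also be split into windows and backfilled one range at a time. A sketch of generating ~30-day (start, end) pairs for such a backfill:

```python
from datetime import datetime, timedelta

def month_windows(start: str, end: str):
    """Yield (start, end) ISO date pairs, one per ~30-day window."""
    fmt = "%Y-%m-%d"
    lo = datetime.strptime(start, fmt)
    hi = datetime.strptime(end, fmt)
    while lo < hi:
        nxt = min(lo + timedelta(days=30), hi)
        yield lo.strftime(fmt), nxt.strftime(fmt)
        lo = nxt

windows = list(month_windows("2024-01-01", "2024-03-01"))
# [("2024-01-01", "2024-01-31"), ("2024-01-31", "2024-03-01")]
```

Each pair can then serve as the initial_value and end_value of one bounded run.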
Monitor State
```python
# Check incremental state
pipeline = dlt.pipeline(
    pipeline_name="github",
    destination="duckdb",
)

# View current state
print(pipeline.state)

# Reset state if needed
pipeline.drop()
```