The Incremental class enables incremental data loading by tracking a cursor value in persistent state. This lets you load only the records that are new or updated since the last pipeline run, reducing API calls and processing time.
Basic Usage
Import
Constructor
Parameters
- cursor_path: The name or JSON path of the cursor field in your data. This field tracks the incremental progress. Examples: "created_at", "updated_at", "item.timestamp"
- initial_value: Starting value for last_value on the first run. Can be a datetime, date, int, float, or string. Examples: "2024-01-01T00:00:00Z", 1234567890, datetime(2024, 1, 1)
- last_value_func: Function that determines which cursor value to save. Use max for ascending timestamps, min for descending. Options: max, min, or a custom callable
- primary_key: Primary key used for deduplication. If not provided, the resource’s primary key is used. Pass an empty tuple () to disable deduplication. Examples: "id", ("user_id", "event_id")
- end_value: Upper bound for loading a limited range. When set, loading becomes stateless. Example, loading data for a specific month: incremental("created_at", initial_value="2024-01-01", end_value="2024-02-01")
- row_order: Declares the sort order of data from the source. Enables early stopping when out-of-range data is encountered. Values: "asc" (ascending), "desc" (descending)
- allow_external_schedulers: Allows external schedulers (like Airflow) to provide initial_value and end_value from the execution context. In Airflow, data_interval_start and data_interval_end from the context are used.
- on_cursor_value_missing: Behavior when the cursor field is missing from a record. "raise" raises an error, "include" includes the record, "exclude" skips the record.
- lag: Attribution window (lag) to apply to the cursor. For datetime cursors, this is in seconds. Useful for eventually consistent APIs. Example: lag=3600 (1 hour lag for datetime cursors)
- range_start: Whether the start of the range is inclusive ("closed") or exclusive ("open"). "open" disables deduplication and excludes records with a cursor value equal to last_value.
- range_end: Whether the end of the range is inclusive ("closed") or exclusive ("open"). "open" excludes records with a cursor value equal to end_value.
Properties
last_value
The current cursor value from state. Use this to query your data source.
start_value
The cursor value at the beginning of the current pipeline run (before processing new data).
end_value
The upper bound value if set during initialization.
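To make the relationship between the three properties concrete, here is a minimal pure-Python model. It does not use dlt: the CursorModel class and its begin_run and observe methods are hypothetical stand-ins that only mirror the property descriptions above, and the sketch assumes the default max behaviour for an ascending cursor.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class CursorModel:
    """Hypothetical stand-in for the three properties; not dlt's implementation."""

    initial_value: Any
    end_value: Optional[Any] = None
    last_value: Any = None
    start_value: Any = None

    def begin_run(self) -> None:
        # On the first run there is no stored cursor, so fall back to initial_value.
        if self.last_value is None:
            self.last_value = self.initial_value
        # start_value is a snapshot taken before any new data is processed.
        self.start_value = self.last_value

    def observe(self, cursor_value: Any) -> None:
        # Assumes max as the cursor function: keep the highest value seen so far.
        self.last_value = max(self.last_value, cursor_value)


cursor = CursorModel(initial_value=100)
cursor.begin_run()
for value in (120, 110, 150):
    cursor.observe(value)
```

In this model start_value stays at 100 for the whole run while last_value advances to 150, and end_value stays unset because no upper bound was given.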
Methods
from_existing_state(resource_name: str, cursor_path: str)
Create an Incremental instance from existing state without relying on function argument injection.
Advanced Examples
Compound Cursor (Timestamp + ID)
Descending Order with min
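As a rough illustration of why min suits descending data, the sketch below folds a stored cursor and a batch of row values into one value. It is plain Python rather than dlt code: advance_cursor is a hypothetical helper, and the assumption that the function receives the stored value together with the new row values is part of the sketch.

```python
from typing import Any, Callable, Iterable


def advance_cursor(
    last_value: Any,
    row_values: Iterable[Any],
    last_value_func: Callable[[Iterable[Any]], Any],
) -> Any:
    """Hypothetical helper: combine the stored cursor with new row values."""
    return last_value_func([last_value, *row_values])


# Rows arrive newest-first, for example when paging backwards through IDs.
descending_ids = [90, 80, 70]

lowest = advance_cursor(100, descending_ids, min)   # cursor moves down to 70
highest = advance_cursor(100, descending_ids, max)  # cursor would stay at 100
```

With max the cursor never moves for this data, which is why a descending source needs min.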
Loading Date Ranges
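The sketch below models how a bounded range selects records, based on the inclusive ("closed") and exclusive ("open") descriptions under Parameters. It is plain Python, not dlt code: in_range and select_rows are hypothetical helpers, and the defaults chosen here (closed start, open end) are an assumption of the sketch.

```python
from typing import Any, Dict, List


def in_range(
    value: Any,
    start: Any,
    end: Any,
    range_start: str = "closed",
    range_end: str = "open",
) -> bool:
    """Hypothetical filter mirroring the inclusive/exclusive range settings."""
    after_start = value >= start if range_start == "closed" else value > start
    before_end = value <= end if range_end == "closed" else value < end
    return after_start and before_end


def select_rows(
    rows: List[Dict[str, Any]],
    cursor_path: str,
    start: Any,
    end: Any,
    **bounds: str,
) -> List[Dict[str, Any]]:
    """Keep only rows whose cursor value falls inside the requested range."""
    return [row for row in rows if in_range(row[cursor_path], start, end, **bounds)]


rows = [
    {"id": 1, "created_at": "2023-12-31"},
    {"id": 2, "created_at": "2024-01-01"},
    {"id": 3, "created_at": "2024-01-15"},
    {"id": 4, "created_at": "2024-02-01"},
]

# ISO date strings compare correctly as plain strings.
january = select_rows(rows, "created_at", "2024-01-01", "2024-02-01")
```

Here january contains the records with id 2 and 3. Passing range_end="closed" would also admit id 4, and range_start="open" would drop id 2.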
With Airflow Integration
Handling Eventually Consistent APIs
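One way to picture the lag setting: the lower bound of the next request is moved back by the attribution window, so records that appear late are still picked up. The sketch below is plain Python, not dlt code. lagged_start is a hypothetical helper, and subtracting the lag from the stored cursor is an assumption that fits an ascending datetime cursor.

```python
from datetime import datetime, timedelta, timezone


def lagged_start(last_value: datetime, lag_seconds: float) -> datetime:
    """Hypothetical helper: move the lower bound back by the attribution window."""
    return last_value - timedelta(seconds=lag_seconds)


last_value = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)

# lag=3600 corresponds to a one-hour window for datetime cursors.
query_from = lagged_start(last_value, lag_seconds=3600)

# A record stamped 11:30 that only became visible after the previous run
# now falls inside the re-queried window.
late_record_time = datetime(2024, 1, 1, 11, 30, tzinfo=timezone.utc)
is_recovered = query_from <= late_record_time < last_value
```

Re-querying the window returns records that were already loaded, so this approach relies on deduplication by primary key to avoid duplicates.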
As Transform Step
How Deduplication Works
When a resource has a primary_key, Incremental automatically deduplicates records with the same cursor value:
- Records are filtered to include only those with cursor values >= last_value
- Records with a cursor value equal to last_value are deduplicated using their primary key hash
- Unique hashes are stored in state to prevent duplicates on the next run
To disable deduplication, set primary_key=() or use range_start="open".
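The deduplication steps can be sketched in plain Python as follows. This is an illustration under stated assumptions, not dlt's implementation: pk_hash and load_new_rows are hypothetical helpers, the SHA-1 over the JSON-encoded key is an arbitrary choice, and the sketch assumes an ascending cursor with max.

```python
import hashlib
import json
from typing import Any, Dict, List


def pk_hash(row: Dict[str, Any], primary_key: str) -> str:
    """Hypothetical hash of the primary key value; dlt's real hashing may differ."""
    return hashlib.sha1(json.dumps(row[primary_key]).encode()).hexdigest()


def load_new_rows(
    rows: List[Dict[str, Any]],
    state: Dict[str, Any],
    cursor_path: str,
    primary_key: str,
) -> List[Dict[str, Any]]:
    """Return rows not loaded before and update state in place."""
    last_value = state["last_value"]
    seen = set(state["unique_hashes"])
    accepted = []
    for row in rows:
        value = row[cursor_path]
        if value < last_value:
            continue  # keep only cursor values >= last_value
        if value == last_value:
            key_hash = pk_hash(row, primary_key)
            if key_hash in seen:
                continue  # already loaded at the cursor boundary
            seen.add(key_hash)
        accepted.append(row)

    new_last = max([last_value] + [row[cursor_path] for row in accepted])
    # Only hashes of rows sitting exactly on the new boundary need to be kept.
    boundary = {pk_hash(r, primary_key) for r in accepted if r[cursor_path] == new_last}
    if new_last == last_value:
        boundary |= seen
    state["last_value"] = new_last
    state["unique_hashes"] = sorted(boundary)
    return accepted


state = {"initial_value": 10, "last_value": 10, "unique_hashes": []}
first = load_new_rows(
    [{"id": 1, "ts": 10}, {"id": 2, "ts": 20}, {"id": 3, "ts": 20}],
    state, "ts", "id",
)
# The second run sees ids 2 and 3 again because their cursor equals last_value.
second = load_new_rows(
    [{"id": 2, "ts": 20}, {"id": 3, "ts": 20}, {"id": 4, "ts": 20}, {"id": 5, "ts": 30}],
    state, "ts", "id",
)
```

In the second run only ids 4 and 5 are returned: ids 2 and 3 match hashes stored at the boundary, while id 4 shares the boundary cursor value but has an unseen key.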
State Management
Incremental state holds the following keys:
- initial_value: The initial cursor value
- last_value: The highest cursor value seen
- unique_hashes: List of primary key hashes at the cursor boundary (for deduplication)
See Also
- Incremental Loading Guide
- @dlt.source - Source decorator
- @dlt.resource - Resource decorator