The Incremental class enables incremental data loading by tracking a cursor value in persistent state. This allows you to load only new or updated records since the last pipeline run, significantly reducing API calls and processing time.

Basic Usage

import dlt
from dlt.sources.helpers import requests

@dlt.resource(primary_key="id")
def issues(created_at=dlt.sources.incremental("created_at", initial_value="2024-01-01T00:00:00Z")):
    # Load only issues created after the last run
    response = requests.get(
        "https://api.github.com/repos/owner/repo/issues",
        params={"since": created_at.last_value}
    )
    yield response.json()

Import

from dlt.sources import incremental
# or
import dlt
dlt.sources.incremental(...)

Constructor

Incremental(
    cursor_path: str,
    initial_value: Optional[Any] = None,
    last_value_func: Union[Callable, Literal["min", "max"]] = max,
    primary_key: Optional[str | Tuple[str, ...]] = None,
    end_value: Optional[Any] = None,
    row_order: Optional[Literal["asc", "desc"]] = None,
    allow_external_schedulers: bool = False,
    on_cursor_value_missing: Literal["raise", "include", "exclude"] = "raise",
    lag: Optional[float] = None,
    range_start: Literal["open", "closed"] = "closed",
    range_end: Literal["open", "closed"] = "open"
)

Parameters

cursor_path
str
required
The name or JSON path to the cursor field in your data. This field tracks the incremental progress. Examples: "created_at", "updated_at", "item.timestamp"
initial_value
Any
default:"None"
Starting value for last_value on the first run. Can be a datetime, date, int, float, or string. Examples: "2024-01-01T00:00:00Z", 1234567890, datetime(2024, 1, 1)
last_value_func
Callable | Literal['min', 'max']
default:"max"
Function to determine which cursor value to save. Use max for ascending timestamps, min for descending. Options: max, min, or a custom callable
primary_key
str | Tuple[str, ...]
default:"None"
Primary key for deduplication. If not provided, uses the resource’s primary key. Pass an empty tuple () to disable deduplication. Examples: "id", ("user_id", "event_id")
end_value
Any
default:"None"
Upper bound for loading a limited range. When set, loading becomes stateless. Example: load data for a specific month: incremental("created_at", initial_value="2024-01-01", end_value="2024-02-01")
row_order
Literal['asc', 'desc']
default:"None"
Declares the sort order of data from the source. Enables early stopping when out-of-range data is encountered. Values: "asc" (ascending), "desc" (descending)
allow_external_schedulers
bool
default:"False"
Allow external schedulers (like Airflow) to provide initial_value and end_value from the execution context. In Airflow, it uses data_interval_start and data_interval_end from the context.
on_cursor_value_missing
Literal['raise', 'include', 'exclude']
default:"raise"
Behavior when cursor field is missing from a record:
  • "raise": Raise an error
  • "include": Include the record
  • "exclude": Skip the record
lag
float
default:"None"
Attribution window / lag to apply to the cursor. For datetime cursors, this is in seconds. Useful for eventually consistent APIs. Example: lag=3600 (a 1-hour lag for datetime cursors)
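The effect of lag on a datetime cursor can be sketched in plain Python (a simplified model, not dlt's internal implementation; the helper name is illustrative):

```python
from datetime import datetime, timedelta

def lagged_start(last_value: str, lag_seconds: float) -> datetime:
    """Simplified model: the effective window start is moved back by
    lag_seconds, so late-arriving rows just behind the cursor are re-fetched."""
    return datetime.fromisoformat(last_value) - timedelta(seconds=lag_seconds)

lagged_start("2024-06-01T12:00:00", 3600)  # datetime(2024, 6, 1, 11, 0)
```

With lag=3600, a run whose cursor stands at noon re-requests everything from 11:00 onward, catching updates the API attributed late.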
range_start
Literal['open', 'closed']
default:"closed"
Whether the start of the range is inclusive ("closed") or exclusive ("open"). "open" disables deduplication and excludes records with a cursor value equal to last_value.
range_end
Literal['open', 'closed']
default:"open"
Whether the end of the range is inclusive ("closed") or exclusive ("open"). "open" excludes records with a cursor value equal to end_value.
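The boundary semantics of the two parameters can be sketched as a plain comparison (a simplified model, not dlt's internal implementation):

```python
def in_range(value, start, end=None, range_start="closed", range_end="open"):
    """Simplified model of the range_start / range_end boundary semantics."""
    above = value >= start if range_start == "closed" else value > start
    if end is None:
        return above
    below = value <= end if range_end == "closed" else value < end
    return above and below

in_range(5, 5, 10)                      # True: closed start includes last_value
in_range(5, 5, 10, range_start="open")  # False: open start excludes last_value
in_range(10, 5, 10)                     # False: open end excludes end_value
```

The defaults ("closed" start, "open" end) give half-open windows, so consecutive runs with end_value boundaries never overlap.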

Properties

last_value

The current cursor value from state. Use this to query your data source.
@dlt.resource
def my_resource(updated_at=dlt.sources.incremental("updated_at")):
    # Query API with the last known value
    params = {"updated_since": updated_at.last_value}
    response = requests.get("https://api.example.com/data", params=params)
    yield response.json()

start_value

The cursor value at the beginning of the current pipeline run (before processing new data).
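The relationship between start_value and last_value over one run can be sketched without dlt (values are illustrative):

```python
# Before any rows are processed, start_value == last_value (loaded from state).
start_value = "2024-06-01T00:00:00Z"
last_value = start_value

# As rows are processed, last_value advances (with the default max);
# start_value stays fixed for the whole run.
for row_cursor in ["2024-06-02T00:00:00Z", "2024-06-03T00:00:00Z"]:
    last_value = max(last_value, row_cursor)

new_data_arrived = last_value != start_value  # True
```

Comparing the two after a run is a simple way to tell whether the source returned anything new.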

end_value

The upper bound value if set during initialization.

Methods

from_existing_state(resource_name: str, cursor_path: str)

Create an Incremental instance from existing state without relying on function argument injection.
@dlt.resource
def my_resource():
    # Access incremental state directly
    incremental_state = dlt.sources.incremental.from_existing_state(
        "my_resource", "updated_at"
    )
    last_update = incremental_state.last_value
    
    response = requests.get(f"https://api.example.com/data?since={last_update}")
    yield response.json()

Advanced Examples

Timestamp Cursor with Primary Key Deduplication

@dlt.resource(primary_key="id")
def events(
    updated_at=dlt.sources.incremental("updated_at", initial_value="2024-01-01")
):
    # primary_key deduplicates records that share the same updated_at value,
    # so records at the cursor boundary are not loaded twice across runs
    response = requests.get(
        "https://api.example.com/events",
        params={"updated_since": updated_at.last_value}
    )
    yield response.json()

Descending Order with min

@dlt.resource
def recent_items(
    item_id=dlt.sources.incremental(
        "id",
        initial_value=999999,
        last_value_func=min,  # Track minimum ID seen
        row_order="desc"  # Data comes in descending order
    )
):
    # Load items with ID less than last_value
    response = requests.get(
        "https://api.example.com/items",
        params={"max_id": item_id.last_value}
    )
    yield response.json()

Loading Date Ranges

# Load specific month (stateless)
pipeline.run(
    my_resource(created_at=dlt.sources.incremental(
        "created_at",
        initial_value="2024-01-01",
        end_value="2024-02-01"
    ))
)

With Airflow Integration

@dlt.resource
def events(
    created_at=dlt.sources.incremental(
        "created_at",
        initial_value="2024-01-01",
        allow_external_schedulers=True  # Use Airflow execution dates
    )
):
    # Airflow will set initial_value and end_value from data_interval_start/end
    response = requests.get(
        "https://api.example.com/events",
        params={"start": created_at.start_value, "end": created_at.end_value}
    )
    yield response.json()

Handling Eventually Consistent APIs

@dlt.resource
def events(
    updated_at=dlt.sources.incremental(
        "updated_at",
        initial_value="2024-01-01",
        lag=3600  # 1-hour attribution window
    )
):
    # The cursor will lag 1 hour behind to catch late-arriving updates
    response = requests.get(
        "https://api.example.com/events",
        params={"updated_after": updated_at.last_value}
    )
    yield response.json()

As Transform Step

from datetime import datetime

@dlt.resource
def some_data():
    # Resource yields all data
    yield from fetch_all_data()

# Add incremental as a transform step
r = some_data().add_step(
    dlt.sources.incremental("item.ts", initial_value=datetime.now(), primary_key="delta")
)
pipeline.run(r, destination="duckdb")

How Deduplication Works

When a resource has a primary_key, Incremental automatically deduplicates records with the same cursor value:
  1. Records are filtered to include only those with cursor values >= last_value
  2. Records with cursor value equal to last_value are deduplicated using their primary key hash
  3. Unique hashes are stored in state to prevent duplicates on the next run
To disable deduplication, set primary_key=() or use range_start="open".
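The three steps above can be sketched in plain Python (a simplified model with last_value_func=max; dlt's actual hashing scheme may differ):

```python
import hashlib

def filter_new(records, cursor_path, last_value, seen_hashes, primary_key="id"):
    """Simplified model of boundary deduplication."""
    kept, boundary_hashes = [], set()
    for record in records:
        value = record[cursor_path]
        if value < last_value:
            continue  # already loaded on a previous run
        if value == last_value:
            h = hashlib.sha256(str(record[primary_key]).encode()).hexdigest()
            boundary_hashes.add(h)  # stored in state for the next run
            if h in seen_hashes:
                continue  # same boundary record was loaded last run
        kept.append(record)
    return kept, boundary_hashes

previous_hashes = {hashlib.sha256(b"1").hexdigest()}
rows = [
    {"id": 1, "ts": "2024-06-01"},  # duplicate at the boundary: skipped
    {"id": 2, "ts": "2024-06-01"},  # new record at the boundary: kept
    {"id": 3, "ts": "2024-06-02"},  # above the boundary: kept
]
filter_new(rows, "ts", "2024-06-01", previous_hashes)  # keeps ids 2 and 3
```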

State Management

Incremental state is stored at:
pipeline_state.sources.<source_name>.<resource_name>.incremental.<cursor_path>
State contains:
  • initial_value: The initial cursor value
  • last_value: The highest cursor value seen
  • unique_hashes: List of primary key hashes at the cursor boundary (for deduplication)
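For illustration, the stored state for a resource tracking created_at might look like this (an assumed shape based on the keys listed above; the values and hash are placeholders, not output from a real pipeline):

```python
incremental_state = {
    "initial_value": "2024-01-01T00:00:00Z",  # the initial cursor value
    "last_value": "2024-06-15T08:30:00Z",     # highest cursor value seen
    "unique_hashes": ["a1b2c3d4"],            # placeholder boundary hash
}
```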
