The dlt.sources.helpers module provides utilities for common data loading tasks, including HTTP clients, REST API pagination, and data transformations.

REST Client

A powerful declarative REST API client with automatic pagination, authentication, and request handling.

Import

from dlt.sources.helpers.rest_client import RESTClient, paginate
from dlt.sources.helpers.rest_client import AuthConfigBase
from dlt.sources.helpers.rest_client import BasePaginator

RESTClient

Declarative REST API client with built-in pagination and authentication support.
import dlt
from dlt.sources.helpers.rest_client import RESTClient

client = RESTClient(
    base_url="https://api.github.com",
    headers={"Accept": "application/vnd.github+json"}
)

@dlt.resource
def github_issues():
    # Iterate over paginated results
    for page in client.paginate("/repos/dlt-hub/dlt/issues"):
        yield page
Key Features:
  • Automatic pagination with multiple strategies (offset, cursor, header-based)
  • Built-in authentication (Bearer, API key, OAuth)
  • Configurable retry logic
  • Response hooks for custom processing
  • JSON path selectors for extracting data from responses
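The pagination loop that RESTClient automates can be sketched in plain Python. This is an illustration of the cursor-following mechanics only: the in-memory `PAGES` dict, `fetch_page`, and the page shape are assumptions for the sketch, not dlt API.

```python
from typing import Iterator, List, Optional

# Hypothetical in-memory "API": three pages linked by a next cursor.
PAGES = {
    None: {"items": [1, 2], "next": "c1"},
    "c1": {"items": [3, 4], "next": "c2"},
    "c2": {"items": [5], "next": None},
}

def fetch_page(cursor: Optional[str]) -> dict:
    # Stand-in for an HTTP GET; RESTClient would issue a real request here.
    return PAGES[cursor]

def paginate_all() -> Iterator[List[int]]:
    cursor: Optional[str] = None
    while True:
        page = fetch_page(cursor)
        yield page["items"]       # one page of extracted data
        cursor = page["next"]     # follow the cursor the paginator detected
        if cursor is None:
            break

collected = [item for page in paginate_all() for item in page]
```

RESTClient's paginators detect and follow the "next" pointer for you (from a cursor field, an offset, or a Link header), so your resource only sees the yielded pages.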

paginate() Function

Simple function for quick pagination without creating a client instance.
import dlt
from dlt.sources.helpers.rest_client import paginate

@dlt.resource
def github_issues():
    for page in paginate(
        "https://api.github.com/repos/dlt-hub/dlt/issues",
        params={"state": "open", "per_page": 100}
    ):
        yield page
Parameters:
  • url (str, required): URL to paginate over.
  • method (str, default "GET"): HTTP method: "GET", "POST", "PUT", "PATCH", "DELETE".
  • headers (Dict[str, str]): HTTP headers to send with requests.
  • params (Dict[str, Any]): Query parameters.
  • json (Dict[str, Any]): JSON body for POST/PUT/PATCH requests.
  • auth (AuthConfigBase): Authentication configuration.
  • paginator (BasePaginator): Paginator instance to use for pagination logic.
  • data_selector (str): JSON path to extract data from each page. Example: "data", "results", "items[*]"
Returns: Iterator of pages.
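How a data_selector picks rows out of each response page can be sketched with a simplified selector that only walks dot-separated keys (the real data_selector accepts full JSONPath expressions; the page shape below is an assumption for the sketch).

```python
def select_data(page: dict, data_selector: str):
    # Simplified selector: walk dot-separated keys, e.g. "data.items".
    # dlt's real data_selector supports full JSONPath syntax.
    value = page
    for key in data_selector.split("."):
        value = value[key]
    return value

# Hypothetical API response with rows nested under "data.items".
page = {"meta": {"total": 2}, "data": {"items": [{"id": 1}, {"id": 2}]}}
rows = select_data(page, "data.items")
```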

HTTP Requests

A pre-configured requests client with automatic retries and timeout handling.

Import

from dlt.sources.helpers import requests

Functions

All standard requests library functions with automatic retry and configuration:
from dlt.sources.helpers import requests

# GET request with automatic retries
response = requests.get(
    "https://api.github.com/users/octocat",
    headers={"Authorization": "Bearer token"}
)
data = response.json()

# POST request
response = requests.post(
    "https://api.example.com/data",
    json={"key": "value"}
)
Available functions:
  • requests.get(url, **kwargs)
  • requests.post(url, **kwargs)
  • requests.put(url, **kwargs)
  • requests.patch(url, **kwargs)
  • requests.delete(url, **kwargs)
  • requests.head(url, **kwargs)
  • requests.options(url, **kwargs)
  • requests.request(method, url, **kwargs)
Features:
  • Automatic retry on failure (configurable via RuntimeConfiguration)
  • Configurable timeouts
  • Same API as standard requests library
  • Thread-safe
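The retry behavior can be sketched as a retry-with-exponential-backoff loop around a send function. This is a minimal illustration of the policy, not dlt's implementation: the `send`/`flaky_send` functions and the default values are assumptions, and the real retry conditions come from RuntimeConfiguration.

```python
import time

def request_with_retries(send, max_attempts=3, backoff_factor=0.0):
    # Retry transient failures with exponential backoff, as the dlt
    # requests wrapper does for configured exceptions and status codes.
    last_error = None
    for attempt in range(max_attempts):
        try:
            return send()
        except ConnectionError as exc:
            last_error = exc
            time.sleep(backoff_factor * (2 ** attempt))
    raise last_error

# Hypothetical endpoint that fails twice, then succeeds.
calls = {"n": 0}
def flaky_send():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient")
    return "ok"

result = request_with_retries(flaky_send)
```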

Client

For custom configuration, create a Client instance:
from dlt.sources.helpers.requests import Client

client = Client()
client.update_from_config(config)  # Configure from RuntimeConfiguration

response = client.get("https://api.example.com/data")

Session

For persistent sessions with connection pooling:
from dlt.sources.helpers.requests import Session

with Session() as session:
    session.headers.update({"Authorization": "Bearer token"})
    response = session.get("https://api.example.com/data")

Data Transformations

Helper functions for transforming data items in resources.

Import

from dlt.sources.helpers.transform import take_first, skip_first, pivot

take_first()

Filter that takes only the first N items from a resource.
import dlt
from dlt.sources.helpers.transform import take_first

@dlt.resource
def limited_data():
    yield from range(1000)

# Take only first 100 items
pipeline.run(
    limited_data().add_map(take_first(100))
)
Parameters:
  • max_items (int, required): Maximum number of items to take.

skip_first()

Filter that skips the first N items from a resource.
import dlt
from dlt.sources.helpers.transform import skip_first

@dlt.resource
def data_without_header():
    yield from all_rows  # Including header row

# Skip header row
pipeline.run(
    data_without_header().add_map(skip_first(1))
)
Parameters:
  • max_items (int, required): Number of items to skip.
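The semantics of both filters can be sketched as stateful closures that count items as they stream past. This illustrates the behavior only; it is not dlt's exact implementation.

```python
def take_first(max_items: int):
    # Returns a predicate that passes items until max_items is reached.
    count = 0
    def _filter(item) -> bool:
        nonlocal count
        count += 1
        return count <= max_items
    return _filter

def skip_first(max_items: int):
    # Returns a predicate that drops the first max_items items.
    count = 0
    def _filter(item) -> bool:
        nonlocal count
        count += 1
        return count > max_items
    return _filter

data = list(range(10))
take = take_first(3)
first_three = [x for x in data if take(x)]
skip = skip_first(8)
tail = [x for x in data if skip(x)]
```

Because the counter lives in the closure, each call to take_first()/skip_first() produces an independent filter: create it once and reuse it across the whole stream.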

pivot()

Transform sequences of sequences into sequences of dictionaries.
import dlt
from dlt.sources.helpers.transform import pivot

# Input: {"data": [[1, 2, 3], [4, 5, 6]]}
# Output: {"data": [{"col_0": 1, "col_1": 2, "col_2": 3}, {"col_0": 4, "col_1": 5, "col_2": 6}]}

@dlt.resource
def matrix_data():
    yield {"data": [[1, 2, 3], [4, 5, 6]]}

pipeline.run(
    matrix_data().add_map(pivot(paths="data", prefix="col_"))
)
Parameters:
  • paths (str | List[str], default "$"): JSON paths to fields to pivot. Use "$" for root-level arrays.
  • prefix (str, default "col"): Prefix for generated column names.
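The core transformation pivot applies to each matched field can be sketched in plain Python; `pivot_rows` is a hypothetical helper name for the sketch, matching the documented input/output shape above.

```python
def pivot_rows(rows, prefix="col_"):
    # Turn each row (a sequence) into a dict keyed by prefixed column
    # index, mirroring the documented pivot() output shape.
    return [
        {f"{prefix}{i}": value for i, value in enumerate(row)}
        for row in rows
    ]

item = {"data": [[1, 2, 3], [4, 5, 6]]}
pivoted = {"data": pivot_rows(item["data"], prefix="col_")}
```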

add_row_hash_to_table()

Compute and add content hash for each row in Pandas DataFrame or Arrow table.
import dlt
from dlt.sources.helpers.transform import add_row_hash_to_table
import pandas as pd

@dlt.resource
def users_df():
    df = pd.DataFrame({
        "id": [1, 2, 3],
        "name": ["Alice", "Bob", "Charlie"]
    })
    yield df

# Add row_hash column
pipeline.run(
    users_df().add_map(add_row_hash_to_table("row_hash"))
)
Parameters:
  • row_hash_column_name (str, required): Name of the column to add with row hashes.
Use Cases:
  • SCD2 (Slowly Changing Dimension Type 2) tracking
  • Change detection
  • Deduplication based on content
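The property these use cases rely on can be sketched with stdlib tools on plain dict rows: identical content must always produce the identical hash. The `add_row_hash` function and the sha256-over-sorted-JSON scheme here are assumptions for illustration, not dlt's hashing scheme.

```python
import hashlib
import json

def add_row_hash(rows, row_hash_column_name="row_hash"):
    # Hash each row's content deterministically (sorted keys), so equal
    # rows always get equal hashes. That property is what SCD2 change
    # detection and content-based deduplication depend on.
    out = []
    for row in rows:
        digest = hashlib.sha256(
            json.dumps(row, sort_keys=True).encode("utf-8")
        ).hexdigest()
        out.append({**row, row_hash_column_name: digest})
    return out

rows = [
    {"id": 1, "name": "Alice"},
    {"id": 1, "name": "Alice"},   # duplicate content
    {"id": 2, "name": "Bob"},
]
hashed = add_row_hash(rows)
```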

Complete Example

Combining multiple helpers:
import dlt
from dlt.sources.helpers.rest_client import paginate
from dlt.sources.helpers.transform import take_first

@dlt.resource(
    primary_key="id",
    write_disposition="merge"
)
def github_issues(
    updated_at=dlt.sources.incremental("updated_at", initial_value="2024-01-01T00:00:00Z")
):
    # Paginate over GitHub API
    for page in paginate(
        "https://api.github.com/repos/dlt-hub/dlt/issues",
        params={
            "state": "all",
            "since": updated_at.last_value,
            "per_page": 100
        },
        headers={"Accept": "application/vnd.github+json"},
        data_selector="$"  # Results are at root level
    ):
        yield page

# Run pipeline with limited items for testing
pipeline = dlt.pipeline(
    pipeline_name="github",
    destination="duckdb",
    dataset_name="github_data"
)

pipeline.run(
    github_issues().add_map(take_first(50))  # Limit to 50 items
)
