The dlt.sources.helpers module provides utilities for common data loading tasks, including HTTP clients, REST API pagination, and data transformations.
REST Client
A powerful declarative REST API client with automatic pagination, authentication, and request handling.
Import
from dlt.sources.helpers.rest_client import RESTClient, paginate
from dlt.sources.helpers.rest_client import AuthConfigBase
from dlt.sources.helpers.rest_client import BasePaginator
RESTClient
Declarative REST API client with built-in pagination and authentication support.
from dlt.sources.helpers.rest_client import RESTClient
client = RESTClient(
    base_url="https://api.github.com",
    headers={"Accept": "application/vnd.github+json"}
)

# Iterate over paginated results
for page in client.paginate("/repos/dlt-hub/dlt/issues"):
    yield page
Key Features:
- Automatic pagination with multiple strategies (offset, cursor, header-based)
- Built-in authentication (Bearer, API key, OAuth)
- Configurable retry logic
- Response hooks for custom processing
- JSON path selectors for extracting data from responses
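As a rough illustration of what the client automates, offset-based pagination can be sketched in plain Python. The `fetch_page` function below is a stand-in for a real HTTP call, and all names here are illustrative, not part of the dlt API:

```python
# Minimal sketch of offset-based pagination. fetch_page() simulates a
# server; RESTClient automates this loop (and the cursor/header variants).

def fetch_page(offset: int, limit: int) -> dict:
    """Simulate an API exposing 25 items via offset/limit paging."""
    items = list(range(25))
    return {"data": items[offset:offset + limit], "total": len(items)}

def paginate_offset(limit: int = 10):
    """Yield pages until the server reports no more items."""
    offset = 0
    while True:
        page = fetch_page(offset, limit)
        if not page["data"]:
            break
        yield page["data"]
        offset += limit
        if offset >= page["total"]:
            break

pages = list(paginate_offset())
# Three pages: 10 + 10 + 5 items
```

The cursor and header-based strategies follow the same shape; only the "where is the next page" bookkeeping changes.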
paginate() Function
Simple function for quick pagination without creating a client instance.
import dlt
from dlt.sources.helpers.rest_client import paginate

@dlt.resource
def github_issues():
    for page in paginate(
        "https://api.github.com/repos/dlt-hub/dlt/issues",
        params={"state": "open", "per_page": 100}
    ):
        yield page
Parameters:
- method: HTTP method ("GET", "POST", "PUT", "PATCH", "DELETE").
- headers: HTTP headers to send with requests.
- json: JSON body for POST/PUT/PATCH requests.
- auth: Authentication configuration.
- paginator: Paginator instance to use for pagination logic.
- data_selector: JSON path to extract data from each page. Example: "data", "results", "items[*]".
Returns: Iterator of pages.
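The data-selector idea can be shown with a small stand-in. This toy version only handles a plain key and the "$" root selector; dlt itself accepts full JSONPath expressions:

```python
# Simplified sketch of what a data selector does: pull the list of
# records out of a page payload. This is not dlt's implementation,
# which supports full JSONPath; this version handles a plain key only.

def select_data(page, data_selector: str) -> list:
    """Return the records under `data_selector`, or the page itself at "$"."""
    if data_selector == "$":
        # Root-level array: the payload is already the list of records
        return page if isinstance(page, list) else [page]
    return page[data_selector]

payload = {"results": [{"id": 1}, {"id": 2}], "next": None}
records = select_data(payload, "results")
# → [{"id": 1}, {"id": 2}]
```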
HTTP Requests
A pre-configured requests client with automatic retries and timeout handling.
Import
from dlt.sources.helpers import requests
Functions
All standard requests library functions with automatic retry and configuration:
from dlt.sources.helpers import requests

# GET request with automatic retries
response = requests.get(
    "https://api.github.com/users/octocat",
    headers={"Authorization": "Bearer token"}
)
data = response.json()

# POST request
response = requests.post(
    "https://api.example.com/data",
    json={"key": "value"}
)
Available functions:
requests.get(url, **kwargs)
requests.post(url, **kwargs)
requests.put(url, **kwargs)
requests.patch(url, **kwargs)
requests.delete(url, **kwargs)
requests.head(url, **kwargs)
requests.options(url, **kwargs)
requests.request(method, url, **kwargs)
Features:
- Automatic retry on failure (configurable via RuntimeConfiguration)
- Configurable timeouts
- Same API as the standard requests library
- Thread-safe
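A rough sketch of the retry behavior the wrapper adds on top of plain requests. The names and policy below are assumptions for illustration; the real client derives its policy from RuntimeConfiguration:

```python
import time

# Illustrative retry-with-backoff loop, similar in spirit to what a
# retrying HTTP client does. Not dlt's actual implementation.

def request_with_retries(send, max_attempts: int = 3, backoff: float = 0.0):
    """Call `send()` until it succeeds or attempts are exhausted."""
    last_exc = None
    for attempt in range(max_attempts):
        try:
            return send()
        except ConnectionError as exc:
            last_exc = exc
            time.sleep(backoff * (2 ** attempt))  # exponential backoff
    raise last_exc

calls = {"n": 0}

def flaky_send():
    """Fail twice with a transient error, then succeed."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient failure")
    return "ok"

result = request_with_retries(flaky_send)
# → "ok" after two failed attempts
```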
Client
For custom configuration, create a Client instance:
from dlt.sources.helpers.requests import Client
client = Client()
client.update_from_config(config) # Configure from RuntimeConfiguration
response = client.get("https://api.example.com/data")
Session
For persistent sessions with connection pooling:
from dlt.sources.helpers.requests import Session
with Session() as session:
    session.headers.update({"Authorization": "Bearer token"})
    response = session.get("https://api.example.com/data")
Data Transformations
Helper functions for transforming data items in resources.
Import
from dlt.sources.helpers.transform import take_first, skip_first, pivot
take_first()
Filter that takes only the first N items from a resource.
from dlt.sources.helpers.transform import take_first

@dlt.resource
def limited_data():
    yield from range(1000)

# Take only the first 100 items
pipeline.run(
    limited_data().add_filter(take_first(100))
)
Maximum number of items to take.
skip_first()
Filter that skips the first N items from a resource.
from dlt.sources.helpers.transform import skip_first

@dlt.resource
def data_without_header():
    yield from all_rows  # including the header row

# Skip the header row
pipeline.run(
    data_without_header().add_filter(skip_first(1))
)
pivot()
Transform sequences of sequences into sequences of dictionaries.
from dlt.sources.helpers.transform import pivot

# Input:  {"data": [[1, 2, 3], [4, 5, 6]]}
# Output: {"data": [{"col_0": 1, "col_1": 2, "col_2": 3}, {"col_0": 4, "col_1": 5, "col_2": 6}]}
@dlt.resource
def matrix_data():
    yield {"data": [[1, 2, 3], [4, 5, 6]]}

pipeline.run(
    matrix_data().add_map(pivot(paths="data", prefix="col_"))
)
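The mapping itself is simple to state: each inner sequence becomes a dict keyed by position. A pure-Python sketch of that semantics, using the same `col_` prefix (this mirrors the documented behavior, not dlt's implementation):

```python
# Pure-Python sketch of the pivot semantics: each inner sequence becomes
# a dict keyed by position with a configurable prefix.

def pivot_rows(rows, prefix="col_"):
    """Turn [[1, 2], [3, 4]] into [{"col_0": 1, "col_1": 2}, ...]."""
    return [
        {f"{prefix}{i}": value for i, value in enumerate(row)}
        for row in rows
    ]

item = {"data": [[1, 2, 3], [4, 5, 6]]}
item["data"] = pivot_rows(item["data"])
# → {"data": [{"col_0": 1, "col_1": 2, "col_2": 3},
#             {"col_0": 4, "col_1": 5, "col_2": 6}]}
```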
Parameters:
- paths (str | List[str], default: "$"): JSON paths to the fields to pivot. Use "$" for root-level arrays.
- prefix: Prefix for generated column names.
add_row_hash_to_table()
Compute and add content hash for each row in Pandas DataFrame or Arrow table.
from dlt.sources.helpers.transform import add_row_hash_to_table
import pandas as pd

@dlt.resource
def users_df():
    df = pd.DataFrame({
        "id": [1, 2, 3],
        "name": ["Alice", "Bob", "Charlie"]
    })
    yield df

# Add a row_hash column
pipeline.run(
    users_df().add_map(add_row_hash_to_table("row_hash"))
)
Name of the column to add with row hashes.
Use Cases:
- SCD2 (Slowly Changing Dimension Type 2) tracking
- Change detection
- Deduplication based on content
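The underlying idea can be sketched without pandas: hash each row's content deterministically so identical rows get identical hashes. The serialization and digest choices below are assumptions for illustration, not necessarily what add_row_hash_to_table uses internally:

```python
import hashlib
import json

# Sketch of content-based row hashing for change detection and dedup.
# Canonical JSON (sorted keys) makes the hash independent of key order.

def row_hash(row: dict) -> str:
    """Deterministic hash of a row's content (key order ignored)."""
    canonical = json.dumps(row, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

a = {"id": 1, "name": "Alice"}
b = {"name": "Alice", "id": 1}   # same content, different key order
c = {"id": 1, "name": "Alicia"}  # changed content

assert row_hash(a) == row_hash(b)  # identical content hashes equal
assert row_hash(a) != row_hash(c)  # changed content is detected
```

For SCD2 tracking, comparing the stored hash against the incoming row's hash is what tells you whether a new dimension version is needed.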
Complete Example
Combining multiple helpers:
import dlt
from dlt.sources.helpers.rest_client import paginate
from dlt.sources.helpers.transform import take_first

@dlt.resource(
    primary_key="id",
    write_disposition="merge"
)
def github_issues(
    updated_at=dlt.sources.incremental("updated_at", initial_value="2024-01-01T00:00:00Z")
):
    # Paginate over the GitHub API
    for page in paginate(
        "https://api.github.com/repos/dlt-hub/dlt/issues",
        params={
            "state": "all",
            "since": updated_at.last_value,
            "per_page": 100
        },
        headers={"Accept": "application/vnd.github+json"},
        data_selector="$"  # results are at the root level
    ):
        yield page

# Run the pipeline with a limited number of items for testing
pipeline = dlt.pipeline(
    pipeline_name="github",
    destination="duckdb",
    dataset_name="github_data"
)
pipeline.run(
    github_issues().add_filter(take_first(50))  # limit to 50 items
)
See Also