Custom Sources
Create custom data sources to load data from any API, database, or data origin. dlt provides decorators and utilities that make it easy to build production-ready sources with minimal code.

Basic Custom Source
Create a simple source using Python generators:

Create a resource function
Define a function that yields data:
import dlt
@dlt.resource(name="users", write_disposition="replace")
def get_users():
    """Yield a small demo batch of user records."""
    demo_users = [
        {"id": 1, "name": "Alice", "email": "[email protected]"},
        {"id": 2, "name": "Bob", "email": "[email protected]"},
        {"id": 3, "name": "Charlie", "email": "[email protected]"},
    ]
    yield demo_users
Wrap in a source
Group resources in a source function:
@dlt.source
def my_source():
    """Group the demo resources into a single dlt source."""
    return get_users()
Loading from APIs
Simple API Source
Load data from a REST API:import dlt
from dlt.sources.helpers import requests
@dlt.resource(name="posts", write_disposition="replace")
def get_posts():
"""Fetch posts from JSONPlaceholder API"""
response = requests.get("https://jsonplaceholder.typicode.com/posts")
response.raise_for_status()
yield response.json()
@dlt.source
def jsonplaceholder():
    """Source wrapping the JSONPlaceholder resources."""
    return get_posts()
# Build the pipeline and run the source against a local DuckDB destination.
pipeline = dlt.pipeline(
    pipeline_name="jsonplaceholder",
    destination="duckdb",
    dataset_name="jsonplaceholder_data",
)
load_info = pipeline.run(jsonplaceholder())
print(load_info)
API with Authentication
Add authentication to your API requests:import dlt
from dlt.sources.helpers import requests
@dlt.resource(name="repos", write_disposition="merge", primary_key="id")
def get_github_repos(api_token: str = dlt.secrets.value):
"""Fetch repositories from GitHub API"""
headers = {
"Authorization": f"Bearer {api_token}",
"Accept": "application/vnd.github.v3+json",
}
response = requests.get(
"https://api.github.com/user/repos",
headers=headers
)
response.raise_for_status()
yield response.json()
@dlt.source
def github_source(api_token: str = dlt.secrets.value):
    """Source for GitHub data; the token is injected from secrets.toml."""
    return get_github_repos(api_token)
secrets.toml:
[sources.github_source]
api_token = "ghp_your_token_here"
Pagination
Simple Pagination
Iterate through paginated API responses:@dlt.resource(name="issues", write_disposition="replace")
def get_issues(api_token: str = dlt.secrets.value):
"""Fetch issues with pagination"""
headers = {"Authorization": f"Bearer {api_token}"}
url = "https://api.github.com/repos/dlt-hub/dlt/issues"
params = {"per_page": 100, "page": 1}
while url:
response = requests.get(url, headers=headers, params=params)
response.raise_for_status()
issues = response.json()
if not issues:
break
yield issues
# Get next page URL from Link header
link_header = response.headers.get("Link")
if link_header and 'rel="next"' in link_header:
params["page"] += 1
else:
break
Using REST Client Helper
Use dlt’s built-in REST client for automatic pagination:from dlt.sources.helpers.rest_client import RESTClient, paginate
from dlt.sources.helpers.rest_client.auth import BearerTokenAuth
from dlt.sources.helpers.rest_client.paginators import JSONResponsePaginator
@dlt.resource
def paginated_resource(api_token: str = dlt.secrets.value):
"""Use REST client for automatic pagination"""
client = RESTClient(
base_url="https://api.github.com",
auth=BearerTokenAuth(token=api_token),
)
for page in client.paginate(
"/repos/dlt-hub/dlt/issues",
params={"per_page": 100},
paginator=JSONResponsePaginator(next_url_path="next_page"),
):
yield page
Incremental Loading
Load only new or updated records:

- Basic Incremental
- Incremental with Merge
- Nested Incremental
@dlt.resource(
    name="events",
    write_disposition="append",
)
def get_events(
    updated_at=dlt.sources.incremental(
        "updated_at",
        initial_value="2024-01-01T00:00:00Z",
    )
):
    """Append events newer than the last stored `updated_at` cursor."""
    resp = requests.get(
        "https://api.example.com/events",
        params={"since": updated_at.last_value},
    )
    yield resp.json()
@dlt.resource(
    name="users",
    write_disposition="merge",
    primary_key="id",
)
def get_users(
    updated_at=dlt.sources.incremental(
        "updated_at",
        initial_value="2024-01-01T00:00:00Z",
    )
):
    """Merge users changed since the stored cursor value."""
    query = {
        "updated_since": updated_at.last_value,
        "limit": 1000,
    }
    resp = requests.get("https://api.example.com/users", params=query)
    yield resp.json()["users"]
@dlt.resource(
    name="orders",
    write_disposition="merge",
    primary_key="order_id",
)
def get_orders(
    created_at=dlt.sources.incremental(
        "order_date",  # cursor field as it appears in the data
        initial_value="2024-01-01T00:00:00Z",
    )
):
    """Merge orders created after the tracked `order_date` cursor."""
    resp = requests.get(
        "https://api.example.com/orders",
        params={"created_after": created_at.last_value},
    )
    yield resp.json()
The incremental object automatically tracks the maximum cursor value and uses it as the starting point for subsequent runs.

Transformers
Process data from parent resources:@dlt.resource(name="users", write_disposition="replace")
def get_users():
"""Fetch users"""
response = requests.get("https://api.example.com/users")
yield response.json()
@dlt.transformer(
    name="user_posts",
    write_disposition="replace",
    data_from=get_users,
)
def get_user_posts(user):
    """For each user produced upstream, yield that user's posts."""
    user_id = user["id"]
    resp = requests.get(f"https://api.example.com/users/{user_id}/posts")
    yield resp.json()
@dlt.source
def api_source():
    """Bundle the parent resource and its dependent transformer."""
    return [get_users(), get_user_posts()]
Filtering and Transforming Data
Modify data before loading:@dlt.resource(name="active_users")
def get_active_users():
"""Load only active users"""
response = requests.get("https://api.example.com/users")
users = response.json()
# Filter active users
for user in users:
if user.get("status") == "active":
yield user
Schema Definition
Using Column Hints
Define column types and constraints:@dlt.resource(
name="products",
write_disposition="merge",
primary_key="product_id",
columns={
"product_id": {"data_type": "bigint", "nullable": False},
"name": {"data_type": "text", "nullable": False},
"price": {"data_type": "decimal", "precision": 10, "scale": 2},
"tags": {"data_type": "json"}, # Store as JSON, not nested table
"created_at": {"data_type": "timestamp"},
},
)
def get_products():
"""Fetch products with schema hints"""
response = requests.get("https://api.example.com/products")
yield response.json()
Using Pydantic Models
Define schema with Pydantic:from pydantic import BaseModel
from typing import List, Optional
from datetime import datetime
class Product(BaseModel):
product_id: int
name: str
price: float
tags: List[str]
created_at: datetime
description: Optional[str] = None
@dlt.resource(
    name="products",
    write_disposition="merge",
    primary_key="product_id",
    columns=Product,
)
def get_products():
    """Yield products validated and typed via the Product model."""
    resp = requests.get("https://api.example.com/products")
    yield resp.json()
Error Handling
Implement robust error handling:from tenacity import retry, stop_after_attempt, wait_exponential
import logging
logger = logging.getLogger(__name__)
@dlt.resource(name="resilient_resource")
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10)
)
def get_data_with_retry():
"""Fetch data with automatic retry"""
try:
response = requests.get("https://api.example.com/data")
response.raise_for_status()
yield response.json()
except requests.exceptions.RequestException as e:
logger.error(f"Request failed: {e}")
raise
@dlt.resource(name="error_tolerant")
def get_data_with_error_handling():
    """Skip records that fail processing instead of aborting the whole load."""
    response = requests.get("https://api.example.com/records")
    for record in response.json():
        try:
            yield process_record(record)
        except Exception as e:
            # Log and move on — one bad record should not kill the run.
            logger.warning(f"Failed to process record {record.get('id')}: {e}")
Complete Example: Custom API Source
A comprehensive example with authentication, pagination, and incremental loading:

import dlt
from dlt.sources.helpers import requests
from typing import Iterator, Dict, Any
@dlt.source
def ecommerce_api(
    api_key: str = dlt.secrets.value,
    base_url: str = dlt.config.value,
):
    """Load customers, orders, and products from an e-commerce API.

    Args:
        api_key: Bearer token, injected from secrets.toml.
        base_url: API root URL, injected from config.toml.
    """

    def _fetch_pages(
        endpoint: str,
        items_key: str,
        extra_params: Dict[str, Any],
    ) -> Iterator[Dict[str, Any]]:
        """Shared page/has_more pagination walk.

        The three resources below were copy-pasted variants of this exact
        loop; keeping it in one place removes the triplication.
        """
        headers = {"Authorization": f"Bearer {api_key}"}
        page = 1
        while True:
            response = requests.get(
                f"{base_url}/{endpoint}",
                headers=headers,
                params={**extra_params, "page": page, "per_page": 100},
            )
            response.raise_for_status()
            data = response.json()
            items = data.get(items_key, [])
            if not items:
                break
            yield items
            if not data.get("has_more"):
                break
            page += 1

    @dlt.resource(
        name="customers",
        write_disposition="merge",
        primary_key="customer_id",
    )
    def get_customers(
        updated_at=dlt.sources.incremental(
            "updated_at",
            initial_value="2024-01-01T00:00:00Z"
        )
    ) -> Iterator[Dict[str, Any]]:
        """Fetch customers incrementally by `updated_at`."""
        yield from _fetch_pages(
            "customers",
            "customers",
            {"updated_since": updated_at.last_value},
        )

    @dlt.resource(
        name="orders",
        write_disposition="merge",
        primary_key="order_id",
    )
    def get_orders(
        created_at=dlt.sources.incremental(
            "created_at",
            initial_value="2024-01-01T00:00:00Z"
        )
    ) -> Iterator[Dict[str, Any]]:
        """Fetch orders incrementally by `created_at`, expanding line items."""
        yield from _fetch_pages(
            "orders",
            "orders",
            {"created_after": created_at.last_value, "expand": "line_items"},
        )

    @dlt.resource(
        name="products",
        write_disposition="replace",
    )
    def get_products() -> Iterator[Dict[str, Any]]:
        """Fetch the full product catalog (replaced on every run)."""
        yield from _fetch_pages("products", "products", {})

    return [get_customers(), get_orders(), get_products()]
# Configure in config.toml
# [sources.ecommerce_api]
# base_url = "https://api.example.com/v1"
# Configure in secrets.toml
# [sources.ecommerce_api]
# api_key = "your_api_key_here"
# Run the pipeline
if __name__ == "__main__":
    # Wire the source into a local DuckDB pipeline and run it.
    pipeline = dlt.pipeline(
        pipeline_name="ecommerce_pipeline",
        destination="duckdb",
        dataset_name="ecommerce_data",
    )
    load_info = pipeline.run(ecommerce_api())
    print(load_info)
Best Practices
Use generators for memory efficiency
Always use yield instead of returning large lists:

# ✅ Good: Memory efficient
@dlt.resource
def get_data():
    # Stream one page at a time — memory stays constant.
    for page_number in range(100):
        yield fetch_page(page_number)

# ❌ Bad: Loads everything into memory
@dlt.resource
def get_data():
    all_data = []
    for page_number in range(100):
        all_data.extend(fetch_page(page_number))
    return all_data
Implement proper error handling
Use retry logic and handle errors gracefully:
from tenacity import retry, stop_after_attempt

@retry(stop=stop_after_attempt(3))
def _fetch_payload():
    """Plain (non-generator) function so @retry actually re-runs the request."""
    response = requests.get("https://api.example.com/data")
    response.raise_for_status()
    return response.json()

@dlt.resource
def resilient_resource():
    """Yield the API payload, fetching it with automatic retry.

    NOTE: decorating a generator function with @retry is a no-op — the body
    only executes at iteration time, outside the retry wrapper — so the
    retried request lives in the helper above.
    """
    yield _fetch_payload()
Use configuration for flexibility
Externalize configuration and secrets:
@dlt.source
def my_source(
    api_key: str = dlt.secrets.value,
    base_url: str = dlt.config.value,
    batch_size: int = dlt.config.value,
):
    """Skeleton source: credentials and settings are injected from secrets/config."""
    # Source implementation
    pass
Add type hints
Use type hints for better IDE support and documentation:
from typing import Iterator, Dict, Any

@dlt.resource
def typed_resource() -> Iterator[Dict[str, Any]]:
    """Minimal fully type-annotated resource example."""
    yield {"key": "value"}
Next Steps
- Explore REST API Source for declarative API loading
- Learn about SQL Database Source
- Check Available Sources