Skip to main content

Custom Sources

Create custom data sources to load data from any API, database, or data origin. dlt provides decorators and utilities that make it easy to build production-ready sources with minimal code.

Basic Custom Source

Create a simple source using Python generators:
1

Create a resource function

Define a function that yields data:
import dlt

@dlt.resource(name="users", write_disposition="replace")
def get_users():
    """Yield a small hard-coded batch of user records."""
    # A resource may yield an entire list at once; dlt treats it as one batch.
    yield [
        {"id": 1, "name": "Alice", "email": "[email protected]"},
        {"id": 2, "name": "Bob", "email": "[email protected]"},
        {"id": 3, "name": "Charlie", "email": "[email protected]"},
    ]
2

Wrap in a source

Group resources in a source function:
@dlt.source
def my_source():
    """Group the custom resources into a single dlt source."""
    users = get_users()
    return users
3

Load the data

Run the pipeline:
# Destination and dataset name determine where the tables land (a local
# DuckDB file here).
pipeline = dlt.pipeline(
    pipeline_name="custom_pipeline",
    destination="duckdb",
    dataset_name="my_data"
)

# run() extracts, normalizes and loads; it returns a LoadInfo summary.
load_info = pipeline.run(my_source())
print(load_info)

Loading from APIs

Simple API Source

Load data from a REST API:
import dlt
from dlt.sources.helpers import requests

@dlt.resource(name="posts", write_disposition="replace")
def get_posts():
    """Yield all posts from the JSONPlaceholder demo API."""
    resp = requests.get("https://jsonplaceholder.typicode.com/posts")
    # Fail loudly on HTTP errors rather than loading an error payload.
    resp.raise_for_status()
    yield resp.json()

@dlt.source
def jsonplaceholder():
    """Source wrapping the JSONPlaceholder posts resource."""
    posts = get_posts()
    return posts

# Pipeline loading the JSONPlaceholder source into a local DuckDB file.
pipeline = dlt.pipeline(
    pipeline_name="jsonplaceholder",
    destination="duckdb",
    dataset_name="jsonplaceholder_data"
)

# run() returns a LoadInfo object describing the completed load packages.
load_info = pipeline.run(jsonplaceholder())
print(load_info)

API with Authentication

Add authentication to your API requests:
import dlt
from dlt.sources.helpers import requests

@dlt.resource(name="repos", write_disposition="merge", primary_key="id")
def get_github_repos(api_token: str = dlt.secrets.value):
    """Yield the authenticated user's GitHub repositories.

    `api_token` is resolved by dlt from secrets.toml at call time.
    """
    auth_headers = {
        "Accept": "application/vnd.github.v3+json",
        "Authorization": f"Bearer {api_token}",
    }
    resp = requests.get(
        "https://api.github.com/user/repos",
        headers=auth_headers,
    )
    resp.raise_for_status()
    yield resp.json()

@dlt.source
def github_source(api_token: str = dlt.secrets.value):
    """GitHub source; forwards the injected secret token to its resource."""
    repos = get_github_repos(api_token)
    return repos
Configure credentials in .dlt/secrets.toml so dlt can inject them into the api_token parameter:
[sources.github_source]
api_token = "ghp_your_token_here"

Pagination

Simple Pagination

Iterate through paginated API responses:
@dlt.resource(name="issues", write_disposition="replace")
def get_issues(api_token: str = dlt.secrets.value):
    """Yield GitHub issues one page at a time until no next page remains."""
    auth = {"Authorization": f"Bearer {api_token}"}
    endpoint = "https://api.github.com/repos/dlt-hub/dlt/issues"
    query = {"per_page": 100, "page": 1}

    while True:
        resp = requests.get(endpoint, headers=auth, params=query)
        resp.raise_for_status()

        page_items = resp.json()
        if not page_items:
            break

        yield page_items

        # GitHub advertises further pages via the Link response header.
        if 'rel="next"' not in (resp.headers.get("Link") or ""):
            break
        query["page"] += 1

Using REST Client Helper

Use dlt’s built-in REST client for automatic pagination:
from dlt.sources.helpers.rest_client import RESTClient, paginate
from dlt.sources.helpers.rest_client.auth import BearerTokenAuth
from dlt.sources.helpers.rest_client.paginators import JSONResponsePaginator

@dlt.resource
def paginated_resource(api_token: str = dlt.secrets.value):
    """Page through GitHub issues with dlt's REST client.

    GitHub paginates via the `Link` *response header*, not a JSON
    `next_page` field, so `JSONResponsePaginator(next_url_path="next_page")`
    would never find a next URL and only the first page would load.
    When no paginator is given, RESTClient auto-detects the pagination
    style (including header links), which is correct for GitHub.

    `api_token` is resolved by dlt from secrets.toml.
    """
    client = RESTClient(
        base_url="https://api.github.com",
        auth=BearerTokenAuth(token=api_token),
    )

    # paginator intentionally omitted: RESTClient detects the Link header.
    for page in client.paginate(
        "/repos/dlt-hub/dlt/issues",
        params={"per_page": 100},
    ):
        yield page

Incremental Loading

Load only new or updated records:
@dlt.resource(
    name="events",
    write_disposition="append",
)
def get_events(
    updated_at=dlt.sources.incremental(
        "updated_at",
        initial_value="2024-01-01T00:00:00Z"
    )
):
    """Yield events newer than the last stored `updated_at` cursor value."""
    # dlt persists the max cursor seen and restores it on the next run.
    resp = requests.get(
        "https://api.example.com/events",
        params={"since": updated_at.last_value},
    )
    yield resp.json()
The incremental object automatically tracks the maximum cursor value and uses it as the starting point for subsequent runs.

Transformers

Process data from parent resources:
@dlt.resource(name="users", write_disposition="replace")
def get_users():
    """Yield the full user list from the example API."""
    resp = requests.get("https://api.example.com/users")
    yield resp.json()

@dlt.transformer(
    name="user_posts",
    write_disposition="replace",
    data_from=get_users
)
def get_user_posts(user):
    """For each user record emitted by get_users, yield that user's posts."""
    resp = requests.get(f"https://api.example.com/users/{user['id']}/posts")
    yield resp.json()

@dlt.source
def api_source():
    """Bundle the parent resource and its dependent transformer."""
    resources = [get_users(), get_user_posts()]
    return resources

Filtering and Transforming Data

Modify data before loading:
@dlt.resource(name="active_users")
def get_active_users():
    """Yield only users whose status field equals "active"."""
    resp = requests.get("https://api.example.com/users")
    # Filter before loading so inactive users never reach the destination.
    yield from (u for u in resp.json() if u.get("status") == "active")

Schema Definition

Using Column Hints

Define column types and constraints:
@dlt.resource(
    name="products",
    write_disposition="merge",
    primary_key="product_id",
    columns={
        "product_id": {"data_type": "bigint", "nullable": False},
        "name": {"data_type": "text", "nullable": False},
        "price": {"data_type": "decimal", "precision": 10, "scale": 2},
        "tags": {"data_type": "json"},  # keep as JSON instead of a child table
        "created_at": {"data_type": "timestamp"},
    },
)
def get_products():
    """Yield products; the column hints above pin the destination schema."""
    resp = requests.get("https://api.example.com/products")
    yield resp.json()

Using Pydantic Models

Define schema with Pydantic:
from pydantic import BaseModel
from typing import List, Optional
from datetime import datetime

class Product(BaseModel):
    """Pydantic model passed as `columns=` so dlt derives the table schema.

    Field order and types map directly to destination columns;
    `Optional` fields become nullable columns.
    """
    product_id: int
    name: str
    price: float
    tags: List[str]
    created_at: datetime
    description: Optional[str] = None

@dlt.resource(
    name="products",
    write_disposition="merge",
    primary_key="product_id",
    columns=Product,
)
def get_products():
    """Yield products; the Product model supplies the column schema."""
    resp = requests.get("https://api.example.com/products")
    yield resp.json()

Error Handling

Implement robust error handling:
from tenacity import retry, stop_after_attempt, wait_exponential
import logging

logger = logging.getLogger(__name__)

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10)
)
def _fetch_data():
    """Perform the HTTP request; retried up to 3 times with backoff."""
    try:
        response = requests.get("https://api.example.com/data")
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        logger.error(f"Request failed: {e}")
        raise

@dlt.resource(name="resilient_resource")
def get_data_with_retry():
    """Fetch data with automatic retry.

    NOTE: tenacity's @retry must wrap a *plain* function. Applied directly
    to a generator function it never retries anything: calling the function
    only creates the generator, and the request (plus any exception) happens
    later, during iteration, outside tenacity's control. The retried HTTP
    call therefore lives in the `_fetch_data` helper.
    """
    yield _fetch_data()

@dlt.resource(name="error_tolerant")
def get_data_with_error_handling():
    """Yield processed records, skipping (and logging) any that fail."""
    resp = requests.get("https://api.example.com/records")

    for record in resp.json():
        try:
            yield process_record(record)
        except Exception as e:
            # One bad record must not abort the whole load.
            logger.warning(f"Failed to process record {record.get('id')}: {e}")

Complete Example: Custom API Source

A comprehensive example with authentication, pagination, and incremental loading:
import dlt
from dlt.sources.helpers import requests
from typing import Iterator, Dict, Any

@dlt.source
def ecommerce_api(
    api_key: str = dlt.secrets.value,
    base_url: str = dlt.config.value,
):
    """Load customers, orders and products from an e-commerce API.

    Args:
        api_key: bearer token, injected by dlt from secrets.toml.
        base_url: API root URL, injected by dlt from config.toml.

    All three endpoints share the same pagination protocol (page/per_page
    request params, `has_more` flag in the response), so the loop lives in
    one helper instead of being repeated per resource.
    """
    headers = {"Authorization": f"Bearer {api_key}"}

    def _fetch_pages(path, item_key, extra_params=None):
        """Yield item batches from a paginated endpoint until exhausted.

        path: endpoint path appended to base_url (e.g. "/orders").
        item_key: key in the response JSON holding the list of items.
        extra_params: optional query params merged into every request.
        """
        page = 1
        while True:
            params = {"page": page, "per_page": 100}
            if extra_params:
                params.update(extra_params)

            response = requests.get(
                f"{base_url}{path}", headers=headers, params=params
            )
            response.raise_for_status()

            data = response.json()
            items = data.get(item_key, [])
            if not items:
                break

            yield items

            if not data.get("has_more"):
                break
            page += 1

    @dlt.resource(
        name="customers",
        write_disposition="merge",
        primary_key="customer_id",
    )
    def get_customers(
        updated_at=dlt.sources.incremental(
            "updated_at",
            initial_value="2024-01-01T00:00:00Z"
        )
    ) -> Iterator[Dict[str, Any]]:
        """Fetch customers changed since the stored `updated_at` cursor."""
        yield from _fetch_pages(
            "/customers",
            "customers",
            {"updated_since": updated_at.last_value},
        )

    @dlt.resource(
        name="orders",
        write_disposition="merge",
        primary_key="order_id",
    )
    def get_orders(
        created_at=dlt.sources.incremental(
            "created_at",
            initial_value="2024-01-01T00:00:00Z"
        )
    ) -> Iterator[Dict[str, Any]]:
        """Fetch orders created after the stored `created_at` cursor."""
        yield from _fetch_pages(
            "/orders",
            "orders",
            {"created_after": created_at.last_value, "expand": "line_items"},
        )

    @dlt.resource(
        name="products",
        write_disposition="replace",
    )
    def get_products() -> Iterator[Dict[str, Any]]:
        """Fetch the full product catalog (table replaced on every run)."""
        yield from _fetch_pages("/products", "products")

    return [get_customers(), get_orders(), get_products()]

# Configure in config.toml
# [sources.ecommerce_api]
# base_url = "https://api.example.com/v1"

# Configure in secrets.toml
# [sources.ecommerce_api]
# api_key = "your_api_key_here"

# Run the pipeline
if __name__ == "__main__":
    # Build the pipeline and load every resource exposed by the source.
    ecommerce_pipeline = dlt.pipeline(
        pipeline_name="ecommerce_pipeline",
        destination="duckdb",
        dataset_name="ecommerce_data",
    )
    # run() returns a LoadInfo summary of the completed load.
    print(ecommerce_pipeline.run(ecommerce_api()))

Best Practices

Always use yield instead of returning large lists:
# ✅ Good: Memory efficient
@dlt.resource
def get_data():
    # Each page is yielded as soon as it is fetched, so at most one
    # page is held in memory at a time.
    for page in range(100):
        data = fetch_page(page)
        yield data

# ❌ Bad: Loads everything into memory
@dlt.resource
def get_data():
    # Anti-pattern (shown for contrast): accumulates all 100 pages in a
    # list before returning, holding the entire dataset in memory at once.
    all_data = []
    for page in range(100):
        all_data.extend(fetch_page(page))
    return all_data
Use retry logic and handle errors gracefully:
from tenacity import retry, stop_after_attempt

@retry(stop=stop_after_attempt(3))
def _get_payload():
    """Retried HTTP call; a plain function so tenacity can re-invoke it."""
    response = requests.get("https://api.example.com/data")
    response.raise_for_status()
    return response.json()

@dlt.resource
def resilient_resource():
    """Yield the API payload, retrying transient failures.

    tenacity's @retry is ineffective directly on a generator function
    (calling it only creates the generator; the request runs during
    iteration, after @retry has already returned), so the retried
    request lives in the `_get_payload` helper.
    """
    yield _get_payload()
Externalize configuration and secrets:
@dlt.source
def my_source(
    api_key: str = dlt.secrets.value,
    base_url: str = dlt.config.value,
    batch_size: int = dlt.config.value,
):
    """Skeleton source: dlt resolves each default from secrets.toml / config.toml."""
    # Source implementation
    pass
Use type hints for better IDE support and documentation:
from typing import Iterator, Dict, Any

@dlt.resource
def typed_resource() -> Iterator[Dict[str, Any]]:
    """Minimal typed resource: yields a single record dict."""
    yield {"key": "value"}

Next Steps

Build docs developers (and LLMs) love