
REST API Source

The rest_api source provides a declarative way to load data from REST APIs. It handles common patterns like pagination, authentication, and incremental loading automatically.

Quick Start

Load data from a REST API with minimal configuration:
import dlt
from dlt.sources.rest_api import rest_api_source

# Create a source for the Pokemon API
pokemon_source = rest_api_source({
    "client": {
        "base_url": "https://pokeapi.co/api/v2/",
    },
    "resources": [
        {
            "name": "pokemon",
            "endpoint": {
                "path": "pokemon",
                "params": {
                    "limit": 100,
                },
            },
        },
    ],
})

# Load the data
pipeline = dlt.pipeline(
    pipeline_name="pokemon_pipeline",
    destination="duckdb",
    dataset_name="pokemon_data"
)

load_info = pipeline.run(pokemon_source)
print(load_info)

Configuration Structure

The REST API source uses a configuration dictionary with three main sections:
1. Client Configuration

Define base URL, authentication, and default pagination:
"client": {
    "base_url": "https://api.example.com/",
    "auth": {
        "type": "bearer",
        "token": dlt.secrets["api_token"],
    },
    "paginator": "json_link",
}
2. Resource Defaults

Set common configurations for all resources:
"resource_defaults": {
    "primary_key": "id",
    "write_disposition": "merge",
    "endpoint": {
        "params": {
            "per_page": 100,
        },
    },
}
3. Resources

Define individual API endpoints to load:
"resources": [
    {
        "name": "users",
        "endpoint": {
            "path": "users",
        },
    },
]

Authentication

The REST API source supports several authentication methods, including bearer token, API key, and HTTP basic auth. A bearer token configuration looks like this:
"auth": {
    "type": "bearer",
    "token": dlt.secrets["github_token"],
}
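API key authentication sends the key in a header or query parameter instead. A hedged sketch, assuming the `api_key` auth type with illustrative header and secret names:

```python
# API key sent as a request header; "name" is the header carrying the key
"auth": {
    "type": "api_key",
    "name": "X-API-Key",
    "api_key": dlt.secrets["api_key"],
    "location": "header",  # or "query"
}
```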

Pagination

Automatic pagination is configured at the client or endpoint level:
# Follows 'next' links in JSON responses
"paginator": "json_link"

# Custom JSON path
"paginator": {
    "type": "json_link",
    "next_url_path": "pagination.next",
}
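Other paginator types can be configured the same way. A sketch of offset-based pagination, where the client increments an offset query parameter each page (field values are illustrative):

```python
# Offset-based pagination: requests pages of `limit` records,
# advancing `offset` until the API returns no more data
"paginator": {
    "type": "offset",
    "limit": 100,
    "offset": 0,
}
```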

Incremental Loading

Load only new or updated records using incremental cursors:
github_source = rest_api_source({
    "client": {
        "base_url": "https://api.github.com/repos/dlt-hub/dlt/",
        "auth": {
            "token": dlt.secrets["github_token"],
        },
    },
    "resources": [
        {
            "name": "issues",
            "endpoint": {
                "path": "issues",
                "params": {
                    "state": "all",
                    "sort": "updated",
                    "direction": "desc",
                    "since": {
                        "type": "incremental",
                        "cursor_path": "updated_at",
                        "initial_value": "2024-01-01T00:00:00Z",
                    },
                },
            },
        },
    ],
})
The cursor_path uses JSONPath notation to extract the cursor value from each record. On subsequent runs, only records with a cursor value greater than the last seen value are loaded.
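The cursor comparison can be illustrated with a plain-Python sketch (simplified: this is not dlt's internal implementation, and it uses direct key access rather than full JSONPath). ISO 8601 timestamps compare correctly as strings:

```python
# Illustrative sketch: keep only records whose cursor value is greater
# than the last one seen, then advance the cursor for the next run.
def filter_new(records, cursor_key, last_value):
    new_records = [r for r in records if r[cursor_key] > last_value]
    if new_records:
        last_value = max(r[cursor_key] for r in new_records)
    return new_records, last_value

records = [
    {"id": 1, "updated_at": "2024-01-02T00:00:00Z"},
    {"id": 2, "updated_at": "2023-12-31T00:00:00Z"},  # older than the cursor
]
fresh, cursor = filter_new(records, "updated_at", "2024-01-01T00:00:00Z")
# Only record 1 passes; the cursor advances to its updated_at value.
```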

Dependent Resources

Create resources that depend on data from other resources:
{
    "resources": [
        {
            "name": "users",
            "endpoint": {
                "path": "users",
            },
        },
        {
            "name": "user_posts",
            "endpoint": {
                # Use data from 'users' resource
                "path": "users/{users.id}/posts",
            },
        },
    ],
}
The {users.id} placeholder is resolved for each user, creating a request per user.
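An equivalent, more explicit form declares the dependency as a resolve-typed parameter, which makes the parent resource and field name visible in the config. A sketch assuming the same users resource:

```python
{
    "name": "user_posts",
    "endpoint": {
        # {user_id} is filled from the resolved parameter below
        "path": "users/{user_id}/posts",
        "params": {
            "user_id": {
                "type": "resolve",
                "resource": "users",
                "field": "id",
            },
        },
    },
}
```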

Complete Example: GitHub API

Here’s a comprehensive example loading GitHub repository data:
import dlt
from dlt.sources.rest_api import rest_api_source

# Configure the source
github_source = rest_api_source({
    "client": {
        "base_url": "https://api.github.com/repos/dlt-hub/dlt/",
        "auth": {
            "token": dlt.secrets["github_token"],
        },
    },
    "resource_defaults": {
        "primary_key": "id",
        "write_disposition": "merge",
        "endpoint": {
            "params": {
                "per_page": 100,
            },
        },
    },
    "resources": [
        {
            "name": "issues",
            "endpoint": {
                "path": "issues",
                "params": {
                    "state": "all",
                    "sort": "updated",
                    "direction": "desc",
                    "since": {
                        "type": "incremental",
                        "cursor_path": "updated_at",
                        "initial_value": "2024-01-01T00:00:00Z",
                    },
                },
            },
        },
        {
            "name": "issue_comments",
            "endpoint": {
                "path": "issues/{issues.number}/comments",
                "params": {
                    "per_page": 100,
                },
            },
            "include_from_parent": ["id"],  # Include issue ID in comments
        },
        {
            "name": "pulls",
            "endpoint": {
                "path": "pulls",
                "params": {
                    "state": "all",
                },
            },
        },
    ],
})

# Create and run pipeline
pipeline = dlt.pipeline(
    pipeline_name="github_pipeline",
    destination="duckdb",
    dataset_name="github_data"
)

load_info = pipeline.run(github_source)
print(load_info)

Advanced Features

Data Selector

Extract data from nested JSON responses:
"endpoint": {
    "path": "search/repositories",
    "params": {"q": "dlt"},
    "data_selector": "items",  # Extract from response["items"]
}
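What a dotted data_selector does can be sketched in plain Python (simplified: dlt accepts full JSONPath expressions, not just dotted keys):

```python
# Illustrative sketch: walk a dotted path into a response dict
# to pull out the list of records.
def select(response, selector):
    data = response
    for key in selector.split("."):
        data = data[key]
    return data

response = {"meta": {"count": 2}, "items": [{"id": 1}, {"id": 2}]}
records = select(response, "items")
```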

Custom Headers

Add custom headers to requests:
"endpoint": {
    "path": "data",
    "headers": {
        "X-Custom-Header": "value",
        "Accept": "application/vnd.github.v3+json",
    },
}

POST Requests

Make POST requests with JSON body:
"endpoint": {
    "path": "search",
    "method": "POST",
    "json": {
        "query": "dlt",
        "filters": {"language": "python"},
    },
}

Processing Steps

Transform data before loading:
{
    "name": "users",
    "endpoint": {"path": "users"},
    "processing_steps": [
        {
            "filter": lambda item: item["active"] is True,
        },
        {
            "map": lambda item: {**item, "processed": True},
        },
    ],
}
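The semantics of these steps can be shown with a plain-Python sketch (not dlt internals): a filter drops items, a map transforms those that remain, and steps apply in order.

```python
# Apply processing steps in sequence over a batch of records
items = [
    {"name": "ada", "active": True},
    {"name": "bob", "active": False},
]
steps = [
    {"filter": lambda item: item["active"] is True},
    {"map": lambda item: {**item, "processed": True}},
]
result = items
for step in steps:
    if "filter" in step:
        result = [i for i in result if step["filter"](i)]
    elif "map" in step:
        result = [step["map"](i) for i in result]
# Only the active record survives, with the "processed" flag added.
```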

Using the REST Client Directly

For more control, use the RESTClient helper:
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.paginators import JSONLinkPaginator
from dlt.sources.helpers.rest_client.auth import BearerTokenAuth

@dlt.resource
def fetch_api_data():
    client = RESTClient(
        base_url="https://api.example.com",
        auth=BearerTokenAuth(token=dlt.secrets["api_token"]),
    )

    # paginate() yields one page of records at a time
    for page in client.paginate(
        "/data",
        paginator=JSONLinkPaginator(next_url_path="next_page"),
    ):
        yield page

Best Practices

Use Incremental Loading

Always configure incremental loading for large datasets to avoid reprocessing all data on each run.

Set Appropriate Limits

Configure per_page or limit parameters to balance between API rate limits and performance.

Handle Rate Limits

Implement retry logic and respect API rate limits to ensure reliable data extraction.
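A generic retry-with-backoff pattern can be sketched in plain Python (this is an illustration, not a dlt API; dlt's bundled requests helper already retries transient failures for you):

```python
import time

# Retry a callable on failure, doubling the wait between attempts
def with_retries(fetch, max_attempts=3, base_delay=0.01):
    for attempt in range(max_attempts):
        try:
            return fetch()
        except Exception:
            if attempt == max_attempts - 1:
                raise
            time.sleep(base_delay * 2 ** attempt)

# Simulated flaky endpoint: fails twice, then succeeds
calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("rate limited")
    return {"status": "ok"}

result = with_retries(flaky)
```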

Secure Credentials

Always use dlt.secrets for API keys and tokens. Never hardcode credentials in your source code.

Common Issues

Ensure your token or API key is correctly set in secrets.toml:
[sources.my_api]
api_token = "your_secret_token_here"
Reference it in your source:
"auth": {
    "token": dlt.secrets["sources.my_api.api_token"],
}
Check that the paginator type matches your API’s pagination style. Inspect API responses to identify the correct pagination pattern:
# Debug: print a raw response to inspect the pagination structure
import requests

response = requests.get("https://api.example.com/data")
print(response.json())
Use data_selector to extract data from nested JSON:
"endpoint": {
    "path": "data",
    "data_selector": "results.items",  # Navigate nested structure
}
