
Sources Overview

A source in dlt is a logical grouping of resources that represent a single data origin, such as an API, database, or file system. Sources are Python functions decorated with @dlt.source that return one or more resources.

What is a Source?

Sources provide:
  • Resource Grouping: Organize related data endpoints together
  • Schema Management: Define and manage table structures, columns, and performance hints
  • Authentication: Centralize credentials and authentication logic
  • Reusability: Package and share data loading patterns across projects
A source is executed immediately when called, while resources delay execution until pipeline.run() or pipeline.extract() is invoked.
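This eager/lazy split mirrors plain Python generators, which is what resources are underneath. A dlt-free sketch (the function and field names here are illustrative, not part of dlt's API):

```python
def make_source():
    # this body runs as soon as make_source() is called,
    # just like a @dlt.source function
    print("source body runs immediately")

    def resource():
        # a generator body runs only when iterated,
        # just like a @dlt.resource during pipeline.run()
        print("resource body runs only when iterated")
        yield {"id": 1}

    return resource()

gen = make_source()  # prints the first message; no data extracted yet
rows = list(gen)     # only now does the resource body execute
```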

Creating a Simple Source

Step 1: Define resources

Create functions that yield data, decorated with @dlt.resource:
import dlt
from dlt.sources.helpers import requests

@dlt.resource(name="pokemon", write_disposition="replace")
def get_pokemon():
    """Fetch Pokemon data from PokeAPI"""
    response = requests.get("https://pokeapi.co/api/v2/pokemon?limit=100")
    yield response.json()["results"]

@dlt.resource(name="berries", write_disposition="replace")
def get_berries():
    """Fetch berry data from PokeAPI"""
    response = requests.get("https://pokeapi.co/api/v2/berry?limit=100")
    yield response.json()["results"]
Step 2: Group into a source

Create a source function that returns multiple resources:
@dlt.source
def pokeapi_source():
    """Load Pokemon and berry data from PokeAPI"""
    return [get_pokemon(), get_berries()]
Step 3: Run the pipeline

Load data from the source to your destination:
pipeline = dlt.pipeline(
    pipeline_name="pokeapi",
    destination="duckdb",
    dataset_name="pokemon_data"
)

load_info = pipeline.run(pokeapi_source())
print(load_info)

Working with Sources

Selecting Resources

You can select specific resources to load from a source:
source = pokeapi_source()

# Load only specific resources
pipeline.run(source.with_resources("pokemon"))

# Access resources as attributes
for pokemon in source.pokemon:
    print(pokemon)

# Deselect resources
source.berries.selected = False
pipeline.run(source)  # Only loads pokemon

Creating Resources Dynamically

Generate multiple resources from a list of endpoints:
import dlt
from dlt.sources.helpers import requests

@dlt.source
def hubspot(api_key=dlt.secrets.value):
    """Load multiple Hubspot endpoints"""
    base_url = "https://api.hubspot.com/crm/v3/objects"
    endpoints = ["companies", "deals", "contacts"]

    def get_resource(endpoint):
        headers = {"Authorization": f"Bearer {api_key}"}
        response = requests.get(f"{base_url}/{endpoint}", headers=headers)
        yield response.json()["results"]

    for endpoint in endpoints:
        yield dlt.resource(get_resource(endpoint), name=endpoint)

Adding Limits for Testing

Limit data extraction for testing and debugging:
# Limit to 10 items per resource
pipeline.run(pokeapi_source().add_limit(10))
add_limit limits the number of “yields” from a generator, not the number of rows. A single yield can produce multiple rows.
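To see why a yield-based limit differs from a row limit, consider a plain-generator sketch where itertools.islice stands in for add_limit:

```python
from itertools import islice

def pages():
    # each yield is one "page" containing many rows
    for page in range(5):
        yield [{"page": page, "row": r} for r in range(100)]

# limiting to 1 yield still produces 100 rows
limited = list(islice(pages(), 1))
rows = [row for page in limited for row in page]
print(len(limited), len(rows))  # 1 page, 100 rows
```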

Built-in Sources

dlt provides several production-ready sources for common data origins:

REST API

Load data from any REST API with automatic pagination and authentication

SQL Database

Extract tables from any SQLAlchemy-supported database

Filesystem

Read files from cloud storage (S3, GCS, Azure) or local filesystem

Custom Sources

Build your own sources with Python generators and decorators
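For example, the REST API source is driven by a declarative configuration dict rather than hand-written resources. A minimal sketch of such a configuration (the field names follow dlt's rest_api source; check its reference documentation before relying on them):

```python
# declarative configuration for dlt's REST API source; when dlt is
# installed, pass this dict to rest_api_source(...) from dlt.sources.rest_api
pokeapi_config = {
    "client": {"base_url": "https://pokeapi.co/api/v2/"},
    # each entry becomes one resource/table; a bare string uses the endpoint name
    "resources": ["pokemon", "berry"],
}
```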

Source Configuration

Using Credentials

Sources can accept credentials from configuration:
@dlt.source
def my_api_source(
    api_key: str = dlt.secrets.value,
    base_url: str = dlt.config.value
):
    """Source with configuration from secrets and config"""
    @dlt.resource
    def fetch_data():
        headers = {"Authorization": f"Bearer {api_key}"}
        response = requests.get(f"{base_url}/data", headers=headers)
        yield response.json()

    return fetch_data
Configure using environment variables or secrets.toml:
[sources.my_api_source]
api_key = "your_secret_key"
base_url = "https://api.example.com"
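The environment-variable equivalent follows dlt's naming scheme, where double underscores separate the sections of the TOML path. A sketch (the values are placeholders):

```python
import os

# equivalent to the [sources.my_api_source] entries in secrets.toml:
# SOURCES__<SOURCE_NAME>__<KEY>, all uppercase
os.environ["SOURCES__MY_API_SOURCE__API_KEY"] = "your_secret_key"
os.environ["SOURCES__MY_API_SOURCE__BASE_URL"] = "https://api.example.com"
```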

Renaming Sources

Create multiple instances of the same source:
from dlt.sources.sql_database import sql_database

# Create a renamed instance
my_postgres = sql_database.clone(name="my_postgres", section="my_postgres")(
    table_names=["users", "orders"]
)

# Configure separately
other_db = sql_database.clone(name="other_db", section="other_db")(
    table_names=["products"]
)

Best Practices

Don’t extract data directly in the source function. Leave data extraction to resources. Source functions execute immediately when called, while resources execute during pipeline.run(), providing better error handling and metrics.
# ❌ Bad: Extracting data in source function
@dlt.source
def bad_source():
    data = expensive_api_call()  # Executed immediately!
    return dlt.resource(data, name="data")

# ✅ Good: Extract in resource
@dlt.source
def good_source():
    @dlt.resource
    def fetch_data():
        data = expensive_api_call()  # Executed during pipeline.run()
        yield data
    return fetch_data
Give sources and resources clear, descriptive names that indicate their purpose and data origin.
@dlt.source(name="stripe_payments")
def stripe_source():
    @dlt.resource(name="invoices", write_disposition="merge")
    def get_invoices():
        ...  # fetch invoices from the API

    @dlt.resource(name="customers", write_disposition="merge")
    def get_customers():
        ...  # fetch customers from the API

    return [get_invoices(), get_customers()]
Implement retry logic and error handling for robust data pipelines:
from tenacity import retry, stop_after_attempt, wait_exponential

# Retry the HTTP call itself. Decorating the generator function with
# @retry would not re-run its body: a generator body only executes
# during iteration, after the retry wrapper has already returned.
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
def fetch_with_retry():
    response = requests.get("https://api.example.com/data")
    response.raise_for_status()
    return response.json()

@dlt.resource
def resilient_resource():
    yield fetch_with_retry()

Next Steps

Explore specific source types and learn how to build custom sources:
