Sources Overview
A source in dlt is a logical grouping of resources that represent a single data origin, such as an API, database, or file system. Sources are Python functions decorated with @dlt.source that return one or more resources.
What is a Source?
Sources provide:
- Resource Grouping: Organize related data endpoints together
- Schema Management: Define and manage table structures, columns, and performance hints
- Authentication: Centralize credentials and authentication logic
- Reusability: Package and share data loading patterns across projects
A source is executed immediately when called, while resources delay execution until pipeline.run() or pipeline.extract() is invoked.
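This deferred execution follows ordinary Python generator semantics. A minimal plain-Python sketch (the `load_log` list is purely illustrative, not part of dlt):

```python
load_log = []

def fetch_rows():
    # A generator body does not run until it is iterated; dlt resources
    # defer extraction the same way until pipeline.run() or extract()
    load_log.append("extracted")
    yield from [{"id": 1}, {"id": 2}]

gen = fetch_rows()   # creating the generator runs no code yet
rows = list(gen)     # iteration triggers the body
```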
Creating a Simple Source
Define resources
Create functions that yield data, decorated with `@dlt.resource`:

```python
import dlt
from dlt.sources.helpers import requests

@dlt.resource(name="pokemon", write_disposition="replace")
def get_pokemon():
    """Fetch Pokemon data from PokeAPI"""
    response = requests.get("https://pokeapi.co/api/v2/pokemon?limit=100")
    yield response.json()["results"]

@dlt.resource(name="berries", write_disposition="replace")
def get_berries():
    """Fetch berry data from PokeAPI"""
    response = requests.get("https://pokeapi.co/api/v2/berry?limit=100")
    yield response.json()["results"]
```
Group into a source
Create a source function that returns multiple resources:

```python
@dlt.source
def pokeapi_source():
    """Load Pokemon and berry data from PokeAPI"""
    return [get_pokemon(), get_berries()]
```
Run the pipeline
Load data from the source to your destination:

```python
pipeline = dlt.pipeline(
    pipeline_name="pokeapi",
    destination="duckdb",
    dataset_name="pokemon_data"
)
load_info = pipeline.run(pokeapi_source())
print(load_info)
```
Working with Sources
Selecting Resources
You can select specific resources to load from a source:
```python
source = pokeapi_source()

# Load only specific resources
pipeline.run(source.with_resources("pokemon"))

# Access resources as attributes
for pokemon in source.pokemon:
    print(pokemon)

# Deselect resources
source.berries.selected = False
pipeline.run(source)  # Only loads pokemon
```
Creating Resources Dynamically
Generate multiple resources from a list of endpoints:
```python
import dlt
from dlt.sources.helpers import requests

@dlt.source
def hubspot(api_key=dlt.secrets.value):
    """Load multiple Hubspot endpoints"""
    base_url = "https://api.hubspot.com/crm/v3/objects"
    endpoints = ["companies", "deals", "contacts"]

    def get_resource(endpoint):
        headers = {"Authorization": f"Bearer {api_key}"}
        response = requests.get(f"{base_url}/{endpoint}", headers=headers)
        yield response.json()["results"]

    for endpoint in endpoints:
        yield dlt.resource(get_resource(endpoint), name=endpoint)
```
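One caveat when generating resources in a loop: bind the loop variable through a function argument (as `get_resource(endpoint)` does above). Otherwise Python's late binding makes every closure see the last endpoint. A plain-Python sketch of the difference:

```python
endpoints = ["companies", "deals", "contacts"]

# Late binding: every closure reads `endpoint` after the loop has finished
broken = [lambda: endpoint for endpoint in endpoints]

# Binding via a default argument captures each value at definition time
fixed = [lambda endpoint=endpoint: endpoint for endpoint in endpoints]

broken_names = [f() for f in broken]   # ["contacts", "contacts", "contacts"]
fixed_names = [f() for f in fixed]     # ["companies", "deals", "contacts"]
```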
Adding Limits for Testing
Limit data extraction for testing and debugging:
```python
# Limit to 10 items per resource
pipeline.run(pokeapi_source().add_limit(10))
```
Note that `add_limit` limits the number of yields from a generator, not the number of rows. A single yield can produce multiple rows.
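The distinction can be sketched with a plain generator that yields pages of rows (a hypothetical stand-in, not dlt's implementation): truncating after two yields still produces every row from those pages.

```python
from itertools import islice

def paged_rows():
    for page in range(5):
        # each yield emits one page containing two rows
        yield [{"id": page * 2}, {"id": page * 2 + 1}]

pages = list(islice(paged_rows(), 2))           # keep only 2 yields, like add_limit(2)
rows = [row for page in pages for row in page]  # but each kept page holds 2 rows
```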
Built-in Sources
dlt provides several production-ready sources for common data origins:
- REST API: Load data from any REST API with automatic pagination and authentication
- SQL Database: Extract tables from any SQLAlchemy-supported database
- Filesystem: Read files from cloud storage (S3, GCS, Azure) or the local filesystem
- Custom Sources: Build your own sources with Python generators and decorators
Source Configuration
Using Credentials
Sources can accept credentials from configuration:
```python
@dlt.source
def my_api_source(
    api_key: str = dlt.secrets.value,
    base_url: str = dlt.config.value
):
    """Source with configuration from secrets and config"""

    @dlt.resource
    def fetch_data():
        headers = {"Authorization": f"Bearer {api_key}"}
        response = requests.get(f"{base_url}/data", headers=headers)
        yield response.json()

    return fetch_data
```
Configure using environment variables or secrets.toml:
```toml
[sources.my_api_source]
api_key = "your_secret_key"
base_url = "https://api.example.com"
```
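Equivalently, the same values can be supplied as environment variables: dlt maps the TOML path to upper-cased, double-underscore-delimited names. A sketch for the `my_api_source` section above:

```shell
export SOURCES__MY_API_SOURCE__API_KEY="your_secret_key"
export SOURCES__MY_API_SOURCE__BASE_URL="https://api.example.com"
```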
Renaming Sources
Create multiple instances of the same source:
```python
from dlt.sources.sql_database import sql_database

# Create a renamed instance
my_postgres = sql_database.clone(name="my_postgres", section="my_postgres")(
    table_names=["users", "orders"]
)

# Configure separately
other_db = sql_database.clone(name="other_db", section="other_db")(
    table_names=["products"]
)
```
Best Practices
Avoid long operations in source functions
Don't extract data directly in the source function; leave extraction to resources. Source functions execute immediately when called, while resources execute during `pipeline.run()`, providing better error handling and metrics.

```python
# ❌ Bad: Extracting data in the source function
@dlt.source
def bad_source():
    data = expensive_api_call()  # Executed immediately!
    return dlt.resource(data, name="data")

# ✅ Good: Extract in a resource
@dlt.source
def good_source():
    @dlt.resource
    def fetch_data():
        data = expensive_api_call()  # Executed during pipeline.run()
        yield data

    return fetch_data
```
Give sources and resources clear, descriptive names that indicate their purpose and data origin.

```python
@dlt.source(name="stripe_payments")
def stripe_source():
    @dlt.resource(name="invoices", write_disposition="merge")
    def get_invoices():
        ...

    @dlt.resource(name="customers", write_disposition="merge")
    def get_customers():
        ...

    return [get_invoices(), get_customers()]
```
Implement retry logic and error handling for robust data pipelines. Note that a `tenacity` retry decorator applied directly to a generator function only retries creating the generator, not the code that runs during iteration, so perform the retried call in a regular function:

```python
import dlt
from dlt.sources.helpers import requests
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
def fetch_data():
    response = requests.get("https://api.example.com/data")
    response.raise_for_status()
    return response.json()

@dlt.resource
def resilient_resource():
    yield fetch_data()
```
Next Steps
Explore specific source types and learn how to build custom sources: