A source is a decorated function that returns one or more resources. Sources group related data extraction logic, share a schema, and provide a clean interface for passing credentials and configuration.
What is a Source?
A source in dlt:
- Groups related resources that are extracted together
- Defines a schema that describes all tables from its resources
- Manages configuration and credentials for data extraction
- Provides a reusable interface for data loading
- Enables schema evolution across multiple resources
Think of a source as a collection of related API endpoints or database tables that share authentication and business context.
Creating a Source
Use the @dlt.source decorator:
import dlt
from dlt.sources.helpers.rest_client import paginate

@dlt.source
def github_source(api_token: str = dlt.secrets.value, repo: str = "dlt-hub/dlt"):
    """Loads GitHub repository data."""

    @dlt.resource(write_disposition="replace")
    def issues():
        url = f"https://api.github.com/repos/{repo}/issues"
        for page in paginate(url, headers={"Authorization": f"token {api_token}"}):
            yield page

    @dlt.resource(write_disposition="append")
    def comments():
        url = f"https://api.github.com/repos/{repo}/issues/comments"
        for page in paginate(url, headers={"Authorization": f"token {api_token}"}):
            yield page

    return [issues, comments]
Using a Source
Load All Resources
import dlt

pipeline = dlt.pipeline(
    pipeline_name="github_pipeline",
    destination="duckdb",
    dataset_name="github_data",
)

# Load all resources from the source
load_info = pipeline.run(github_source())
print(load_info)
Select Specific Resources
# Load only the issues resource
load_info = pipeline.run(github_source().with_resources("issues"))
# Load multiple selected resources
load_info = pipeline.run(github_source().with_resources("issues", "comments"))
Source Decorator Parameters
From dlt/extract/decorators.py:363-420:
@dlt.source(
    name="my_source",          # Source name (defaults to function name)
    section="my_module",       # Config section for credentials
    max_table_nesting=2,       # Max depth for nested tables
    root_key=False,            # Propagate root key to nested tables
    schema=my_schema,          # Explicit schema instance
    schema_contract={          # Schema evolution rules
        "tables": "evolve",
        "columns": "evolve",
        "data_type": "freeze",
    },
    parallelized=False,        # Extract resources in parallel
    spec=MyConfigClass,        # Custom config specification
)
def my_source():
    return [...]
Key Parameters
name (str): Source name, also used as schema name. Defaults to function name.
section (str): Configuration section for accessing credentials. Defaults to module name.
max_table_nesting (int): Maximum nesting depth before flattening to JSON/struct.
schema (Schema): Provide an explicit schema instead of auto-generating one.
schema_contract (dict): Controls how the schema evolves - "evolve", "freeze", "discard_row", or "discard_value".
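To make max_table_nesting concrete, here is a toy sketch of the idea it controls (this is not dlt's normalizer, just an illustration): nested objects are unpacked into child columns/tables until the configured depth, and anything deeper stays behind as a single JSON value.

```python
import json

def flatten(record: dict, max_nesting: int, depth: int = 0) -> dict:
    """Toy illustration of max_table_nesting: nested dicts are unpacked
    until max_nesting is reached, after which the remainder is stored
    as a single JSON string. dlt's real normalizer creates child tables
    and handles lists, typing, and naming conventions."""
    out = {}
    for key, value in record.items():
        if isinstance(value, dict):
            if depth < max_nesting:
                for sub_key, sub_value in flatten(value, max_nesting, depth + 1).items():
                    out[f"{key}__{sub_key}"] = sub_value
            else:
                out[key] = json.dumps(value)  # too deep: keep as JSON
        else:
            out[key] = value
    return out

record = {"id": 1, "user": {"name": "a", "address": {"city": "b"}}}
print(flatten(record, max_nesting=1))
# {'id': 1, 'user__name': 'a', 'user__address': '{"city": "b"}'}
```

The double-underscore column naming mirrors the convention dlt uses when it flattens nested fields.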
Configuration and Credentials
Sources automatically inject configuration and secrets:
import dlt
from dlt.sources.helpers import requests

@dlt.source
def chess_source(
    username: str,                        # Required explicit argument
    chess_url: str = dlt.config.value,    # From config.toml
    api_secret: str = dlt.secrets.value,  # From secrets.toml
    title: str = "GM",                    # Optional with default
):
    @dlt.resource
    def player_profile():
        response = requests.get(
            f"{chess_url}/player/{username}",
            headers={"Authorization": f"Bearer {api_secret}"},
        )
        yield response.json()

    @dlt.resource
    def player_games():
        response = requests.get(
            f"{chess_url}/player/{username}/games",
            headers={"Authorization": f"Bearer {api_secret}"},
        )
        yield response.json()

    return [player_profile, player_games]

# Usage
load_info = pipeline.run(chess_source("magnuscarlsen"))
Configuration Layout
Configuration follows this structure:
# config.toml
[sources.chess_source]
chess_url = "https://api.chess.com/pub"
# secrets.toml
[sources.chess_source]
api_secret = "your-secret-key"
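Besides the TOML files, dlt can resolve these values from environment variables using a double-underscore naming scheme (e.g. SOURCES__CHESS_SOURCE__CHESS_URL), and explicitly passed arguments always take precedence. A simplified sketch of that resolution order (the real logic lives in dlt's configuration providers and also covers secret vaults and nested sections):

```python
import os

def resolve(key: str, section: str, explicit=None, toml_config=None):
    """Simplified config resolution: explicit argument -> environment
    variable -> TOML section. Illustrative only; dlt's providers do
    much more (type coercion, secret stores, nested sections)."""
    if explicit is not None:
        return explicit
    env_key = f"SOURCES__{section.upper()}__{key.upper()}"
    if env_key in os.environ:
        return os.environ[env_key]
    return (toml_config or {}).get(key)

toml = {"chess_url": "https://api.chess.com/pub"}
print(resolve("chess_url", "chess_source", toml_config=toml))
# https://api.chess.com/pub
os.environ["SOURCES__CHESS_SOURCE__CHESS_URL"] = "http://localhost:8080"
print(resolve("chess_url", "chess_source", toml_config=toml))
# http://localhost:8080
```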
Source Methods and Properties
Accessing Resources
source = github_source()

# Get a specific resource
issues_resource = source.resources["issues"]

# Iterate over all resources
for resource in source.resources.values():
    print(resource.name)

# Get selected resources
selected = source.selected_resources
Schema Access
source = github_source()

# Access the source schema
schema = source.schema
print(schema.name)

# Get table definitions
for table_name in schema.tables:
    table = schema.get_table(table_name)
    print(f"Table: {table_name}")
Resource Selection
source = github_source()

# Select specific resources
source = source.with_resources("issues", "comments")

# Check which resources are selected
for name, resource in source.resources.items():
    print(f"{name}: selected={resource.selected}")
Advanced Patterns
Dynamic Resource Generation
import dlt

@dlt.source
def multi_table_source(table_names: list[str]):
    """Generate resources dynamically based on input."""
    resources = []
    for table_name in table_names:

        @dlt.resource(name=table_name)
        def load_table(table=table_name):
            # get_table_data is a placeholder for your own loading logic
            yield from get_table_data(table)

        resources.append(load_table)
    return resources

# Usage
source = multi_table_source(["users", "orders", "products"])
load_info = pipeline.run(source)
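The table=table_name default argument above is not cosmetic: Python closures bind loop variables late, so without it every generated function would see only the final value of table_name. A standalone illustration of the pitfall and the fix:

```python
def make_loaders_buggy(table_names):
    # All closures share the same 'name' variable -> late-binding bug:
    # every function returns the last value of the loop variable
    return [lambda: name for name in table_names]

def make_loaders_fixed(table_names):
    # A default argument captures the current value at definition time
    return [lambda name=name: name for name in table_names]

tables = ["users", "orders", "products"]
print([f() for f in make_loaders_buggy(tables)])
# ['products', 'products', 'products']
print([f() for f in make_loaders_fixed(tables)])
# ['users', 'orders', 'products']
```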
Source with Shared State
import dlt
from dlt.sources.helpers.rest_client import paginate
@dlt.source
def api_source(base_url: str = dlt.config.value):
"""All resources share the same source state"""
@dlt.resource(write_disposition="append")
def users():
# Access source-level state
state = dlt.current.source_state()
last_updated = state.get("last_updated")
for page in paginate(f"{base_url}/users"):
yield page
# Update source state
state["last_updated"] = pendulum.now().isoformat()
@dlt.resource(write_disposition="append")
def events():
# Both resources can access the same state
state = dlt.current.source_state()
last_updated = state.get("last_updated")
for page in paginate(f"{base_url}/events"):
yield page
return [users, events]
Parallelized Sources
Extract resources in parallel for faster data loading:
import dlt

@dlt.source(parallelized=True)
def parallel_source():
    """All resources will be extracted in parallel."""

    @dlt.resource
    def resource_a():
        yield from range(1000)

    @dlt.resource
    def resource_b():
        yield from range(1000)

    return [resource_a, resource_b]
Parallel Execution: Only generator resources that yield items can be parallelized. Transformers and resources with dependencies run sequentially.
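Conceptually, parallelized extraction lets dlt advance several generator resources concurrently instead of draining one before starting the next. A toy round-robin scheduler (not dlt's actual extract pipeline, which uses worker threads) shows the resulting interleaving:

```python
from collections import deque

def round_robin(*generators):
    """Advance each generator one step at a time, mimicking how
    parallelized extraction interleaves resources instead of
    exhausting them sequentially."""
    queue = deque(generators)
    while queue:
        gen = queue.popleft()
        try:
            yield next(gen)
            queue.append(gen)  # still has items: back of the queue
        except StopIteration:
            pass  # generator exhausted: drop it

a = (f"a{i}" for i in range(3))
b = (f"b{i}" for i in range(3))
print(list(round_robin(a, b)))
# ['a0', 'b0', 'a1', 'b1', 'a2', 'b2']
```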
Type Signature
From dlt/extract/decorators.py:363-375:
def source(
    func: Optional[AnyFun] = None,
    /,
    name: str = None,
    section: str = None,
    max_table_nesting: int = None,
    root_key: bool = None,
    schema: Schema = None,
    schema_contract: TSchemaContract = None,
    spec: Type[BaseConfiguration] = None,
    parallelized: bool = False,
    _impl_cls: Type[TDltSourceImpl] = DltSource,
) -> Any
Best Practices
- Group related data: put resources that share authentication, business domain, or schema in the same source.
- Use meaningful names: name sources after the system they extract from, e.g. stripe_source, salesforce_source.
- Leverage configuration injection: use dlt.config.value and dlt.secrets.value for clean credential management.
- Return resources explicitly: always return a list of resources (or individual resource objects) from your source function.
Schema Sharing: All resources in a source share the same schema. This enables dlt to maintain consistency and track relationships across tables.
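One way to picture this sharing: every resource in a source registers its tables into the same namespace, so a table created by one resource is visible alongside its siblings'. A toy stand-in for that registry (dlt's Schema object does far more: typing, normalization, contracts):

```python
class ToySchema:
    """Minimal stand-in for a shared schema: every resource in a
    source registers its tables into the same namespace."""

    def __init__(self, name):
        self.name = name
        self.tables = {}

    def register(self, table_name, columns):
        self.tables[table_name] = columns

schema = ToySchema("github_source")
# Each "resource" contributes its table to the one shared schema
schema.register("issues", ["id", "title", "state"])
schema.register("comments", ["id", "body", "issue_id"])
print(sorted(schema.tables))
# ['comments', 'issues']
```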
Common Patterns
REST API Source
import dlt
from dlt.sources.helpers.rest_client import paginate
from dlt.sources.helpers.rest_client.auth import BearerTokenAuth

@dlt.source
def rest_api_source(
    api_key: str = dlt.secrets.value,
    base_url: str = dlt.config.value,
):
    auth = BearerTokenAuth(api_key)

    @dlt.resource(write_disposition="replace")
    def endpoint_a():
        for page in paginate(f"{base_url}/a", auth=auth):
            yield page

    @dlt.resource(write_disposition="append")
    def endpoint_b():
        for page in paginate(f"{base_url}/b", auth=auth):
            yield page

    return [endpoint_a, endpoint_b]
Database Source
import dlt
import sqlalchemy as sa

@dlt.source
def sql_source(connection_string: str = dlt.secrets.value):
    engine = sa.create_engine(connection_string)

    @dlt.resource
    def customers():
        with engine.connect() as conn:
            result = conn.execute(sa.text("SELECT * FROM customers"))
            yield from (dict(row._mapping) for row in result)

    @dlt.resource
    def orders():
        with engine.connect() as conn:
            result = conn.execute(sa.text("SELECT * FROM orders"))
            yield from (dict(row._mapping) for row in result)

    return [customers, orders]
Related Concepts
- Resource - Individual data streams within a source
- Pipeline - Orchestrates source execution
- Schema - Defines data structure for all source resources
- State - Shared state across source resources