A source is a decorated function that returns one or more resources. Sources group related data extraction logic, share a schema, and provide a clean interface for passing credentials and configuration.

What is a Source?

A source in dlt:
  • Groups related resources that are extracted together
  • Defines a schema that describes all tables from its resources
  • Manages configuration and credentials for data extraction
  • Provides a reusable interface for data loading
  • Enables schema evolution across multiple resources
Think of a source as a collection of related API endpoints or database tables that share authentication and business context.
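
A minimal sketch of that idea, with illustrative names: one decorated function that builds and returns a single resource.

import dlt

@dlt.source
def minimal_source():
    @dlt.resource
    def numbers():
        yield from range(10)

    return numbers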

Creating a Source

Use the @dlt.source decorator:
import dlt
from dlt.sources.helpers.rest_client import paginate

@dlt.source
def github_source(api_token: str = dlt.secrets.value, repo: str = "dlt-hub/dlt"):
    """Loads GitHub repository data"""
    
    @dlt.resource(write_disposition="replace")
    def issues():
        url = f"https://api.github.com/repos/{repo}/issues"
        for page in paginate(url, headers={"Authorization": f"token {api_token}"}):
            yield page
    
    @dlt.resource(write_disposition="append")
    def comments():
        url = f"https://api.github.com/repos/{repo}/issues/comments"
        for page in paginate(url, headers={"Authorization": f"token {api_token}"}):
            yield page
    
    return [issues, comments]

Using a Source

Load All Resources

import dlt

pipeline = dlt.pipeline(
    pipeline_name="github_pipeline",
    destination="duckdb",
    dataset_name="github_data"
)

# Load all resources from the source
load_info = pipeline.run(github_source())
print(load_info)

Select Specific Resources

# Load only the issues resource
load_info = pipeline.run(github_source().with_resources("issues"))

# Load multiple selected resources
load_info = pipeline.run(github_source().with_resources("issues", "comments"))

Source Decorator Parameters

From dlt/extract/decorators.py:363-420:
@dlt.source(
    name="my_source",              # Source name (defaults to function name)
    section="my_module",           # Config section for credentials
    max_table_nesting=2,           # Max depth for nested tables
    root_key=False,                # Propagate root key to nested tables
    schema=my_schema,              # Explicit schema instance
    schema_contract={              # Schema evolution rules
        "tables": "evolve",
        "columns": "evolve",
        "data_type": "freeze"
    },
    parallelized=False,            # Extract resources in parallel
    spec=MyConfigClass             # Custom config specification
)
def my_source():
    return [...]

Key Parameters

name (str): Source name, also used as schema name. Defaults to function name.
section (str): Configuration section for accessing credentials. Defaults to module name.
max_table_nesting (int): Maximum nesting depth before flattening to JSON/struct.
schema (Schema): Provide an explicit schema instead of auto-generating one.
schema_contract (dict): Controls how the schema evolves: "evolve", "freeze", "discard_row", or "discard_value".
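
As a sketch, a source that renames its schema, caps nesting depth, and freezes data types (the names and values here are illustrative):

import dlt

@dlt.source(
    name="github",
    max_table_nesting=1,
    schema_contract={"data_type": "freeze"}
)
def annotated_source():
    @dlt.resource
    def issues():
        # nested data beyond one level is stored as JSON instead of child tables
        yield {"id": 1, "labels": [{"name": "bug"}]}

    return issues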

Configuration and Credentials

Sources automatically inject configuration and secrets:
import dlt
from dlt.sources.helpers import requests

@dlt.source
def chess_source(
    username: str,                          # Required explicit argument
    chess_url: str = dlt.config.value,     # From config.toml
    api_secret: str = dlt.secrets.value,   # From secrets.toml
    title: str = "GM"                       # Optional with default
):
    @dlt.resource
    def player_profile():
        response = requests.get(
            f"{chess_url}/player/{username}",
            headers={"Authorization": f"Bearer {api_secret}"}
        )
        yield response.json()
    
    @dlt.resource
    def player_games():
        response = requests.get(
            f"{chess_url}/player/{username}/games",
            headers={"Authorization": f"Bearer {api_secret}"}
        )
        yield response.json()
    
    return [player_profile, player_games]

# Usage
load_info = pipeline.run(chess_source("magnuscarlsen"))

Configuration Layout

Configuration follows this structure:
# config.toml
[sources.chess_source]
chess_url = "https://api.chess.com/pub"

# secrets.toml  
[sources.chess_source]
api_secret = "your-secret-key"

Source Methods and Properties

Accessing Resources

source = github_source()

# Get specific resource
issues_resource = source.resources["issues"]

# Iterate over all resources
for resource in source.resources.values():
    print(resource.name)

# Get selected resources
selected = source.selected_resources

Schema Access

source = github_source()

# Access the source schema
schema = source.schema
print(schema.name)

# Get table definitions
for table_name in schema.tables:
    table = schema.get_table(table_name)
    print(f"Table: {table_name}")

Resource Selection

source = github_source()

# Select specific resources
source = source.with_resources("issues", "comments")

# Check which resources are selected
for name, resource in source.resources.items():
    print(f"{name}: selected={resource.selected}")

Advanced Patterns

Dynamic Resource Generation

import dlt

@dlt.source
def multi_table_source(table_names: list[str]):
    """Generate resources dynamically based on input"""
    resources = []
    
    for table_name in table_names:
        @dlt.resource(name=table_name)
        def load_table(table=table_name):
            # The default argument binds table_name at definition time,
            # avoiding Python's late-binding closure pitfall in loops.
            # get_table_data stands in for your own loading logic.
            yield from get_table_data(table)
        
        resources.append(load_table)
    
    return resources

# Usage
source = multi_table_source(["users", "orders", "products"])
load_info = pipeline.run(source)

Source with Shared State

import dlt
import pendulum
from dlt.sources.helpers.rest_client import paginate

@dlt.source
def api_source(base_url: str = dlt.config.value):
    """All resources share the same source state"""
    
    @dlt.resource(write_disposition="append")
    def users():
        # Access source-level state
        state = dlt.current.source_state()
        last_updated = state.get("last_updated")
        
        for page in paginate(f"{base_url}/users"):
            yield page
        
        # Update source state
        state["last_updated"] = pendulum.now().isoformat()
    
    @dlt.resource(write_disposition="append")
    def events():
        # Both resources can access the same state
        state = dlt.current.source_state()
        last_updated = state.get("last_updated")
        
        for page in paginate(f"{base_url}/events"):
            yield page
    
    return [users, events]

Parallelized Sources

Extract resources in parallel for faster data loading:
import dlt

@dlt.source(parallelized=True)
def parallel_source():
    """All resources will be extracted in parallel"""
    
    @dlt.resource
    def resource_a():
        yield from range(1000)
    
    @dlt.resource
    def resource_b():
        yield from range(1000)
    
    return [resource_a, resource_b]
Parallel Execution: Only generator resources that yield items can be parallelized. Transformers and resources with dependencies run sequentially.
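
Parallelism can also be opted into per resource rather than for the whole source, assuming the parallelized flag on the resource decorator (a sketch):

import dlt

@dlt.source
def mixed_source():
    @dlt.resource(parallelized=True)
    def heavy():
        yield from range(1000)

    @dlt.resource  # stays sequential
    def light():
        yield from range(10)

    return [heavy, light]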

Type Signature

From dlt/extract/decorators.py:363-375:
def source(
    func: Optional[AnyFun] = None,
    /,
    name: str = None,
    section: str = None,
    max_table_nesting: int = None,
    root_key: bool = None,
    schema: Schema = None,
    schema_contract: TSchemaContract = None,
    spec: Type[BaseConfiguration] = None,
    parallelized: bool = False,
    _impl_cls: Type[TDltSourceImpl] = DltSource,
) -> Any

Best Practices

1. Group related data: put resources that share authentication, business domain, or schema in the same source.
2. Use meaningful names: name sources after the system they extract from, e.g. stripe_source, salesforce_source.
3. Leverage configuration injection: use dlt.config.value and dlt.secrets.value for clean credential management.
4. Return resources explicitly: always return a list of resources or individual resource objects from your source function.

Schema Sharing: All resources in a source share the same schema. This enables dlt to maintain consistency and track relationships across tables.
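
A quick way to see that sharing in practice: run a source and list the tables in the pipeline's default schema; tables from every resource (including nested child tables) appear together. A sketch reusing the pipeline and source from above:

pipeline.run(github_source())

# tables from both issues and comments live in one schema
for table_name in pipeline.default_schema.tables:
    print(table_name)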

Common Patterns

REST API Source

import dlt
from dlt.sources.helpers.rest_client import paginate
from dlt.sources.helpers.rest_client.auth import BearerTokenAuth

@dlt.source
def rest_api_source(
    api_key: str = dlt.secrets.value,
    base_url: str = dlt.config.value
):
    auth = BearerTokenAuth(api_key)
    
    @dlt.resource(write_disposition="replace")
    def endpoint_a():
        for page in paginate(f"{base_url}/a", auth=auth):
            yield page
    
    @dlt.resource(write_disposition="append")
    def endpoint_b():
        for page in paginate(f"{base_url}/b", auth=auth):
            yield page
    
    return [endpoint_a, endpoint_b]

Database Source

import dlt
import sqlalchemy as sa

@dlt.source
def sql_source(connection_string: str = dlt.secrets.value):
    engine = sa.create_engine(connection_string)
    
    @dlt.resource
    def customers():
        with engine.connect() as conn:
            result = conn.execute(sa.text("SELECT * FROM customers"))
            yield from (dict(row._mapping) for row in result)
    
    @dlt.resource
    def orders():
        with engine.connect() as conn:
            result = conn.execute(sa.text("SELECT * FROM orders"))
            yield from (dict(row._mapping) for row in result)
    
    return [customers, orders]

Related Concepts

  • Resource - Individual data streams within a source
  • Pipeline - Orchestrates source execution
  • Schema - Defines data structure for all source resources
  • State - Shared state across source resources
