The DltSource class groups multiple resources under a single schema and provides methods to manage and configure them. Sources are created using the @dlt.source decorator.

Creating a Source

Sources are created by decorating a function with @dlt.source:
import dlt

@dlt.source
def my_source():
    return [users_resource(), orders_resource()]

Properties

name

The name of the source, derived from the schema name.
name = source.name
Returns: str - The source name

schema

The schema associated with the source.
schema = source.schema
Returns: Schema - The schema object

resources

A dictionary of all resources in the source.
for name, resource in source.resources.items():
    print(f"Resource: {name}")
Returns: DltResourceDict - Dictionary of resources keyed by name

selected_resources

A dictionary of only the selected resources that will be loaded.
selected = source.selected_resources
Returns: Dict[str, DltResource] - Dictionary of selected resources

exhausted

Checks if the source has been iterated and cannot be used again.
if source.exhausted:
    print("Source already consumed")
Returns: bool - True if source is exhausted

max_table_nesting

Schema hint that sets the maximum depth of nested tables.
source.max_table_nesting = 2
Returns: int - Maximum nesting level

root_key

Enables merge loading on all resources by propagating the root table's key to nested tables as a foreign key.
source.root_key = True
Returns: Optional[bool] - Root key propagation setting

schema_contract

Schema contract settings for the source.
source.schema_contract = {"tables": "freeze", "columns": "evolve"}
Returns: TSchemaContract - Schema contract settings

state

Gets source-scoped state from the active pipeline.
state = source.state
state["last_updated"] = "2024-01-01"
Returns: StrAny - Dictionary containing source state

Methods

with_resources()

Selects specific resources to be loaded. Returns a clone of the source with only selected resources.
# Load only users and orders
filtered_source = source.with_resources("users", "orders")
*resource_names (str, required): Names of resources to select.
Returns: DltSource - Cloned source with selected resources

discover_schema()

Computes table schemas for all selected resources and merges them with the current schema.
schema = source.discover_schema()
item (TDataItem, default: None): Data item for evaluating dynamic tables. If not provided, dynamic tables are ignored.
meta (Any, default: None): Additional metadata.
Returns: Schema - The discovered schema

add_limit()

Limits the items processed in all selected resources by count or time.
# Limit to 100 items per resource
source.add_limit(max_items=100)

# Limit to 60 seconds per resource
source.add_limit(max_time=60)
max_items (int, default: None): Maximum number of items to yield per resource.
max_time (float, default: None): Maximum seconds for each resource to run.
count_rows (bool, default: False): Count rows instead of pages. The last page will not be trimmed.
Returns: DltSource - Returns self for chaining

parallelize()

Marks all resources to run in parallel.
source.parallelize()
Returns: DltSource - Returns self for chaining

decompose()

Decomposes source into a list of sources based on strategy.
# Decompose into strongly connected components
decomposed = source.decompose("scc")
strategy (TDecompositionStrategy, required): Strategy for decomposition: "none" or "scc" (strongly connected components).
Returns: List[DltSource] - List of decomposed sources

clone()

Creates a deep copy of the source.
cloned = source.clone(with_name="new_source_name")
with_name (str, default: None): New name for the cloned source. Also renames the schema.
Returns: DltSource - Cloned source

run

Convenience property that returns a partial of run() bound to the currently active pipeline; call it to load the source.
info = source.run()
Returns: SupportsPipelineRun - Partial function that runs the source

Example Usage

Basic Source

import dlt
from dlt.sources.helpers import requests

@dlt.source
def github_source(repo_name: str):
    @dlt.resource(write_disposition="merge", primary_key="id")
    def issues():
        url = f"https://api.github.com/repos/{repo_name}/issues"
        response = requests.get(url)
        yield response.json()
    
    @dlt.resource
    def pull_requests():
        url = f"https://api.github.com/repos/{repo_name}/pulls"
        response = requests.get(url)
        yield response.json()
    
    return issues, pull_requests

# Use the source
source = github_source("dlt-hub/dlt")
pipeline = dlt.pipeline(
    pipeline_name="github",
    destination="duckdb",
    dataset_name="github_data"
)
pipeline.run(source)

Selective Loading

import dlt

@dlt.source
def my_source():
    return [users(), orders(), products()]

# Load only users and orders
source = my_source().with_resources("users", "orders")

pipeline = dlt.pipeline(
    pipeline_name="selective",
    destination="postgres"
)
pipeline.run(source)

Source with Schema Configuration

import dlt

@dlt.source
def api_source():
    return [data_resource()]

source = api_source()

# Configure schema behavior
source.max_table_nesting = 1
source.root_key = True
source.schema_contract = {
    "tables": "evolve",
    "columns": "freeze",
    "data_type": "evolve"
}

pipeline = dlt.pipeline(
    pipeline_name="api",
    destination="bigquery"
)
pipeline.run(source)

Source with Limits

import dlt

@dlt.source
def large_dataset():
    return [
        table1_resource(),
        table2_resource(),
        table3_resource()
    ]

# Sample first 1000 items from each resource
source = large_dataset().add_limit(max_items=1000)

pipeline = dlt.pipeline(
    pipeline_name="sample",
    destination="duckdb"
)
pipeline.run(source)

Using Source State

import dlt
from datetime import datetime

@dlt.source
def incremental_source():
    @dlt.resource
    def data():
        # Access source state
        source_state = dlt.current.source_state()
        last_run = source_state.get("last_run")
        
        # Fetch data since last run
        items = fetch_data(since=last_run)
        yield items
        
        # Update state
        source_state["last_run"] = datetime.now().isoformat()
    
    return data

source = incremental_source()
pipeline = dlt.pipeline(
    pipeline_name="incremental",
    destination="snowflake"
)
pipeline.run(source)

Accessing Resources

Resources can be accessed as attributes:
source = my_source()

# Access resource by name
users_resource = source.users
orders_resource = source.orders

# Iterate through all resources
for name, resource in source.resources.items():
    print(f"Resource: {name}, Selected: {resource.selected}")
