The `DltSource` class groups multiple resources under a single schema and provides methods to manage and configure them. Sources are created with the `@dlt.source` decorator.
## Creating a Source

Sources are created by decorating a function with `@dlt.source`:

```python
import dlt

@dlt.source
def my_source():
    return [users_resource(), orders_resource()]
```
## Properties
### name

The name of the source, derived from the schema name.

**Returns:** `str` - The source name

Source: `~/workspace/source/dlt/extract/source.py:390`
### schema

The schema associated with the source.

**Returns:** `Schema` - The schema object

Source: `~/workspace/source/dlt/extract/source.py:485`
### resources

A dictionary of all resources in the source, keyed by resource name.

```python
for name, resource in source.resources.items():
    print(f"Resource: {name}")
```

**Returns:** `DltResourceDict` - Dictionary of resources keyed by name

Source: `~/workspace/source/dlt/extract/source.py:467`
### selected_resources

A dictionary containing only the selected resources, i.e. those that will be loaded.

```python
selected = source.selected_resources
```

**Returns:** `Dict[str, DltResource]` - Dictionary of selected resources

Source: `~/workspace/source/dlt/extract/source.py:476`
### exhausted

Checks whether the source has already been iterated and cannot be used again.

```python
if source.exhausted:
    print("Source already consumed")
```

**Returns:** `bool` - True if the source is exhausted

Source: `~/workspace/source/dlt/extract/source.py:457`
### max_table_nesting

Schema hint that sets the maximum depth of nested tables.

```python
source.max_table_nesting = 2
```

**Returns:** `int` - Maximum nesting level

Source: `~/workspace/source/dlt/extract/source.py:395`
### root_key

Enables merge write disposition on all resources by propagating the root table's row key to nested tables as a foreign key.

**Returns:** `Optional[bool]` - Root key propagation setting

Source: `~/workspace/source/dlt/extract/source.py:409`
### schema_contract

Schema contract settings for the source.

```python
source.schema_contract = {"tables": "freeze", "columns": "evolve"}
```

**Returns:** `TSchemaContract` - Schema contract settings

Source: `~/workspace/source/dlt/extract/source.py:449`
### state

Gets the source-scoped state from the active pipeline.

```python
state = source.state
state["last_updated"] = "2024-01-01"
```

**Returns:** `StrAny` - Dictionary containing source state

Source: `~/workspace/source/dlt/extract/source.py:584`
## Methods
### with_resources()

Selects specific resources to be loaded. Returns a clone of the source containing only the selected resources.

```python
# Load only users and orders
filtered_source = source.with_resources("users", "orders")
```

**Parameters:**
- `*resource_names`: Names of the resources to select.

**Returns:** `DltSource` - Cloned source with selected resources

Source: `~/workspace/source/dlt/extract/source.py:509`
### discover_schema()

Computes table schemas for all selected resources and merges them into the current schema.

```python
schema = source.discover_schema()
```

**Parameters:**
- `item`: Data item used to evaluate dynamic tables. If not provided, dynamic tables are ignored.

**Returns:** `Schema` - The discovered schema

Source: `~/workspace/source/dlt/extract/source.py:492`
### add_limit()

Limits the items processed in all selected resources, by count or by time.

```python
# Limit to 100 items per resource
source.add_limit(max_items=100)

# Limit to 60 seconds per resource
source.add_limit(max_time=60)
```

**Parameters:**
- `max_items`: Maximum number of items to yield per resource.
- `max_time`: Maximum number of seconds each resource may run.
- Optionally, rows can be counted instead of pages; the last page is not trimmed.

**Returns:** `DltSource` - Returns itself for chaining

Source: `~/workspace/source/dlt/extract/source.py:532`
### parallelize()

Marks all resources in the source to run in parallel.

**Returns:** `DltSource` - Returns itself for chaining

Source: `~/workspace/source/dlt/extract/source.py:563`
### decompose()

Decomposes the source into a list of sources based on the given strategy.

```python
# Decompose into strongly connected components
decomposed = source.decompose("scc")
```

**Parameters:**
- `strategy` (`TDecompositionStrategy`, required): Strategy for decomposition: "none" or "scc" (strongly connected components).

**Returns:** `List[DltSource]` - List of decomposed sources

Source: `~/workspace/source/dlt/extract/source.py:515`
### clone()

Creates a deep copy of the source.

```python
cloned = source.clone(with_name="new_source_name")
```

**Parameters:**
- `with_name`: New name for the cloned source; also renames the schema.

**Returns:** `DltSource` - Cloned source

Source: `~/workspace/source/dlt/extract/source.py:589`
### run

Convenience property that calls `run()` on the currently active pipeline, with this source as the data argument.

**Returns:** `SupportsPipelineRun` - Partial function that runs the source

Source: `~/workspace/source/dlt/extract/source.py:576`
## Example Usage
### Basic Source

```python
import dlt
from dlt.sources.helpers import requests

@dlt.source
def github_source(repo_name: str):
    @dlt.resource(write_disposition="merge", primary_key="id")
    def issues():
        url = f"https://api.github.com/repos/{repo_name}/issues"
        response = requests.get(url)
        yield response.json()

    @dlt.resource
    def pull_requests():
        url = f"https://api.github.com/repos/{repo_name}/pulls"
        response = requests.get(url)
        yield response.json()

    return issues, pull_requests

# Use the source
source = github_source("dlt-hub/dlt")
pipeline = dlt.pipeline(
    pipeline_name="github",
    destination="duckdb",
    dataset_name="github_data",
)
pipeline.run(source)
```
### Selective Loading

```python
import dlt

@dlt.source
def my_source():
    return [users(), orders(), products()]

# Load only users and orders
source = my_source().with_resources("users", "orders")

pipeline = dlt.pipeline(
    pipeline_name="selective",
    destination="postgres",
)
pipeline.run(source)
```
### Source with Schema Configuration

```python
import dlt

@dlt.source
def api_source():
    return [data_resource()]

source = api_source()

# Configure schema behavior
source.max_table_nesting = 1
source.root_key = True
source.schema_contract = {
    "tables": "evolve",
    "columns": "freeze",
    "data_type": "evolve",
}

pipeline = dlt.pipeline(
    pipeline_name="api",
    destination="bigquery",
)
pipeline.run(source)
```
### Source with Limits

```python
import dlt

@dlt.source
def large_dataset():
    return [
        table1_resource(),
        table2_resource(),
        table3_resource(),
    ]

# Sample the first 1000 items from each resource
source = large_dataset().add_limit(max_items=1000)

pipeline = dlt.pipeline(
    pipeline_name="sample",
    destination="duckdb",
)
pipeline.run(source)
```
### Using Source State

```python
import dlt
from datetime import datetime

@dlt.source
def incremental_source():
    @dlt.resource
    def data():
        # Access source-scoped state
        source_state = dlt.current.source_state()
        last_run = source_state.get("last_run")

        # Fetch data since last run
        items = fetch_data(since=last_run)
        yield items

        # Update state
        source_state["last_run"] = datetime.now().isoformat()

    return data

source = incremental_source()
pipeline = dlt.pipeline(
    pipeline_name="incremental",
    destination="snowflake",
)
pipeline.run(source)
```
## Accessing Resources

Resources can be accessed as attributes of the source:

```python
source = my_source()

# Access a resource by name
users_resource = source.users
orders_resource = source.orders

# Iterate through all resources
for name, resource in source.resources.items():
    print(f"Resource: {name}, Selected: {resource.selected}")
```