A resource is a decorated function that yields data items. Resources represent individual data streams like API endpoints, database tables, or file collections. Each resource typically maps to a single table in your destination.
What is a Resource?
A resource in dlt:
- Yields data items from a source like an API, database, or file
- Defines table schema including name, columns, and data types
- Controls write behavior with write dispositions (append, replace, merge)
- Manages its own state for incremental loading
- Can be composed into transformers and pipelines
Think of a resource as a single data stream that knows how to fetch data and describe its structure.
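For example, the smallest possible resource is just a generator function wrapped by the decorator (a minimal sketch; the table and field names are illustrative):
import dlt

@dlt.resource(name="users", write_disposition="append")
def users():
    # Each yielded dict becomes a row in the "users" table
    yield {"id": 1, "name": "alice"}
    yield {"id": 2, "name": "bob"}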
Creating a Resource
Use the @dlt.resource decorator:
import dlt
from dlt.sources.helpers.rest_client import paginate
@dlt.resource(write_disposition="append")
def github_issues(api_token: str = dlt.secrets.value):
"""Fetch all issues from a GitHub repository"""
url = "https://api.github.com/repos/dlt-hub/dlt/issues"
headers = {"Authorization": f"token {api_token}"}
for page in paginate(url, headers=headers):
yield page
As a Standalone Function
import dlt
pipeline = dlt.pipeline(
pipeline_name="github_pipeline",
destination="duckdb",
dataset_name="github_data"
)
load_info = pipeline.run(github_issues())
print(load_info)
Inside a Source
import dlt
from dlt.sources.helpers.rest_client import paginate
@dlt.source
def github_source(repo: str = "dlt-hub/dlt"):
@dlt.resource(write_disposition="replace")
def issues():
url = f"https://api.github.com/repos/{repo}/issues"
for page in paginate(url):
yield page
@dlt.resource(write_disposition="append")
def pull_requests():
url = f"https://api.github.com/repos/{repo}/pulls"
for page in paginate(url):
yield page
return [issues, pull_requests]
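To run a source, pass it to pipeline.run(); with_resources() selects a subset of its resources. A sketch, reusing the pipeline settings from the standalone example above:
pipeline = dlt.pipeline(
    pipeline_name="github_pipeline",
    destination="duckdb",
    dataset_name="github_data"
)

# Load everything the source defines
load_info = pipeline.run(github_source())

# Or load only the "issues" resource
load_info = pipeline.run(github_source().with_resources("issues"))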
Resource Decorator Parameters
From dlt/extract/decorators.py:540-563:
@dlt.resource(
name="my_resource", # Resource name (defaults to function name)
table_name="my_table", # Table name if different from resource name
write_disposition="append", # How to write: append, replace, merge, skip
columns={...}, # Column definitions and hints
primary_key="id", # Primary key column(s)
merge_key="updated_at", # Merge key for deduplication
max_table_nesting=2, # Max nesting depth
schema_contract={ # Schema evolution control
"tables": "evolve",
"columns": "freeze",
"data_type": "evolve"
},
selected=True, # Include in pipeline extraction
parallelized=False, # Extract in parallel
incremental=dlt.sources.incremental("updated_at") # Incremental loading
)
def my_resource():
yield {...}
Essential Parameters
name (str): Resource and default table name. Defaults to function name.
write_disposition (str): Controls data writing behavior:
- "append" - Add new data to the existing table (default)
- "replace" - Replace all existing data
- "merge" - Deduplicate and merge based on primary_key/merge_key
- "skip" - Skip loading entirely
primary_key (str | list[str]): Column(s) that uniquely identify records. Used with merge disposition.
merge_key (str | list[str]): Column(s) for deduplication. Removes overlapping data ranges.
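For instance, a resource can write to a table named differently from the function and declare a compound primary key for merging (a sketch; the table and column names are illustrative):
import dlt

@dlt.resource(
    name="orders_raw",           # resource name
    table_name="orders",         # destination table name
    write_disposition="merge",
    primary_key=["order_id", "line_no"]
)
def orders_raw():
    yield {"order_id": 1, "line_no": 1, "sku": "A-100"}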
Write Dispositions
Append
Always add new data:
import pendulum

@dlt.resource(write_disposition="append")
def events():
"""Each run adds new events to the table"""
yield {"event": "user_login", "timestamp": pendulum.now()}
yield {"event": "page_view", "timestamp": pendulum.now()}
Replace
Replace all existing data:
from dlt.sources.helpers import requests

@dlt.resource(write_disposition="replace")
def current_users():
"""Latest snapshot of users - replaces previous data"""
response = requests.get("https://api.example.com/users")
yield response.json()
Merge
Deduplicate and merge data:
@dlt.resource(
write_disposition="merge",
primary_key="id",
merge_key="updated_at"
)
def user_profiles():
"""Merge based on ID, remove old updates"""
for user in get_updated_users():
yield user
SCD2 (Slowly Changing Dimension Type 2)
@dlt.resource(
write_disposition={
"disposition": "merge",
"strategy": "scd2"
},
primary_key="id"
)
def users_history():
"""Track historical changes to user records"""
for user in get_all_users():
yield user
Incremental Loading
Load only new or changed data:
import dlt
from dlt.sources.helpers import requests
@dlt.resource(
write_disposition="append",
primary_key="id"
)
def issues(updated_since=dlt.sources.incremental("updated_at")):
"""Load only issues updated since last run"""
url = "https://api.github.com/repos/dlt-hub/dlt/issues"
params = {}
    # Use the cursor value from the previous run (None on the first run)
    if updated_since.last_value:
        params["since"] = updated_since.last_value
response = requests.get(url, params=params)
yield response.json()
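On the first run updated_since.last_value is None, so all issues are loaded; on later runs dlt restores the highest updated_at seen so far and only newer issues are requested. A sketch of running it (the pipeline names are illustrative):
pipeline = dlt.pipeline(
    pipeline_name="github_incremental",
    destination="duckdb",
    dataset_name="github_data"
)

# First run loads everything; subsequent runs load only newer issues
load_info = pipeline.run(issues())
print(load_info)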
Incremental with Resource State
import dlt
from dlt.sources.helpers import requests
@dlt.resource(write_disposition="append")
def players_games(players: list[str]):
"""Track which archives have been processed"""
# Get resource-specific state
state = dlt.current.resource_state()
checked_archives = state.setdefault("archives", [])
for player in players:
archives = get_player_archives(player)
for url in archives:
if url in checked_archives:
print(f"Skipping {url}")
continue
print(f"Processing {url}")
checked_archives.append(url)
# Fetch and yield games
response = requests.get(url)
yield response.json().get("games", [])
Schema Hints
Define column types and properties:
import dlt
import pendulum
@dlt.resource(
write_disposition="append",
columns=[
{"name": "id", "data_type": "bigint", "nullable": False},
{"name": "email", "data_type": "text", "unique": True},
{"name": "created_at", "data_type": "timestamp"},
{"name": "metadata", "data_type": "json"},
],
primary_key="id"
)
def users():
yield {
"id": 1,
"email": "[email protected]",
"created_at": pendulum.now(),
"metadata": {"source": "api"}
}
Resource Methods
add_limit()
Limit the number of items:
resource = github_issues()
limited = resource.add_limit(100)  # Yield at most 100 items (pages) from the resource
load_info = pipeline.run(limited)
add_filter()
Filter items:
resource = github_issues()
filtered = resource.add_filter(lambda issue: issue["state"] == "open")
load_info = pipeline.run(filtered)
add_map()
Transform items:
resource = github_issues()
mapped = resource.add_map(lambda issue: {
"id": issue["id"],
"title": issue["title"],
"created": issue["created_at"]
})
load_info = pipeline.run(mapped)
with_name()
Change the resource name:
resource = github_issues()
renamed = resource.with_name("repo_issues")
load_info = pipeline.run(renamed)
apply_hints()
Modify hints dynamically:
resource = github_issues()
resource.apply_hints(
write_disposition="replace",
primary_key="id",
columns={"priority": {"data_type": "bigint"}}
)
load_info = pipeline.run(resource)
Transformers
Create dependent resources that transform data from parent resources:
import dlt
from dlt.sources.helpers import requests
@dlt.resource
def players():
"""Fetch player profiles"""
yield {"username": "magnuscarlsen", "title": "GM"}
yield {"username": "hikaru", "title": "GM"}
@dlt.transformer(data_from=players, write_disposition="append")
def player_games(player_item):
"""Fetch games for each player"""
username = player_item["username"]
url = f"https://api.chess.com/pub/player/{username}/games/archives"
response = requests.get(url)
for archive in response.json()["archives"]:
games_response = requests.get(archive)
yield from games_response.json()["games"]
# Both resources are loaded
load_info = pipeline.run([players, player_games])
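An alternative way to bind a transformer to its parent is the | (pipe) operator; if player_games were declared without the data_from argument, you could bind and run it like this (a sketch):
# Bind the parent resource at run time with the pipe operator
load_info = pipeline.run(players | player_games)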
Creating Resources from Data
Create resources without decorators:
import dlt
# From a list
resource = dlt.resource([1, 2, 3], name="numbers")
# From a generator
def generate_data():
for i in range(100):
yield {"value": i}
resource = dlt.resource(generate_data(), name="generated", write_disposition="replace")
# From a Pandas DataFrame
import pandas as pd
df = pd.read_csv("data.csv")
resource = dlt.resource(df, name="csv_data")
load_info = pipeline.run(resource)
Type Signature
From dlt/extract/decorators.py:540-564:
def resource(
data: Optional[Any] = None,
/,
name: TTableHintTemplate[str] = None,
table_name: TTableHintTemplate[str] = None,
max_table_nesting: int = None,
write_disposition: TTableHintTemplate[TWriteDispositionConfig] = None,
columns: TTableHintTemplate[TAnySchemaColumns] = None,
primary_key: TTableHintTemplate[TColumnNames] = None,
merge_key: TTableHintTemplate[TColumnNames] = None,
schema_contract: TTableHintTemplate[TSchemaContract] = None,
table_format: TTableHintTemplate[TTableFormat] = None,
selected: bool = True,
spec: Type[BaseConfiguration] = None,
parallelized: bool = False,
incremental: Optional[TIncrementalConfig] = None,
) -> Any
Best Practices
Choose the right write disposition
Use append for events, replace for snapshots, and merge for slowly changing data
Use incremental loading
Implement incremental loading with dlt.sources.incremental() or resource state to avoid reprocessing data
Set primary keys
Always define primary keys for merge operations to ensure proper deduplication
Handle pagination properly
Use helper utilities like paginate from rest_client for API pagination
Yield items incrementally
Use yield instead of return to process data in chunks and avoid memory issues
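For example, yielding data in batches keeps memory usage bounded regardless of total volume (a sketch; fetch_page is an illustrative helper, not a dlt API):
import dlt

@dlt.resource(write_disposition="append")
def events(batch_size: int = 1000):
    page = 0
    while True:
        # fetch_page is an illustrative helper returning a list of records
        batch = fetch_page(page, batch_size)
        if not batch:
            break
        yield batch  # yield whole batches; dlt handles lists of dicts
        page += 1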
Generator Functions: Resources must be generator functions (using yield) or return iterables. Regular functions that return single values won’t work as resources.
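Both forms are accepted; a sketch of the return-an-iterable variant (the names are illustrative):
import dlt

@dlt.resource(name="settings")
def settings():
    # Returning a list of dicts works the same as yielding them
    return [
        {"key": "theme", "value": "dark"},
        {"key": "language", "value": "en"}
    ]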
Common Patterns
REST API Resource
import dlt
from dlt.sources.helpers.rest_client import paginate
from dlt.sources.helpers.rest_client.auth import BearerTokenAuth
@dlt.resource(
write_disposition="append",
primary_key="id"
)
def api_endpoint(
api_key: str = dlt.secrets.value,
endpoint: str = dlt.config.value
):
auth = BearerTokenAuth(token=api_key)
for page in paginate(endpoint, auth=auth):
yield page
Database Table Resource
import dlt
import sqlalchemy as sa
@dlt.resource(write_disposition="replace")
def database_table(connection_string: str = dlt.secrets.value):
engine = sa.create_engine(connection_string)
with engine.connect() as conn:
query = sa.text("SELECT * FROM users WHERE updated_at > :since")
result = conn.execute(query, {"since": "2024-01-01"})
for row in result:
yield dict(row._mapping)
File-based Resource
import dlt
import glob
import json
@dlt.resource(write_disposition="replace")
def json_files(directory: str):
"""Load all JSON files from a directory"""
for filepath in glob.glob(f"{directory}/*.json"):
with open(filepath) as f:
data = json.load(f)
yield data
See Also
- Source - Groups multiple related resources
- Pipeline - Executes resource data extraction
- Schema - Defines resource table structure
- State - Manages incremental loading state