A resource is a decorated function that yields data items. Resources represent individual data streams like API endpoints, database tables, or file collections. Each resource typically maps to a single table in your destination.

What is a Resource?

A resource in dlt:
  • Yields data items from a source like an API, database, or file
  • Defines table schema including name, columns, and data types
  • Controls write behavior with write dispositions (append, replace, merge)
  • Manages its own state for incremental loading
  • Can be composed into transformers and pipelines
Think of a resource as a single data stream that knows how to fetch data and describe its structure.
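For example, a minimal resource is just a decorated generator (a sketch assuming a local duckdb destination):

import dlt

@dlt.resource
def numbers():
    for i in range(3):
        yield {"value": i}

# Creates a "numbers" table with a "value" column
pipeline = dlt.pipeline(pipeline_name="numbers_demo", destination="duckdb")
load_info = pipeline.run(numbers())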

Creating a Resource

Use the @dlt.resource decorator:
import dlt
from dlt.sources.helpers.rest_client import paginate

@dlt.resource(write_disposition="append")
def github_issues(api_token: str = dlt.secrets.value):
    """Fetch all issues from a GitHub repository"""
    url = "https://api.github.com/repos/dlt-hub/dlt/issues"
    headers = {"Authorization": f"token {api_token}"}
    
    for page in paginate(url, headers=headers):
        yield page

As a Standalone Function

import dlt

pipeline = dlt.pipeline(
    pipeline_name="github_pipeline",
    destination="duckdb",
    dataset_name="github_data"
)

load_info = pipeline.run(github_issues())
print(load_info)

Inside a Source

import dlt
from dlt.sources.helpers.rest_client import paginate

@dlt.source
def github_source(repo: str = "dlt-hub/dlt"):
    @dlt.resource(write_disposition="replace")
    def issues():
        url = f"https://api.github.com/repos/{repo}/issues"
        for page in paginate(url):
            yield page
    
    @dlt.resource(write_disposition="append")
    def pull_requests():
        url = f"https://api.github.com/repos/{repo}/pulls"
        for page in paginate(url):
            yield page
    
    return [issues, pull_requests]
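To load the source, call it and pass the result to pipeline.run(); with_resources() selects a subset of its resources (a sketch reusing the pipeline defined above):

load_info = pipeline.run(github_source())

# Load only the issues resource
load_info = pipeline.run(github_source().with_resources("issues"))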

Resource Decorator Parameters

From dlt/extract/decorators.py:540-563:
@dlt.resource(
    name="my_resource",            # Resource name (defaults to function name)
    table_name="my_table",         # Table name if different from resource name
    write_disposition="append",    # How to write: append, replace, merge, skip
    columns={...},                 # Column definitions and hints
    primary_key="id",              # Primary key column(s)
    merge_key="updated_at",        # Merge key for deduplication
    max_table_nesting=2,           # Max nesting depth
    schema_contract={              # Schema evolution control
        "tables": "evolve",
        "columns": "freeze",
        "data_type": "evolve"
    },
    selected=True,                 # Include in pipeline extraction
    parallelized=False,            # Extract in parallel
    incremental=dlt.sources.incremental("updated_at")  # Incremental loading
)
def my_resource():
    yield {...}

Essential Parameters

name (str): Resource and default table name. Defaults to function name.
write_disposition (str): Controls data writing behavior:
  • "append" - Add new data to existing table (default)
  • "replace" - Replace all existing data
  • "merge" - Deduplicate and merge based on primary_key/merge_key
  • "skip" - Skip loading entirely
primary_key (str | list[str]): Column(s) that uniquely identify records. Used with merge disposition.
merge_key (str | list[str]): Column(s) that define which existing rows a merge replaces. Destination rows whose merge key values appear in the incoming data are deleted and replaced, which lets you reload overlapping data ranges without duplicates.
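For instance, a date-valued merge key lets you safely reload a rolling window (a sketch; fetch_recent_orders is a hypothetical helper):

@dlt.resource(
    write_disposition="merge",
    primary_key="order_id",
    merge_key="order_date"
)
def orders():
    # Re-fetching the last few days is safe: destination rows whose
    # order_date appears in the incoming data are replaced, not duplicated
    yield from fetch_recent_orders()  # hypothetical helper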

Write Dispositions

Append

Always add new data:
import dlt
import pendulum

@dlt.resource(write_disposition="append")
def events():
    """Each run adds new events to the table"""
    yield {"event": "user_login", "timestamp": pendulum.now()}
    yield {"event": "page_view", "timestamp": pendulum.now()}

Replace

Replace all existing data:
import dlt
from dlt.sources.helpers import requests

@dlt.resource(write_disposition="replace")
def current_users():
    """Latest snapshot of users - replaces previous data"""
    response = requests.get("https://api.example.com/users")
    yield response.json()

Merge

Deduplicate and merge data:
import dlt

@dlt.resource(
    write_disposition="merge",
    primary_key="id"
)
def user_profiles():
    """Merge based on ID, remove old updates"""
    for user in get_updated_users():
        yield user

SCD2 (Slowly Changing Dimension Type 2)

@dlt.resource(
    write_disposition={
        "disposition": "merge",
        "strategy": "scd2"
    },
    primary_key="id"
)
def users_history():
    """Track historical changes to user records"""
    for user in get_all_users():
        yield user
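With the scd2 strategy, dlt adds validity columns (_dlt_valid_from and _dlt_valid_to by default), so every historical version of a record is retained along with the period during which it was current.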

Incremental Loading

Load only new or changed data:
import dlt
from dlt.sources.helpers import requests

@dlt.resource(
    write_disposition="append",
    primary_key="id"
)
def issues(updated_since=dlt.sources.incremental("updated_at")):
    """Load only issues updated since last run"""
    url = "https://api.github.com/repos/dlt-hub/dlt/issues"
    params = {}
    
    # Use the last value from the previous run; GitHub returns
    # "updated_at" as an ISO 8601 string, so pass it through as-is
    if updated_since.last_value:
        params["since"] = updated_since.last_value
    
    response = requests.get(url, params=params)
    yield response.json()
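You can seed the cursor with initial_value so that the first run also filters (a sketch against the same endpoint):

@dlt.resource(write_disposition="append", primary_key="id")
def issues(
    updated_since=dlt.sources.incremental(
        "updated_at", initial_value="2024-01-01T00:00:00Z"
    )
):
    # last_value equals initial_value on the first run, then the maximum
    # "updated_at" seen in previous runs
    params = {"since": updated_since.last_value}
    response = requests.get(
        "https://api.github.com/repos/dlt-hub/dlt/issues", params=params
    )
    yield response.json()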

Incremental with Resource State

import dlt
from dlt.sources.helpers import requests

@dlt.resource(write_disposition="append")
def players_games(players: list[str]):
    """Track which archives have been processed"""
    # Get resource-specific state
    state = dlt.current.resource_state()
    checked_archives = state.setdefault("archives", [])
    
    for player in players:
        archives = get_player_archives(player)
        
        for url in archives:
            if url in checked_archives:
                print(f"Skipping {url}")
                continue
            
            print(f"Processing {url}")
            checked_archives.append(url)
            
            # Fetch and yield games
            response = requests.get(url)
            yield response.json().get("games", [])

Schema Hints

Define column types and properties:
import dlt
import pendulum

@dlt.resource(
    write_disposition="append",
    columns=[
        {"name": "id", "data_type": "bigint", "nullable": False},
        {"name": "email", "data_type": "text", "unique": True},
        {"name": "created_at", "data_type": "timestamp"},
        {"name": "metadata", "data_type": "json"},
    ],
    primary_key="id"
)
def users():
    yield {
        "id": 1,
        "email": "[email protected]",
        "created_at": pendulum.now(),
        "metadata": {"source": "api"}
    }

Resource Methods

add_limit()

Limit the number of items:
resource = github_issues()
limited = resource.add_limit(100)  # Stop after 100 yielded items (pages, for this resource)
load_info = pipeline.run(limited)

add_filter()

Filter items:
resource = github_issues()
filtered = resource.add_filter(lambda issue: issue["state"] == "open")
load_info = pipeline.run(filtered)

add_map()

Transform items:
resource = github_issues()
mapped = resource.add_map(lambda issue: {
    "id": issue["id"],
    "title": issue["title"],
    "created": issue["created_at"]
})
load_info = pipeline.run(mapped)

with_name()

Change the resource name:
resource = github_issues()
renamed = resource.with_name("repo_issues")
load_info = pipeline.run(renamed)

apply_hints()

Modify hints dynamically:
resource = github_issues()
resource.apply_hints(
    write_disposition="replace",
    primary_key="id",
    columns={"priority": {"data_type": "bigint"}}
)
load_info = pipeline.run(resource)
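apply_hints can also attach incremental loading to an existing resource, which is useful for resources you did not define yourself (a sketch):

resource = github_issues()
resource.apply_hints(incremental=dlt.sources.incremental("updated_at"))
load_info = pipeline.run(resource)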

Transformers

Create dependent resources that transform data from parent resources:
import dlt
from dlt.sources.helpers import requests

@dlt.resource
def players():
    """Fetch player profiles"""
    yield {"username": "magnuscarlsen", "title": "GM"}
    yield {"username": "hikaru", "title": "GM"}

@dlt.transformer(data_from=players, write_disposition="append")
def player_games(player_item):
    """Fetch games for each player"""
    username = player_item["username"]
    url = f"https://api.chess.com/pub/player/{username}/games/archives"
    response = requests.get(url)
    
    for archive in response.json()["archives"]:
        games_response = requests.get(archive)
        yield from games_response.json()["games"]

# Both resources are loaded
load_info = pipeline.run([players, player_games])
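If the transformer is declared without data_from, the parent can instead be connected with the pipe operator, which reads left to right:

# Equivalent wiring using | instead of data_from=players
load_info = pipeline.run(players | player_games)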

Creating Resources from Data

Create resources without decorators:
import dlt

# From a list
resource = dlt.resource([1, 2, 3], name="numbers")

# From a generator
def generate_data():
    for i in range(100):
        yield {"value": i}

resource = dlt.resource(generate_data(), name="generated", write_disposition="replace")

# From a Pandas DataFrame
import pandas as pd
df = pd.read_csv("data.csv")
resource = dlt.resource(df, name="csv_data")

load_info = pipeline.run(resource)

Type Signature

From dlt/extract/decorators.py:540-564:
def resource(
    data: Optional[Any] = None,
    /,
    name: TTableHintTemplate[str] = None,
    table_name: TTableHintTemplate[str] = None,
    max_table_nesting: int = None,
    write_disposition: TTableHintTemplate[TWriteDispositionConfig] = None,
    columns: TTableHintTemplate[TAnySchemaColumns] = None,
    primary_key: TTableHintTemplate[TColumnNames] = None,
    merge_key: TTableHintTemplate[TColumnNames] = None,
    schema_contract: TTableHintTemplate[TSchemaContract] = None,
    table_format: TTableHintTemplate[TTableFormat] = None,
    selected: bool = True,
    spec: Type[BaseConfiguration] = None,
    parallelized: bool = False,
    incremental: Optional[TIncrementalConfig] = None,
) -> Any

Best Practices

1. Choose the right write disposition: use append for events, replace for snapshots, and merge for slowly changing data.
2. Use incremental loading: implement it with dlt.sources.incremental() or resource state to avoid reprocessing data.
3. Set primary keys: always define primary keys for merge operations to ensure proper deduplication.
4. Handle pagination properly: use helper utilities like paginate from rest_client for API pagination.
5. Yield items incrementally: use yield instead of return to process data in chunks and avoid memory issues (see the sketch below).

Generator Functions: Resources must be generator functions (using yield) or return iterables. Regular functions that return single values won’t work as resources.
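A quick contrast between the two styles (fetch_page is a hypothetical helper returning one page of rows):

import dlt

# Memory-heavy: materializes all rows before loading starts
@dlt.resource
def all_rows_at_once():
    rows = []
    for page_number in range(100):
        rows.extend(fetch_page(page_number))  # hypothetical helper
    return rows  # an iterable works, but everything sits in memory

# Streaming: dlt extracts and buffers chunks as they are yielded
@dlt.resource
def rows_streamed():
    for page_number in range(100):
        yield fetch_page(page_number)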

Common Patterns

REST API Resource

import dlt
from dlt.sources.helpers.rest_client import paginate
from dlt.sources.helpers.rest_client.auth import BearerTokenAuth

@dlt.resource(
    write_disposition="append",
    primary_key="id"
)
def api_endpoint(
    api_key: str = dlt.secrets.value,
    endpoint: str = dlt.config.value
):
    auth = BearerTokenAuth(api_key)
    for page in paginate(endpoint, auth=auth):
        yield page

Database Table Resource

import dlt
import sqlalchemy as sa

@dlt.resource(write_disposition="replace")
def database_table(connection_string: str = dlt.secrets.value):
    engine = sa.create_engine(connection_string)
    
    with engine.connect() as conn:
        query = sa.text("SELECT * FROM users WHERE updated_at > :since")
        result = conn.execute(query, {"since": "2024-01-01"})
        
        for row in result:
            yield dict(row._mapping)

File-based Resource

import dlt
import glob
import json

@dlt.resource(write_disposition="replace")
def json_files(directory: str):
    """Load all JSON files from a directory"""
    for filepath in glob.glob(f"{directory}/*.json"):
        with open(filepath) as f:
            data = json.load(f)
            yield data

Related Concepts

  • Source - Groups multiple related resources
  • Pipeline - Executes resource data extraction
  • Schema - Defines resource table structure
  • State - Manages incremental loading state
