DltResource Class Reference
The DltResource class represents a data source that can be loaded into a destination. Resources are created using the @dlt.resource decorator and contain a data pipe with table schema configuration.

Creating a Resource

Resources are created by decorating a function with @dlt.resource:
import dlt

@dlt.resource(write_disposition="append", primary_key="id")
def users():
    yield [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]

Properties

name

The name of the resource, inherited from the pipe.
name = resource.name
Returns: str - Resource name Source: ~/workspace/source/dlt/extract/resource.py:231

selected

Indicates if the resource is selected for loading.
if resource.selected:
    print("Resource will be loaded")
Returns: bool - Selection status

is_transformer

Checks if the resource is a transformer that takes data from another resource.
if resource.is_transformer:
    print("This is a transformer resource")
Returns: bool - True if resource is a transformer Source: ~/workspace/source/dlt/extract/resource.py:240

requires_args

Checks if resource has unbound arguments that need to be provided.
if resource.requires_args:
    resource = resource(param1="value")
Returns: bool - True if arguments are required Source: ~/workspace/source/dlt/extract/resource.py:245

args_bound

Returns True if the resource's parameters are bound to values.
if resource.args_bound:
    pipeline.run(resource)
Returns: bool - True if arguments are bound Source: ~/workspace/source/dlt/extract/resource.py:580

explicit_args

Returns a dictionary of arguments used to parametrize the resource.
args = resource.explicit_args
Returns: StrAny - Dictionary of explicit arguments Source: ~/workspace/source/dlt/extract/resource.py:588

incremental

Gets the incremental transform if present in the pipe.
if resource.incremental:
    print(f"Incremental cursor: {resource.incremental.cursor_path}")
Returns: Optional[IncrementalResourceWrapper] - Incremental wrapper or None Source: ~/workspace/source/dlt/extract/resource.py:254

validator

Gets or sets the validator transform in the pipe.
# Get validator
validator = resource.validator

# Set validator
from pydantic import BaseModel

class UserModel(BaseModel):
    id: int
    name: str

resource.validator = UserModel
Returns: Optional[ValidateItem] - Validator or None Source: ~/workspace/source/dlt/extract/resource.py:267

max_table_nesting

Schema hint for maximum depth of nested tables.
resource.max_table_nesting = 2
Returns: Optional[int] - Maximum nesting level Source: ~/workspace/source/dlt/extract/resource.py:279

state

Gets resource-scoped state from the active pipeline.
state = resource.state
state["last_id"] = 100
Returns: StrAny - Dictionary containing resource state Source: ~/workspace/source/dlt/extract/resource.py:596

custom_metrics

Customizable resource metrics dictionary.
metrics = resource.custom_metrics
metrics["processed_items"] = 1000
Returns: Dict[str, Any] - Metrics dictionary Source: ~/workspace/source/dlt/extract/resource.py:226

Methods

with_name()

Clones the resource with a new name. The cloned resource keeps separate state.
new_resource = resource.with_name("users_backup")
new_name
str
required
New name for the resource.
new_section
str
default:"None"
New section name.
Returns: DltResource - Cloned resource with new name Source: ~/workspace/source/dlt/extract/resource.py:235

add_map()

Adds a mapping function to transform data items in the resource pipe.
def uppercase_name(item):
    item["name"] = item["name"].upper()
    return item

resource.add_map(uppercase_name)
item_map
ItemTransformFunc[TDataItem]
required
Function that takes a single data item and returns transformed item.
insert_at
int
default:"None"
Position in pipe to insert the mapping. None appends to end.
Returns: DltResource - Returns self for chaining Source: ~/workspace/source/dlt/extract/resource.py:327

add_yield_map()

Adds a generating function that yields zero or more items from each input item.
def split_record(item):
    # Split one item into multiple
    for sub_item in item["children"]:
        yield sub_item

resource.add_yield_map(split_record)
item_map
ItemTransformFunc[Iterator[TDataItem]]
required
Function that takes a single data item and yields zero or more items.
insert_at
int
default:"None"
Position in pipe to insert the generator.
Returns: DltResource - Returns self for chaining Source: ~/workspace/source/dlt/extract/resource.py:367

add_filter()

Adds a filter function to selectively include data items.
def active_users_only(item):
    return item["status"] == "active"

resource.add_filter(active_users_only)
item_filter
ItemTransformFunc[bool]
required
Function that takes a single data item and returns bool. If True, item is kept.
insert_at
int
default:"None"
Position in pipe to insert the filter.
Returns: DltResource - Returns self for chaining Source: ~/workspace/source/dlt/extract/resource.py:390

add_limit()

Limits the number of items processed by count or time.
# Limit to 1000 items
resource.add_limit(max_items=1000)

# Limit to 60 seconds
resource.add_limit(max_time=60)

# Count rows instead of pages
resource.add_limit(max_items=1000, count_rows=True)
max_items
int
default:"None"
Maximum number of items to yield.
max_time
float
default:"None"
Maximum seconds for the resource to run.
count_rows
bool
default:"False"
Count rows instead of pages.
Returns: DltResource - Returns self for chaining Source: ~/workspace/source/dlt/extract/resource.py:409

select_tables()

For resources that dynamically dispatch to multiple tables, selects which tables receive data.
resource.select_tables("users", "orders")
*table_names
str
required
Names of tables to include.
Returns: DltResource - Returns self for chaining Source: ~/workspace/source/dlt/extract/resource.py:312

parallelize()

Marks the resource to run in parallel.
resource.parallelize()
Returns: DltResource - Returns self for chaining Source: ~/workspace/source/dlt/extract/resource.py:447

bind()

Binds a parametrized resource to passed arguments.
@dlt.resource
def paginated_data(page_size: int = 100):
    # resource implementation
    pass

bound_resource = paginated_data.bind(page_size=50)
*args
Any
Positional arguments.
**kwargs
Any
Keyword arguments.
Returns: DltResource - Returns self with bound arguments Source: ~/workspace/source/dlt/extract/resource.py:548

pipe_data_from()

Replaces the parent pipe in a transformer resource.
@dlt.transformer
def enrich_users(user_item):
    user_item["enriched"] = True
    yield user_item

enrich_users.pipe_data_from(users_resource)
data_from
DltResource
required
The resource to pipe data from.
Returns: DltResource - Returns self for chaining Source: ~/workspace/source/dlt/extract/resource.py:292

Example Usage

Basic Resource

import dlt

@dlt.resource(write_disposition="append")
def users():
    yield [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]

pipeline = dlt.pipeline(
    pipeline_name="users_pipeline",
    destination="duckdb"
)
pipeline.run(users())

Resource with Schema Hints

import dlt

@dlt.resource(
    write_disposition="merge",
    primary_key="id",
    columns={
        "id": {"data_type": "bigint"},
        "email": {"data_type": "text", "nullable": False},
        "created_at": {"data_type": "timestamp"}
    }
)
def users():
    # Your data fetching logic
    yield fetch_users()

pipeline.run(users())

Incremental Resource

import dlt
from datetime import datetime

@dlt.resource(
    write_disposition="append",
    primary_key="id"
)
def incremental_users(
    updated_at=dlt.sources.incremental("updated_at", initial_value="2024-01-01")
):
    # Fetch only new/updated records
    users = fetch_users(since=updated_at.last_value)
    yield users

pipeline.run(incremental_users())

Parametrized Resource

import dlt

@dlt.resource
def api_data(endpoint: str, api_key: str = dlt.secrets.value):
    response = requests.get(
        f"https://api.example.com/{endpoint}",
        headers={"Authorization": f"Bearer {api_key}"}
    )
    yield response.json()

# Bind parameters
users = api_data("users")
orders = api_data("orders")

pipeline.run([users, orders])

Transformer Resource

import dlt

@dlt.resource
def users():
    yield [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]

@dlt.transformer(data_from=users, write_disposition="append")
def users_with_email(user_item):
    user_item["email"] = f"{user_item['name'].lower()}@example.com"
    yield user_item

pipeline.run([users, users_with_email])

Resource with Data Processing

import dlt

@dlt.resource
def processed_data():
    raw_data = fetch_raw_data()
    yield raw_data

# Add transformations
resource = processed_data()
resource.add_filter(lambda item: item["status"] == "active")
resource.add_map(lambda item: {**item, "processed": True})
resource.add_limit(max_items=1000)

pipeline.run(resource)

Resource with Validation

import dlt
from pydantic import BaseModel

class UserModel(BaseModel):
    id: int
    name: str
    email: str

@dlt.resource
def users():
    yield fetch_users()

resource = users()
resource.validator = UserModel

pipeline.run(resource)

Resource with Multiple Tables

import dlt

@dlt.resource
def dynamic_tables():
    # Dispatch to different tables
    yield dlt.mark.with_table_name({"id": 1, "data": "A"}, "table_a")
    yield dlt.mark.with_table_name({"id": 2, "data": "B"}, "table_b")

# Select specific tables
resource = dynamic_tables().select_tables("table_a")
pipeline.run(resource)
