The DltResource class represents a data source that can be loaded into a destination. Resources are created using the @dlt.resource decorator and contain a data pipe with table schema configuration.
Creating a Resource
Resources are created by decorating a function with @dlt.resource:
import dlt

@dlt.resource(write_disposition="append", primary_key="id")
def users():
    yield [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]
Properties
name
The name of the resource, inherited from the pipe.
Returns: str - Resource name
Source: ~/workspace/source/dlt/extract/resource.py:231
selected
Indicates if the resource is selected for loading.
if resource.selected:
    print("Resource will be loaded")
Returns: bool - Selection status
is_transformer
Checks if the resource is a transformer that takes data from another resource.
if resource.is_transformer:
    print("This is a transformer resource")
Returns: bool - True if resource is a transformer
Source: ~/workspace/source/dlt/extract/resource.py:240
requires_args
Checks if resource has unbound arguments that need to be provided.
if resource.requires_args:
    resource = resource(param1="value")
Returns: bool - True if arguments are required
Source: ~/workspace/source/dlt/extract/resource.py:245
args_bound
Returns true if resource parameters are bound to values.
if resource.args_bound:
    pipeline.run(resource)
Returns: bool - True if arguments are bound
Source: ~/workspace/source/dlt/extract/resource.py:580
explicit_args
Returns a dictionary of arguments used to parametrize the resource.
args = resource.explicit_args
Returns: StrAny - Dictionary of explicit arguments
Source: ~/workspace/source/dlt/extract/resource.py:588
incremental
Gets the incremental transform if present in the pipe.
if resource.incremental:
    print(f"Incremental cursor: {resource.incremental.cursor_path}")
Returns: Optional[IncrementalResourceWrapper] - Incremental wrapper or None
Source: ~/workspace/source/dlt/extract/resource.py:254
validator
Gets or sets the validator transform in the pipe.
# Get validator
validator = resource.validator

# Set validator
from pydantic import BaseModel

class UserModel(BaseModel):
    id: int
    name: str

resource.validator = UserModel
Returns: Optional[ValidateItem] - Validator or None
Source: ~/workspace/source/dlt/extract/resource.py:267
max_table_nesting
Schema hint for maximum depth of nested tables.
resource.max_table_nesting = 2
Returns: Optional[int] - Maximum nesting level
Source: ~/workspace/source/dlt/extract/resource.py:279
state
Gets resource-scoped state from the active pipeline.
state = resource.state
state["last_id"] = 100
Returns: StrAny - Dictionary containing resource state
Source: ~/workspace/source/dlt/extract/resource.py:596
custom_metrics
Customizable resource metrics dictionary.
metrics = resource.custom_metrics
metrics["processed_items"] = 1000
Returns: Dict[str, Any] - Metrics dictionary
Source: ~/workspace/source/dlt/extract/resource.py:226
Methods
with_name()
Clones the resource with a new name. The cloned resource keeps separate state.
new_resource = resource.with_name("users_backup")
new_name
str
required
New name for the resource.
Returns: DltResource - Cloned resource with new name
Source: ~/workspace/source/dlt/extract/resource.py:235
add_map()
Adds a mapping function to transform data items in the resource pipe.
def uppercase_name(item):
    item["name"] = item["name"].upper()
    return item

resource.add_map(uppercase_name)
item_map
ItemTransformFunc[TDataItem]
required
Function that takes a single data item and returns the transformed item.
insert_at
Optional[int]
optional
Position in pipe to insert the mapping. None appends to the end.
Returns: DltResource - Returns self for chaining
Source: ~/workspace/source/dlt/extract/resource.py:327
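The mapping runs once per item as data flows through the pipe. The per-item semantics can be illustrated with a plain-Python sketch that does not use dlt at all:

```python
# Plain-Python illustration of add_map semantics: the function is
# applied to each item in the stream, one at a time.
def uppercase_name(item):
    item["name"] = item["name"].upper()
    return item

items = [{"name": "alice"}, {"name": "bob"}]
mapped = [uppercase_name(dict(item)) for item in items]
print(mapped)  # [{'name': 'ALICE'}, {'name': 'BOB'}]
```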
add_yield_map()
Adds a generating function that yields 0 or more items from each input item.
def split_record(item):
    # Split one item into multiple
    for sub_item in item["children"]:
        yield sub_item

resource.add_yield_map(split_record)
item_map
ItemTransformFunc[Iterator[TDataItem]]
required
Function that takes a single data item and yields 0 or more items.
insert_at
Optional[int]
optional
Position in pipe to insert the generator. None appends to the end.
Returns: DltResource - Returns self for chaining
Source: ~/workspace/source/dlt/extract/resource.py:367
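Unlike add_map, which returns exactly one item per input, a yield map can expand one item into many or drop it entirely. A plain-Python sketch of the flattening behavior (no dlt involved):

```python
# Each input item may yield zero or more output items; the pipe
# flattens the results into a single stream.
def split_record(item):
    for sub_item in item["children"]:
        yield sub_item

items = [{"children": [1, 2]}, {"children": []}, {"children": [3]}]
flattened = [out for item in items for out in split_record(item)]
print(flattened)  # [1, 2, 3]
```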
add_filter()
Adds a filter function to selectively include data items.
def active_users_only(item):
    return item["status"] == "active"

resource.add_filter(active_users_only)
item_filter
ItemTransformFunc[bool]
required
Function that takes a single data item and returns bool. If True, item is kept.
insert_at
Optional[int]
optional
Position in pipe to insert the filter. None appends to the end.
Returns: DltResource - Returns self for chaining
Source: ~/workspace/source/dlt/extract/resource.py:390
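Filters and maps run in the order they are added to the pipe (unless insert_at is given), so a filter added first sees raw items and a later map only sees what survived the filter. A plain-Python sketch of that ordering, independent of dlt:

```python
# Filter first, then map: only items passing the filter are transformed.
def active_only(item):
    return item["status"] == "active"

def tag(item):
    return {**item, "processed": True}

items = [{"status": "active"}, {"status": "inactive"}]
result = [tag(item) for item in items if active_only(item)]
print(result)  # [{'status': 'active', 'processed': True}]
```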
add_limit()
Limits the number of items processed by count or time.
# Limit to 1000 items
resource.add_limit(max_items=1000)
# Limit to 60 seconds
resource.add_limit(max_time=60)
# Count rows instead of pages
resource.add_limit(max_items=1000, count_rows=True)
max_items
Optional[int]
optional
Maximum number of items to yield.
max_time
Optional[float]
optional
Maximum number of seconds the resource may run.
count_rows
bool
optional
Count rows instead of pages.
Returns: DltResource - Returns self for chaining
Source: ~/workspace/source/dlt/extract/resource.py:409
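With max_items, the generator is closed once the limit is reached, so no further pages are requested from the source. The effect is similar to itertools.islice over the stream (a sketch of the semantics, not dlt's actual implementation):

```python
import itertools

# An endless paginated source; the limit stops consumption after 3 pages.
def pages():
    page = 0
    while True:
        page += 1
        yield {"page": page}

limited = list(itertools.islice(pages(), 3))
print(limited)  # [{'page': 1}, {'page': 2}, {'page': 3}]
```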
select_tables()
For resources that dynamically dispatch to multiple tables, selects which tables receive data.
resource.select_tables("users", "orders")
Names of tables to include.
Returns: DltResource - Returns self for chaining
Source: ~/workspace/source/dlt/extract/resource.py:312
parallelize()
Marks the resource to run in parallel.
Returns: DltResource - Returns self for chaining
Source: ~/workspace/source/dlt/extract/resource.py:447
bind()
Binds a parametrized resource to passed arguments.
@dlt.resource
def paginated_data(page_size: int = 100):
    # resource implementation
    pass

bound_resource = paginated_data.bind(page_size=50)
Returns: DltResource - Returns self with bound arguments
Source: ~/workspace/source/dlt/extract/resource.py:548
pipe_data_from()
Replaces the parent pipe in a transformer resource.
@dlt.transformer
def enrich_users(user_item):
    user_item["enriched"] = True
    yield user_item

enrich_users.pipe_data_from(users_resource)
data_from
DltResource
required
The resource to pipe data from.
Returns: DltResource - Returns self for chaining
Source: ~/workspace/source/dlt/extract/resource.py:292
Example Usage
Basic Resource
import dlt

@dlt.resource(write_disposition="append")
def users():
    yield [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]

pipeline = dlt.pipeline(
    pipeline_name="users_pipeline",
    destination="duckdb"
)
pipeline.run(users())
Resource with Schema Hints
import dlt

@dlt.resource(
    write_disposition="merge",
    primary_key="id",
    columns={
        "id": {"data_type": "bigint"},
        "email": {"data_type": "text", "nullable": False},
        "created_at": {"data_type": "timestamp"}
    }
)
def users():
    # Your data fetching logic
    yield fetch_users()

pipeline.run(users())
Incremental Resource
import dlt

@dlt.resource(
    write_disposition="append",
    primary_key="id"
)
def incremental_users(
    updated_at=dlt.sources.incremental("updated_at", initial_value="2024-01-01")
):
    # Fetch only new/updated records
    users = fetch_users(since=updated_at.last_value)
    yield users

pipeline.run(incremental_users())
Parametrized Resource
import dlt
import requests

@dlt.resource
def api_data(endpoint: str, api_key: str = dlt.secrets.value):
    response = requests.get(
        f"https://api.example.com/{endpoint}",
        headers={"Authorization": f"Bearer {api_key}"}
    )
    yield response.json()

# Bind parameters
users = api_data("users")
orders = api_data("orders")

pipeline.run([users, orders])
Transformer Resource
import dlt

@dlt.resource
def users():
    yield [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]

@dlt.transformer(data_from=users, write_disposition="append")
def users_with_email(user_item):
    user_item["email"] = f"{user_item['name'].lower()}@example.com"
    yield user_item

pipeline.run([users, users_with_email])
Resource with Data Processing
import dlt

@dlt.resource
def processed_data():
    raw_data = fetch_raw_data()
    yield raw_data

# Add transformations
resource = processed_data()
resource.add_filter(lambda item: item["status"] == "active")
resource.add_map(lambda item: {**item, "processed": True})
resource.add_limit(max_items=1000)

pipeline.run(resource)
Resource with Validation
import dlt
from pydantic import BaseModel

class UserModel(BaseModel):
    id: int
    name: str
    email: str

@dlt.resource
def users():
    yield fetch_users()

resource = users()
resource.validator = UserModel
pipeline.run(resource)
Resource with Multiple Tables
import dlt

@dlt.resource
def dynamic_tables():
    # Dispatch to different tables
    yield dlt.mark.with_table_name({"id": 1, "data": "A"}, "table_a")
    yield dlt.mark.with_table_name({"id": 2, "data": "B"}, "table_b")

# Select specific tables
resource = dynamic_tables().select_tables("table_a")
pipeline.run(resource)