
Overview

The @dlt.transformer decorator is a specialized form of @dlt.resource that takes input from other resources via the data_from argument in order to enrich or transform the data. Transformers enable you to build multi-step data pipelines where one resource processes the output of another.

Signature

@dlt.transformer(
    data_from: DltResource = DltResource.Empty,
    name: str = None,
    table_name: str = None,
    max_table_nesting: int = None,
    write_disposition: str = None,
    columns: dict | Type = None,
    primary_key: str | List[str] = None,
    merge_key: str | List[str] = None,
    schema_contract: dict = None,
    table_format: str = None,
    file_format: str = None,
    references: list = None,
    nested_hints: dict = None,
    selected: bool = True,
    spec: Type[BaseConfiguration] = None,
    parallelized: bool = False,
    section: str = None,
    standalone: bool = None,
)

Parameters

f
Callable
required
A function taking at least one argument of type TDataItems, which receives the data yielded from the data_from resource. The function can also accept a meta argument to receive metadata associated with the data item.
data_from
DltResource
default:"DltResource.Empty"
A resource that will send data to the decorated function. Can be specified when the transformer is created or bound later using the | operator.
name
str
default:"None"
The name of the resource, which by default also becomes the name of the table to which the data is loaded. If not provided, the name of the decorated function is used.
table_name
str | Callable
default:"None"
A table name, if different from name. This argument also accepts a callable that is used to dynamically create tables for stream-like resources yielding many datatypes.
max_table_nesting
int
default:"None"
A schema hint that sets the maximum nesting depth of child tables; nodes beyond this depth are loaded as structs or JSON.
write_disposition
str
default:"None"
Controls how to write data to a table. Options: append, replace, skip, or merge. This argument also accepts a callable that is used to dynamically create tables for stream-like resources yielding many datatypes.
columns
dict | List | Type
default:"None"
A list, dict, or Pydantic model of column schemas: a typed dictionary describing column names, data types, write disposition, and performance hints that gives you full control over the created table schema. This argument also accepts a callable that is used to dynamically create tables for stream-like resources yielding many datatypes.
primary_key
str | List[str]
default:"None"
A column name or a list of column names that comprise a primary key. Typically used with “merge” write disposition to deduplicate loaded data. This argument also accepts a callable that is used to dynamically create tables for stream-like resources yielding many datatypes.
merge_key
str | List[str]
default:"None"
A column name or a list of column names that define a merge key. Typically used with “merge” write disposition to remove overlapping data ranges. This argument also accepts a callable that is used to dynamically create tables for stream-like resources yielding many datatypes.
schema_contract
dict
default:"None"
Schema contract settings that will be applied to this resource.
table_format
str
default:"None"
Defines the storage format of the table. Currently only “iceberg” is supported on Athena, and “delta” on the filesystem. Other destinations ignore this hint.
file_format
str
default:"None"
Format of the file in which resource data is stored. Useful when importing external files. Use preferred to force the file format preferred by the destination in use.
references
list
default:"None"
A list of references to another table's columns, in the form:
[{
    'referenced_table': 'other_table',
    'columns': ['col1', 'col2'],
    'referenced_columns': ['other_col1', 'other_col2']
}]
nested_hints
dict
default:"None"
Hints for nested tables created by this resource.
selected
bool
default:"True"
When True, the dlt pipeline will extract and load this resource. When False, the resource is ignored.
spec
Type[BaseConfiguration]
default:"None"
A specification of configuration and secret values required by the transformer.
parallelized
bool
default:"False"
When True, the resource generator will be extracted in parallel with other resources.
section
str
default:"None"
Configuration section that comes right after 'sources' in the default layout. If not present, the current Python module name is used. The default layout is sources.<section>.<name>.<key_name>.
standalone
bool
default:"None"
Deprecated. Its functionality has been merged into the regular resource.

Returns

transformer
DltResource
A DltResource instance which may be loaded, iterated or combined with other resources into a pipeline.

Usage Examples

Basic Transformer with Pipe Operator

import dlt
import requests
from typing import Iterator
@dlt.resource
def players(title, chess_url=dlt.config.value):
    r = requests.get(f"{chess_url}titled/{title}")
    yield r.json()["players"]  # returns list of player names

# This transformer takes data from players and returns profiles
@dlt.transformer(write_disposition="replace")
def player_profile(player: str, chess_url: str = dlt.config.value) -> Iterator[dict]:
    r = requests.get(f"{chess_url}player/{player}")
    r.raise_for_status()
    yield r.json()

# Pipe the data from players into player_profile
pipeline = dlt.pipeline(destination="duckdb")
pipeline.run(players("GM") | player_profile)

Early Binding with data_from

import dlt

@dlt.resource
def users():
    yield [{"id": 1, "name": "Alice"}]
    yield [{"id": 2, "name": "Bob"}]

@dlt.transformer(data_from=users)
def enrich_users(user: dict):
    # Add additional data to each user
    user["enriched"] = True
    user["score"] = len(user["name"]) * 10
    yield user

pipeline = dlt.pipeline(destination="duckdb")
pipeline.run(enrich_users())

Transformer with Metadata

from datetime import datetime

@dlt.transformer
def process_with_meta(item: dict, meta: dict):
    # Access metadata about the item
    item["source_table"] = meta.get("table_name")
    item["processed_at"] = datetime.now().isoformat()
    yield item

# Bind to a resource
pipeline.run(users() | process_with_meta)

Transformer with Configuration

import requests

@dlt.transformer
def api_enrichment(
    item: dict,
    api_key: str = dlt.secrets.value,
    api_url: str = dlt.config.value
):
    # Enrich item with API call
    response = requests.get(
        f"{api_url}/enrich/{item['id']}",
        headers={"Authorization": f"Bearer {api_key}"}
    )
    item["enrichment"] = response.json()
    yield item

pipeline.run(users() | api_enrichment)

Chaining Multiple Transformers

@dlt.resource
def raw_data():
    yield [{"value": 10}, {"value": 20}, {"value": 30}]

@dlt.transformer
def double_value(item: dict):
    item["value"] = item["value"] * 2
    yield item

from datetime import datetime

@dlt.transformer
def add_timestamp(item: dict):
    item["timestamp"] = datetime.now().isoformat()
    yield item

# Chain transformers using pipe operator
pipeline.run(raw_data() | double_value | add_timestamp)

Transformer that Filters Data

@dlt.resource
def all_products():
    yield [{"id": 1, "price": 10, "active": True}]
    yield [{"id": 2, "price": 20, "active": False}]
    yield [{"id": 3, "price": 30, "active": True}]

@dlt.transformer
def active_only(product: dict):
    if product["active"]:
        yield product

pipeline.run(all_products() | active_only)
# Only products with active=True will be loaded

Transformer with Merge Disposition

from datetime import datetime

@dlt.resource
def order_updates():
    yield [{"order_id": 1, "status": "pending"}]
    yield [{"order_id": 2, "status": "shipped"}]

@dlt.transformer(
    write_disposition="merge",
    primary_key="order_id"
)
def enrich_orders(order: dict):
    # Add enrichment data
    order["updated_at"] = datetime.now().isoformat()
    order["enriched"] = True
    yield order

pipeline.run(order_updates() | enrich_orders)

Transformer that Expands Data

@dlt.resource
def users():
    yield [{"id": 1, "tags": ["admin", "developer"]}]
    yield [{"id": 2, "tags": ["user"]}]

@dlt.transformer(table_name="user_tags")
def expand_tags(user: dict):
    # Create one row per tag
    for tag in user["tags"]:
        yield {
            "user_id": user["id"],
            "tag": tag
        }

pipeline.run(users() | expand_tags)

Key Differences from @dlt.resource

  1. Data Input: Transformers receive data from other resources via the first argument, while regular resources generate their own data
  2. Function Signature: Transformer functions must accept at least one argument to receive input data items
  3. Binding: Transformers can be bound to resources either early (via data_from parameter) or late (via | operator)
  4. Use Case: Transformers are designed for data enrichment and transformation workflows

Performance Considerations

If a transformer is defined as an inner function and has additional arguments that are injected from configuration, note that transformers are called for each data item (typically a batch or page), and configuration injection is costly. Consider using dlt.source to resolve the required configuration once and pass it to your transformer explicitly.
