Overview

The @dlt.destination decorator transforms a function that processes data items into a custom dlt destination. This allows you to create custom data sinks that receive batched data from dlt pipelines and handle it according to your own logic. The decorator can also be used as a factory function to instantiate built-in destinations.

Signature

@dlt.destination(
    loader_file_format: str = None,
    batch_size: int = 10,
    name: str = None,
    naming_convention: str = "direct",
    skip_dlt_columns_and_tables: bool = True,
    max_table_nesting: int = 0,
    spec: Type[CustomDestinationClientConfiguration] = None,
    max_parallel_load_jobs: int = None,
    loader_parallelism_strategy: str = None,
)

Parameters

func
Callable
required
A callable that takes two positional arguments:
  • items: Either a list of data items or a file path (string) if batch_size=0
  • table: A dictionary containing the table schema with keys like name, columns, etc.
The function can also accept keyword arguments with default values for configuration or secret values. The function does not need to return anything.
loader_file_format
str
default:"None"
Defines the format in which files are stored in the load package before being sent to the destination function. Supported formats include "jsonl", "parquet", "csv", etc.
batch_size
int
default:"10"
Defines how many data items are batched together and passed to each call of the destination function.
  • If set to a positive number, items are batched and passed as a list
  • If set to 0, instead of passing actual data items, you will receive one call per load job with the path of the file as the items argument, allowing you to open and process the file directly
name
str
default:"None"
Defines the name of the destination. If not provided, defaults to the name of the decorated function.
naming_convention
str
default:"'direct'"
Controls how table and column names are normalized. The default value "direct" keeps all names unchanged. Other options include "snake_case", "sql_cs_v1", etc.
skip_dlt_columns_and_tables
bool
default:"True"
Defines whether internal dlt tables and columns (like _dlt_id, _dlt_load_id) are excluded from the data passed to the custom destination function. When True (the default), these internal columns and tables are filtered out before your function is called.
max_table_nesting
int
default:"0"
Defines how deep the normalizer will go to flatten nested fields in your data to create subtables. This overrides any source settings. A value of 0 means no nested tables are created.
spec
Type[CustomDestinationClientConfiguration]
default:"None"
Defines a configuration spec used to inject arguments into the decorated function. Arguments not included in the spec will not be injected.
max_parallel_load_jobs
int
default:"None"
Defines the maximum number of load jobs that can run concurrently during the load process.
loader_parallelism_strategy
str
default:"None"
Determines the load job parallelism strategy:
  • "sequential": Equivalent to max_parallel_load_jobs=1
  • "table-sequential": One load job per table at a time
  • "parallel": Multiple jobs can run in parallel

Returns

destination
Callable
  • When used as a decorator: returns a callable that creates a custom dlt destination instance when called.
  • When used as a factory function with a string argument: returns an initialized destination instance.

Usage Examples

Basic Custom Destination

import dlt

@dlt.destination(batch_size=100)
def my_destination(items, table):
    print(f"Processing {len(items)} items for table {table['name']}")
    for item in items:
        # Process each item
        print(item)

pipeline = dlt.pipeline(
    pipeline_name="my_pipeline",
    destination=my_destination
)

@dlt.resource
def data():
    yield [{"id": 1, "name": "Alice"}]
    yield [{"id": 2, "name": "Bob"}]

pipeline.run(data())

File-Based Processing (batch_size=0)

import dlt
import json

@dlt.destination(batch_size=0, loader_file_format="jsonl")
def file_destination(file_path: str, table):
    print(f"Processing file {file_path} for table {table['name']}")
    
    with open(file_path, 'r') as f:
        for line in f:
            item = json.loads(line)
            # Process each item from file
            print(item)

pipeline = dlt.pipeline(destination=file_destination)
pipeline.run(data())  # data() is the resource defined in the first example
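The parsing loop in the destination function can be exercised outside a pipeline. This sketch, using only the standard library, writes a temporary JSONL file and reads it back with the same line-by-line json.loads pattern:

```python
import json
import tempfile

def read_jsonl(file_path: str):
    """Parse a JSONL file the same way the destination function does."""
    items = []
    with open(file_path, 'r') as f:
        for line in f:
            items.append(json.loads(line))
    return items

# Write a small JSONL file to simulate a file from a dlt load package
with tempfile.NamedTemporaryFile("w", suffix=".jsonl", delete=False) as f:
    f.write('{"id": 1, "name": "Alice"}\n')
    f.write('{"id": 2, "name": "Bob"}\n')
    path = f.name

rows = read_jsonl(path)
print(rows)  # [{'id': 1, 'name': 'Alice'}, {'id': 2, 'name': 'Bob'}]
```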

Destination with Configuration and Secrets

import dlt
import requests

@dlt.destination(
    batch_size=50,
    loader_file_format="parquet"
)
def api_destination(
    items,
    table,
    api_url: str = dlt.config.value,
    api_secret: str = dlt.secrets.value
):
    # Send data to external API
    response = requests.post(
        f"{api_url}/data/{table['name']}",
        json=items,
        headers={"Authorization": f"Bearer {api_secret}"}
    )
    response.raise_for_status()
    print(f"Loaded {len(items)} items to {table['name']}")

pipeline = dlt.pipeline(destination=api_destination)
pipeline.run(data())

Database Destination

import dlt
import psycopg2
from psycopg2 import sql

@dlt.destination(batch_size=1000)
def postgres_destination(
    items,
    table,
    connection_string: str = dlt.secrets.value
):
    conn = psycopg2.connect(connection_string)
    cursor = conn.cursor()
    
    # Assumes the target table already exists. Table and column names
    # come from the dlt schema; compose them with psycopg2.sql so
    # identifiers are safely quoted instead of interpolated as raw strings
    table_name = table['name']
    
    for item in items:
        query = sql.SQL("INSERT INTO {} ({}) VALUES ({})").format(
            sql.Identifier(table_name),
            sql.SQL(', ').join(map(sql.Identifier, item.keys())),
            sql.SQL(', ').join(sql.Placeholder() * len(item)),
        )
        cursor.execute(query, list(item.values()))
    
    conn.commit()
    cursor.close()
    conn.close()

pipeline = dlt.pipeline(destination=postgres_destination)
pipeline.run(data())

Custom Destination with Table Schema Access

import dlt

@dlt.destination(batch_size=100)
def schema_aware_destination(items, table):
    print(f"Table name: {table['name']}")
    print(f"Table columns: {table['columns']}")
    
    # Access column information
    for col_name, col_info in table['columns'].items():
        print(f"Column {col_name}: {col_info.get('data_type')}")
    
    # Process items with schema awareness
    for item in items:
        # Validate or transform based on schema
        pass

pipeline = dlt.pipeline(destination=schema_aware_destination)
pipeline.run(data())
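The table dictionary makes lightweight validation possible before loading. A minimal sketch in plain Python, using a hand-built table dict that mimics the shape dlt passes to the destination function, checks item keys against the declared columns:

```python
def validate_item(item: dict, table: dict) -> list:
    """Return the item keys that are not declared in the table schema."""
    declared = set(table["columns"])
    return [key for key in item if key not in declared]

# Hand-built table dict mimicking what dlt passes to the destination function
table = {
    "name": "users",
    "columns": {
        "id": {"data_type": "bigint"},
        "name": {"data_type": "text"},
    },
}

print(validate_item({"id": 1, "name": "Alice"}, table))    # []
print(validate_item({"id": 2, "email": "b@x.io"}, table))  # ['email']
```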

Destination Without dlt Internal Columns

import dlt

@dlt.destination(
    batch_size=100,
    skip_dlt_columns_and_tables=True  # Default behavior
)
def clean_destination(items, table):
    # Items will not contain _dlt_id, _dlt_load_id, etc.
    for item in items:
        print(item)  # Only your data columns

pipeline = dlt.pipeline(destination=clean_destination)
pipeline.run(data())

Destination with Custom Naming Convention

import dlt

@dlt.destination(
    batch_size=100,
    naming_convention="snake_case"
)
def normalized_destination(items, table):
    # Table and column names will be converted to snake_case
    print(f"Table: {table['name']}")
    for item in items:
        print(item)

pipeline = dlt.pipeline(destination=normalized_destination)
pipeline.run(data())
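To get a feel for what snake_case normalization does to incoming names, here is a rough standalone approximation. This is an illustration only, not dlt's actual normalization algorithm, which handles more edge cases:

```python
import re

def approximate_snake_case(name: str) -> str:
    """Rough approximation of snake_case normalization (not dlt's exact algorithm)."""
    # Insert an underscore at lower-to-upper case boundaries
    name = re.sub(r"(?<=[a-z0-9])([A-Z])", r"_\1", name)
    # Collapse runs of non-alphanumeric characters into a single underscore
    name = re.sub(r"[^a-zA-Z0-9]+", "_", name)
    return name.lower().strip("_")

print(approximate_snake_case("UserName"))     # user_name
print(approximate_snake_case("order items"))  # order_items
```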

Using as a Factory Function

import dlt

# Create a destination instance from built-in destinations
dest = dlt.destination(
    "my_warehouse",
    destination_type="duckdb",
    credentials="local.duckdb"
)

pipeline = dlt.pipeline(
    pipeline_name="my_pipeline",
    destination=dest
)

Parallel Loading Strategy

import dlt

@dlt.destination(
    batch_size=500,
    loader_parallelism_strategy="table-sequential",
    max_parallel_load_jobs=3
)
def parallel_destination(items, table):
    # Process items
    # This will run with table-sequential parallelism
    pass

pipeline = dlt.pipeline(destination=parallel_destination)
pipeline.run(data())

Understanding batch_size

The batch_size parameter controls how data is passed to your destination function:
  • batch_size > 0: Items are batched in arrays of the specified size
    @dlt.destination(batch_size=10)
    def my_dest(items, table):  # items is a list of up to 10 items
        for item in items:
            process(item)
    
  • batch_size = 0: You receive file paths instead of items
    @dlt.destination(batch_size=0)
    def my_dest(file_path: str, table):  # file_path is a string
        with open(file_path) as f:
            process_file(f)
    

Configuration Injection

Like other dlt decorators, @dlt.destination supports automatic configuration injection:
@dlt.destination(batch_size=100)
def my_destination(
    items,
    table,
    api_url: str = dlt.config.value,      # From config.toml
    api_secret: str = dlt.secrets.value,  # From secrets.toml
    timeout: int = 30                      # Default value
):
    # Configuration is automatically injected
    pass
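For a destination named my_destination, the injected values above would typically be resolved from TOML files like these. The section name follows dlt's usual destination.<name> layout; adjust it to your destination's actual name:

```toml
# config.toml
[destination.my_destination]
api_url = "https://api.example.com"

# secrets.toml
[destination.my_destination]
api_secret = "..."
```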
