Custom Destinations
While dlt supports many destinations out of the box, you can create custom destination adapters to load data into any system. This is useful for proprietary systems, specialized databases, or when you need custom data processing logic.
Overview
A custom destination is a Python function or callable that receives data from dlt and processes it however you want. You can:
Load data into any database or API
Transform data before storing
Send data to multiple destinations
Implement custom business logic
Integrate with proprietary systems
Quick Start
Here’s a simple custom destination:
import dlt


def my_custom_destination(items, table, schema):
    """A simple custom destination that prints data."""
    print(f"Loading {len(items)} items into table {table['name']}")
    for item in items:
        print(item)


# Use the custom destination
@dlt.resource
def my_data():
    yield {"id": 1, "name": "Alice"}
    yield {"id": 2, "name": "Bob"}


pipeline = dlt.pipeline(
    pipeline_name="custom_pipeline",
    destination=my_custom_destination,
    dataset_name="my_dataset",
)
pipeline.run(my_data())
Destination Function Signature
A custom destination function receives the following parameters:
def my_destination(
    items: List[Dict[str, Any]],
    table: Dict[str, Any],
    schema: Dict[str, Any],
) -> None:
    """Skeleton showing the call signature dlt expects from a destination.

    Args:
        items: List of data items (rows) to load.
        table: Table schema information.
        schema: Complete schema information.
    """
    pass
Parameters
items: A list of data items (dictionaries) to be loaded. Each item represents one row.
table: Table schema information including:
name: Table name
columns: Column definitions
write_disposition: How to write data (append, replace, merge)
primary_key: Primary key columns (if any)
schema: Complete schema information for the entire dataset.
Examples
Example 1: Load to REST API
import dlt
import requests
from typing import List, Dict, Any


def api_destination(items: List[Dict[str, Any]], table, schema) -> None:
    """Load data to a REST API, one POST per row.

    Raises an HTTPError when the API rejects a row, which makes dlt
    treat the load as failed.
    """
    api_url = "https://api.example.com/data"
    headers = {"Authorization": "Bearer YOUR_TOKEN"}
    for item in items:
        # A timeout prevents a stuck API from hanging the load forever.
        response = requests.post(api_url, json=item, headers=headers, timeout=30)
        response.raise_for_status()
    print(f"Loaded {len(items)} items to {table['name']}")


@dlt.resource
def my_data():
    yield {"id": 1, "value": "data"}


pipeline = dlt.pipeline(
    destination=api_destination,
    dataset_name="my_dataset",
)
pipeline.run(my_data())
Example 2: Load to MongoDB
import dlt
from pymongo import MongoClient
from typing import List, Dict, Any


def mongodb_destination(items: List[Dict[str, Any]], table, schema) -> None:
    """Load data to MongoDB; the collection mirrors the dlt table name."""
    # Context manager closes the connection even if the insert fails.
    with MongoClient("mongodb://localhost:27017/") as client:
        db = client[schema["name"]]
        collection = db[table["name"]]
        # Handle write disposition: "replace" wipes the collection first.
        if table.get("write_disposition") == "replace":
            collection.delete_many({})
        if items:
            collection.insert_many(items)
    print(f"Loaded {len(items)} items to {table['name']}")


@dlt.resource
def my_data():
    yield {"id": 1, "name": "Alice"}
    yield {"id": 2, "name": "Bob"}


pipeline = dlt.pipeline(
    destination=mongodb_destination,
    dataset_name="my_dataset",
)
pipeline.run(my_data())
Example 3: Custom CSV Writer
import dlt
import csv
from pathlib import Path
from typing import List, Dict, Any
def csv_destination(items: List[Dict[str, Any]], table, schema) -> None:
    """Write a batch of rows to ``./output/<table_name>.csv``.

    Args:
        items: Rows to write; each dict becomes one CSV line.
        table: Table schema info; ``name`` picks the file and
            ``write_disposition`` selects truncate vs. append.
        schema: Full dataset schema (unused here).
    """
    if not items:
        return  # nothing to write

    output_dir = Path("./output")
    output_dir.mkdir(parents=True, exist_ok=True)
    file_path = output_dir / f"{table['name']}.csv"

    # Determine mode based on write disposition: "replace" truncates.
    mode = "w" if table.get("write_disposition") == "replace" else "a"
    file_exists = file_path.exists()

    # Collect the union of keys across ALL rows (insertion-ordered) so a
    # row with extra keys does not make DictWriter raise ValueError.
    fieldnames: Dict[str, None] = {}
    for item in items:
        fieldnames.update(dict.fromkeys(item))

    with open(file_path, mode, newline="") as f:
        writer = csv.DictWriter(f, fieldnames=list(fieldnames))
        # Write header if the file is truncated or newly created.
        if mode == "w" or not file_exists:
            writer.writeheader()
        writer.writerows(items)
    print(f"Wrote {len(items)} items to {file_path}")
@dlt.resource
def my_data():
    # Two sample rows to demonstrate the CSV destination.
    for row in [
        {"id": 1, "name": "Alice", "age": 30},
        {"id": 2, "name": "Bob", "age": 25},
    ]:
        yield row


pipeline = dlt.pipeline(
    destination=csv_destination,
    dataset_name="my_dataset",
)
pipeline.run(my_data())
Example 4: Multi-Destination
import dlt
from typing import List, Dict, Any
import requests
from pymongo import MongoClient


def multi_destination(items: List[Dict[str, Any]], table, schema) -> None:
    """Fan the same batch out to MongoDB and a notification webhook."""
    # Load to MongoDB; the context manager closes the connection afterwards.
    with MongoClient("mongodb://localhost:27017/") as mongo_client:
        db = mongo_client[schema["name"]]
        collection = db[table["name"]]
        if items:
            collection.insert_many(items)
    # Also send to webhook; the timeout keeps a dead endpoint from
    # stalling the load forever.
    webhook_url = "https://hooks.example.com/data"
    response = requests.post(
        webhook_url,
        json={
            "table": table["name"],
            "count": len(items),
            "items": items,
        },
        timeout=30,
    )
    response.raise_for_status()  # fail the load if the webhook rejects the batch
    print(f"Loaded {len(items)} items to MongoDB and webhook")


@dlt.resource
def my_data():
    yield {"id": 1, "event": "click"}


pipeline = dlt.pipeline(
    destination=multi_destination,
    dataset_name="my_dataset",
)
pipeline.run(my_data())
Configuration
Adding Configuration Parameters
You can create a configurable destination using closures or classes:
import dlt
import requests
from typing import List, Dict, Any


def create_api_destination(api_url: str, api_key: str):
    """Factory function: bind API settings into a destination via a closure."""

    def api_destination(items: List[Dict[str, Any]], table, schema) -> None:
        headers = {"Authorization": f"Bearer {api_key}"}
        for item in items:
            response = requests.post(
                f"{api_url}/{table['name']}",
                json=item,
                headers=headers,
            )
            response.raise_for_status()
        print(f"Loaded {len(items)} items")

    return api_destination


# Use the configured destination
pipeline = dlt.pipeline(
    destination=create_api_destination(
        api_url="https://api.example.com",
        api_key="your-api-key",
    ),
    dataset_name="my_dataset",
)
Using Configuration Files
Load configuration from .dlt/secrets.toml:
import dlt
import requests
from dlt.common.configuration import with_config
from typing import List, Dict, Any


@with_config(sections=("destination", "custom_api"))
def api_destination(
    items: List[Dict[str, Any]],
    table,
    schema,
    api_url: str = dlt.config.value,
    api_key: str = dlt.secrets.value,
) -> None:
    """Destination whose URL and key are injected from .dlt/secrets.toml."""
    headers = {"Authorization": f"Bearer {api_key}"}
    for item in items:
        requests.post(f"{api_url}/{table['name']}", json=item, headers=headers)


pipeline = dlt.pipeline(
    destination=api_destination,
    dataset_name="my_dataset",
)
In .dlt/secrets.toml:
# Credentials for the custom API destination (matches the
# with_config sections ("destination", "custom_api")).
[destination.custom_api]
api_url = "https://api.example.com"
api_key = "your-secret-key"
Handling Write Dispositions
Implement different write dispositions:
import dlt
from typing import List, Dict, Any
def smart_destination(items: List[Dict[str, Any]], table, schema) -> None:
    """Route a batch according to the table's write disposition."""
    disposition = table.get("write_disposition", "append")
    name = table["name"]
    if disposition == "replace":
        # Delete all existing data, then insert the fresh batch.
        print(f"Replacing all data in {name}")
        # your_db.delete_all(name); your_db.insert(name, items)
    elif disposition == "append":
        # Just append new data.
        print(f"Appending {len(items)} items to {name}")
        # your_db.insert(name, items)
    elif disposition == "merge":
        # Update existing records, insert new ones.
        print(f"Merging {len(items)} items into {name}")
        pk = table.get("primary_key", [])
        # your_db.upsert(name, items, pk)
    print(f"Loaded {len(items)} items with {disposition} disposition")
Error Handling
Implement robust error handling:
import dlt
from typing import List, Dict, Any
import logging
logger = logging.getLogger(__name__)


def robust_destination(items: List[Dict[str, Any]], table, schema) -> None:
    """Load rows one by one, counting per-row failures instead of aborting.

    Raises when more than 10% of the batch fails, so dlt marks the load
    as failed rather than silently dropping data.
    """
    ok = 0
    errors = 0
    for row in items:
        try:
            # your_api.post(row)  # process the row here
            ok += 1
        except Exception as exc:
            logger.error(f"Failed to load item: {row}. Error: {exc}")
            errors += 1
    if errors > 0:
        logger.warning(f"Loaded {ok} items, {errors} failed")
        # Optionally abort when too many rows failed (> 10% of the batch).
        if errors / len(items) > 0.1:
            raise Exception(f"Too many failures: {errors}/{len(items)}")
    else:
        print(f"Successfully loaded all {ok} items")
Batch Processing
dlt automatically batches your data. The items parameter contains a batch of rows:
import dlt
from typing import List, Dict, Any
def batched_destination(
    items: List[Dict[str, Any]],
    table,
    schema,
    batch_size: int = 1000,
) -> None:
    """Split the incoming rows into chunks of ``batch_size`` and process each."""
    for start in range(0, len(items), batch_size):
        chunk = items[start:start + batch_size]
        # your_api.bulk_insert(chunk)  # process one chunk
        print(f"Processed batch {start // batch_size + 1}: {len(chunk)} items")
Configure batch size in the pipeline:
pipeline = dlt.pipeline(
    destination=batched_destination,
    dataset_name="my_dataset",
)
# The destination receives items in batches
pipeline.run(my_data())
Performance Tips
Use Bulk Operations: Load data in batches rather than one item at a time
Connection Pooling: Reuse database connections across batches
Async Processing: Use async operations for I/O-bound tasks
Parallel Loading: Process multiple tables in parallel
Compression: Compress data before transmission
Testing Custom Destinations
Test your destination thoroughly:
import dlt
from typing import List, Dict, Any
def test_destination ( items : List[Dict[ str , Any]], table , schema ) -> None :
"""Test destination that collects loaded data."""
if not hasattr (test_destination, "loaded_data" ):
test_destination.loaded_data = []
test_destination.loaded_data.extend(items)
print ( f "Test: loaded { len (items) } items" )
# Use in tests
@dlt.resource
def test_data():
    yield {"id": 1, "value": "test"}


pipeline = dlt.pipeline(
    destination=test_destination,
    dataset_name="test",
)
pipeline.run(test_data())

# Verify that exactly the yielded row reached the destination.
assert len(test_destination.loaded_data) == 1
assert test_destination.loaded_data[0]["id"] == 1
Limitations
Custom destinations don’t support automatic schema migration
You’re responsible for handling data types and schema changes
State management must be implemented manually
No built-in retry mechanism (implement your own)
SQL client and related features are not available
When to Use Custom Destinations
Use custom destinations when:
Loading to proprietary or specialized systems
Implementing custom business logic during loading
Integrating with APIs that don’t have a dedicated destination
Sending data to multiple destinations simultaneously
Applying transformations that can’t be done in sources
Testing and development
Additional Resources
Built-in Destinations See all available built-in destinations
Destination Development Advanced destination development guide