Skip to main content

Custom Destinations

While dlt supports many destinations out of the box, you can create custom destination adapters to load data into any system. This is useful for proprietary systems, specialized databases, or when you need custom data processing logic.

Overview

A custom destination is a Python function or callable that receives data from dlt and processes it however you want. You can:
  • Load data into any database or API
  • Transform data before storing
  • Send data to multiple destinations
  • Implement custom business logic
  • Integrate with proprietary systems

Quick Start

Here’s a simple custom destination:
import dlt
from dlt.common.destination import Destination
from dlt.common.schema import TTableSchemaColumns

def my_custom_destination(items, table, schema):
    """Toy destination: echo each incoming row to stdout."""
    row_count = len(items)
    print(f"Loading {row_count} items into table {table['name']}")
    for row in items:
        print(row)

# Use the custom destination
@dlt.resource
def my_data():
    """Demo resource: two hard-coded user rows."""
    yield from [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]

pipeline = dlt.pipeline(
    pipeline_name="custom_pipeline",
    destination=my_custom_destination,  # a plain callable works as a destination
    dataset_name="my_dataset"
)

# Extract and load; dlt calls my_custom_destination with each batch of rows.
pipeline.run(my_data())

Destination Function Signature

A custom destination function receives the following parameters:
def my_destination(
    items: List[Dict[str, Any]],
    table: Dict[str, Any],
    schema: Dict[str, Any]
) -> None:
    """Template signature for a custom destination callable.

    Args:
        items: List of data items (rows) to load; each dict is one row
        table: Table schema information (name, columns, write_disposition, primary_key)
        schema: Complete schema information for the dataset
    """
    pass

Parameters

items
List[Dict[str, Any]]
A list of data items (dictionaries) to be loaded. Each item represents one row.
table
Dict[str, Any]
Table schema information including:
  • name: Table name
  • columns: Column definitions
  • write_disposition: How to write data (append, replace, merge)
  • primary_key: Primary key columns (if any)
schema
Dict[str, Any]
Complete schema information for the entire dataset

Examples

Example 1: Load to REST API

import dlt
import requests
from typing import List, Dict, Any

def api_destination(items: List[Dict[str, Any]], table, schema) -> None:
    """Load data to a REST API, one POST per row.

    Args:
        items: Rows to load; each dict is sent as a JSON body.
        table: Table schema dict; only ``table["name"]`` is used here.
        schema: Full dataset schema (unused in this example).

    Raises:
        requests.HTTPError: If the API returns an error status.
    """
    api_url = "https://api.example.com/data"
    headers = {"Authorization": "Bearer YOUR_TOKEN"}

    for item in items:
        # Always set a timeout: requests.post blocks forever without one.
        response = requests.post(
            api_url,
            json=item,
            headers=headers,
            timeout=30,
        )
        # Fail fast on HTTP errors so dlt can surface the failure.
        response.raise_for_status()

    print(f"Loaded {len(items)} items to {table['name']}")

@dlt.resource
def my_data():
    """Demo resource with a single row."""
    yield from [{"id": 1, "value": "data"}]

pipeline = dlt.pipeline(
    destination=api_destination,
    dataset_name="my_dataset"
)

# Each batch of rows is POSTed to the API by api_destination.
pipeline.run(my_data())

Example 2: Load to MongoDB

import dlt
from pymongo import MongoClient
from typing import List, Dict, Any

def mongodb_destination(items: List[Dict[str, Any]], table, schema) -> None:
    """Load data to MongoDB.

    Args:
        items: Rows to insert.
        table: Table schema dict; "name" selects the collection,
            "write_disposition" controls truncation.
        schema: Dataset schema; "name" selects the database.
    """
    client = MongoClient("mongodb://localhost:27017/")
    try:
        db = client[schema["name"]]
        collection = db[table["name"]]

        # Handle write disposition: "replace" wipes the collection first.
        if table.get("write_disposition") == "replace":
            collection.delete_many({})

        # insert_many raises on an empty list, so guard it.
        if items:
            collection.insert_many(items)

        print(f"Loaded {len(items)} items to {table['name']}")
    finally:
        # Close the client; the original leaked one connection per batch.
        client.close()

@dlt.resource
def my_data():
    """Demo resource: two sample users."""
    rows = ({"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"})
    yield from rows

pipeline = dlt.pipeline(
    destination=mongodb_destination,
    dataset_name="my_dataset"
)

# Each batch of rows is inserted into MongoDB by mongodb_destination.
pipeline.run(my_data())

Example 3: Custom CSV Writer

import dlt
import csv
from pathlib import Path
from typing import List, Dict, Any

def csv_destination(items: List[Dict[str, Any]], table, schema) -> None:
    """Write a batch of rows to ./output/<table>.csv.

    Honors the table's write disposition: "replace" truncates the file,
    anything else appends. Column order comes from the first row's keys,
    so all rows in the batch are assumed to share those keys.

    Args:
        items: Rows to write.
        table: Table schema dict; uses "name" and "write_disposition".
        schema: Full dataset schema (unused here).
    """
    output_dir = Path("./output")
    output_dir.mkdir(exist_ok=True)

    file_path = output_dir / f"{table['name']}.csv"

    # Nothing to write; also avoids indexing items[0] below.
    if not items:
        return

    # Determine mode based on write disposition: "replace" truncates.
    mode = "w" if table.get("write_disposition") == "replace" else "a"
    # Capture existence before open() creates the file in append mode.
    file_exists = file_path.exists()

    # Explicit encoding so output does not depend on the platform default.
    with open(file_path, mode, newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=items[0].keys())

        # Header goes in when truncating or when the file is brand new.
        if mode == "w" or not file_exists:
            writer.writeheader()

        writer.writerows(items)

    print(f"Wrote {len(items)} items to {file_path}")

@dlt.resource
def my_data():
    """Demo resource: two users with ages."""
    for row in ({"id": 1, "name": "Alice", "age": 30},
                {"id": 2, "name": "Bob", "age": 25}):
        yield row

pipeline = dlt.pipeline(
    destination=csv_destination,
    dataset_name="my_dataset"
)

# Each batch is appended to ./output/<table>.csv by csv_destination.
pipeline.run(my_data())

Example 4: Multi-Destination

import dlt
from typing import List, Dict, Any
import requests
from pymongo import MongoClient

def multi_destination(items: List[Dict[str, Any]], table, schema) -> None:
    """Fan a batch out to two sinks: a MongoDB collection and a webhook.

    Args:
        items: Rows to load.
        table: Table schema dict; "name" selects the Mongo collection.
        schema: Dataset schema; "name" selects the Mongo database.

    Raises:
        requests.HTTPError: If the webhook rejects the notification.
    """
    # Load to MongoDB
    mongo_client = MongoClient("mongodb://localhost:27017/")
    try:
        db = mongo_client[schema["name"]]
        collection = db[table["name"]]
        # insert_many raises on an empty list, so guard it.
        if items:
            collection.insert_many(items)
    finally:
        # Close the client; the original leaked one connection per batch.
        mongo_client.close()

    # Also send to webhook
    webhook_url = "https://hooks.example.com/data"
    response = requests.post(
        webhook_url,
        json={
            "table": table["name"],
            "count": len(items),
            "items": items,
        },
        timeout=30,  # never POST without a timeout
    )
    # Surface webhook failures instead of silently ignoring them.
    response.raise_for_status()

    print(f"Loaded {len(items)} items to MongoDB and webhook")

@dlt.resource
def my_data():
    """Demo resource: one click event."""
    yield from [{"id": 1, "event": "click"}]

pipeline = dlt.pipeline(
    destination=multi_destination,
    dataset_name="my_dataset"
)

# Each batch goes to both MongoDB and the webhook.
pipeline.run(my_data())

Configuration

Adding Configuration Parameters

You can create a configurable destination using closures or classes:
import dlt
from typing import List, Dict, Any

def create_api_destination(api_url: str, api_key: str):
    """Factory function to create a configured destination.

    Args:
        api_url: Base URL; each row is POSTed to "<api_url>/<table name>".
        api_key: Bearer token sent in the Authorization header.

    Returns:
        A destination callable closed over the given URL and key.
    """

    def api_destination(items: List[Dict[str, Any]], table, schema) -> None:
        # Imported here because this standalone snippet does not import
        # requests at module level (the original would NameError when run).
        import requests

        headers = {"Authorization": f"Bearer {api_key}"}

        for item in items:
            # Timeout keeps a dead endpoint from hanging the load step.
            response = requests.post(
                f"{api_url}/{table['name']}",
                json=item,
                headers=headers,
                timeout=30,
            )
            response.raise_for_status()

        print(f"Loaded {len(items)} items")

    return api_destination

# Use the configured destination
# The factory returns a closure; dlt receives only the inner callable.
pipeline = dlt.pipeline(
    destination=create_api_destination(
        api_url="https://api.example.com",
        api_key="your-api-key"
    ),
    dataset_name="my_dataset"
)

Using Configuration Files

Load configuration from .dlt/secrets.toml:
import dlt
from dlt.common.configuration import with_config
from typing import List, Dict, Any

@with_config(sections=("destination", "custom_api"))
def api_destination(
    items: List[Dict[str, Any]],
    table,
    schema,
    api_url: str = dlt.config.value,
    api_key: str = dlt.secrets.value
) -> None:
    """Destination with configuration from secrets.toml.

    api_url and api_key are injected by @with_config from the
    [destination.custom_api] section.
    """
    # Imported locally so this standalone snippet is self-contained
    # (the original used requests without importing it).
    import requests

    headers = {"Authorization": f"Bearer {api_key}"}

    for item in items:
        # Timeout prevents a dead endpoint from stalling the load;
        # raise_for_status surfaces HTTP errors the original ignored.
        response = requests.post(
            f"{api_url}/{table['name']}",
            json=item,
            headers=headers,
            timeout=30,
        )
        response.raise_for_status()

# Credentials are injected by @with_config, not passed here.
pipeline = dlt.pipeline(
    destination=api_destination,
    dataset_name="my_dataset"
)
In .dlt/secrets.toml:
[destination.custom_api]
api_url = "https://api.example.com"
api_key = "your-secret-key"

Handling Write Dispositions

Implement different write dispositions:
import dlt
from typing import List, Dict, Any

def smart_destination(items: List[Dict[str, Any]], table, schema) -> None:
    """Dispatch loading logic on the table's write disposition.

    Args:
        items: Rows to load.
        table: Table schema dict; reads "name", "write_disposition" and,
            for merge, "primary_key".
        schema: Dataset schema (unused in this skeleton).
    """
    write_disposition = table.get("write_disposition", "append")
    table_name = table["name"]

    if write_disposition == "replace":
        # Delete all existing data
        print(f"Replacing all data in {table_name}")
        # your_db.delete_all(table_name)
        # your_db.insert(table_name, items)

    elif write_disposition == "append":
        # Just append new data
        print(f"Appending {len(items)} items to {table_name}")
        # your_db.insert(table_name, items)

    elif write_disposition == "merge":
        # Update existing records, insert new ones
        print(f"Merging {len(items)} items into {table_name}")
        primary_key = table.get("primary_key", [])
        # your_db.upsert(table_name, items, primary_key)

    else:
        # Report dispositions this destination does not implement,
        # instead of silently falling through like the original.
        print(f"Unknown write disposition {write_disposition!r} for {table_name}")

    print(f"Loaded {len(items)} items with {write_disposition} disposition")

Error Handling

Implement robust error handling:
import dlt
from typing import List, Dict, Any
import logging

logger = logging.getLogger(__name__)

def robust_destination(
    items: List[Dict[str, Any]],
    table,
    schema,
    max_failure_rate: float = 0.1,
) -> None:
    """Destination that tolerates per-item failures up to a threshold.

    Args:
        items: Rows to load.
        table: Table schema dict (unused in this skeleton).
        schema: Dataset schema (unused in this skeleton).
        max_failure_rate: Fraction of failed items above which the whole
            batch is considered failed (default 0.1, i.e. 10%). The
            original hard-coded this value.

    Raises:
        Exception: If more than max_failure_rate of the items fail.
    """
    successful = 0
    failed = 0

    for item in items:
        try:
            # Process item
            # your_api.post(item)
            successful += 1
        except Exception as e:
            # Log and keep going; one bad row should not sink the batch.
            logger.error(f"Failed to load item: {item}. Error: {e}")
            failed += 1

    if failed > 0:
        logger.warning(f"Loaded {successful} items, {failed} failed")
        # failed > 0 implies items is non-empty, so no division by zero.
        if failed / len(items) > max_failure_rate:
            raise Exception(f"Too many failures: {failed}/{len(items)}")
    else:
        print(f"Successfully loaded all {successful} items")

Batch Processing

dlt automatically batches your data. The items parameter contains a batch of rows:
import dlt
from typing import List, Dict, Any

def batched_destination(
    items: List[Dict[str, Any]],
    table,
    schema,
    batch_size: int = 1000
) -> None:
    """Hand the rows to the sink in slices of at most batch_size."""

    batch_number = 0
    for start in range(0, len(items), batch_size):
        batch_number += 1
        batch = items[start:start + batch_size]

        # Process batch
        # your_api.bulk_insert(batch)

        print(f"Processed batch {batch_number}: {len(batch)} items")
Use the destination in a pipeline as usual — dlt passes each batch of items to it (to use a different batch_size, bind it with functools.partial before passing the callable):
pipeline = dlt.pipeline(
    destination=batched_destination,
    dataset_name="my_dataset"
)

# The destination receives items in batches
# NOTE(review): per the signature section above, dlt supplies only items,
# table and schema — batch_size keeps its default of 1000 here; bind a
# different value with functools.partial if needed.
pipeline.run(my_data())

Performance Tips

  1. Use Bulk Operations: Load data in batches rather than one item at a time
  2. Connection Pooling: Reuse database connections across batches
  3. Async Processing: Use async operations for I/O-bound tasks
  4. Parallel Loading: Process multiple tables in parallel
  5. Compression: Compress data before transmission

Testing Custom Destinations

Test your destination thoroughly:
import dlt
from typing import List, Dict, Any

def test_destination(items: List[Dict[str, Any]], table, schema) -> None:
    """Test destination that collects loaded data."""
    if not hasattr(test_destination, "loaded_data"):
        test_destination.loaded_data = []
    
    test_destination.loaded_data.extend(items)
    print(f"Test: loaded {len(items)} items")

# Use in tests
@dlt.resource
def test_data():
    """Fixture resource yielding a single known row."""
    yield from [{"id": 1, "value": "test"}]

pipeline = dlt.pipeline(
    destination=test_destination,
    dataset_name="test"
)

pipeline.run(test_data())

# Verify
# loaded_data is accumulated on the function object by test_destination.
assert len(test_destination.loaded_data) == 1
assert test_destination.loaded_data[0]["id"] == 1

Limitations

  • Custom destinations don’t support automatic schema migration
  • You’re responsible for handling data types and schema changes
  • State management must be implemented manually
  • No built-in retry mechanism (implement your own)
  • SQL client and related features are not available

When to Use Custom Destinations

Use custom destinations when:
  • Loading to proprietary or specialized systems
  • Implementing custom business logic during loading
  • Integrating with APIs that don’t have a dedicated destination
  • Sending data to multiple destinations simultaneously
  • Applying transformations that can’t be done in sources
  • Testing and development

Additional Resources

Built-in Destinations

See all available built-in destinations

Destination Development

Advanced destination development guide

Build docs developers (and LLMs) love