
Function Signature

def run(
    data: Any,
    *,
    destination: TDestinationReferenceArg = None,
    staging: TDestinationReferenceArg = None,
    dataset_name: str = None,
    table_name: str = None,
    write_disposition: TWriteDispositionConfig = None,
    columns: Sequence[TColumnSchema] = None,
    schema: Schema = None,
    loader_file_format: TLoaderFileFormat = None,
    table_format: TTableFormat = None,
    schema_contract: TSchemaContract = None,
    refresh: TRefreshMode = None,
) -> LoadInfo

Description

Loads data from the data argument into the destination specified by destination and the dataset specified by dataset_name. This is a convenience function that creates or retrieves the current pipeline and runs it: it extracts data from the data argument, infers the schema, normalizes the data into a load package (i.e., jsonl or parquet files representing tables), and then loads that package into the destination.

Execution Flow

  1. The run method first uses sync_destination to synchronize pipeline state and schemas with the destination (can be disabled with restore_from_destination configuration option)
  2. Ensures data from previous runs is fully processed. If not, normalizes and loads pending data items
  3. Extracts, normalizes, and loads new data from the data argument

Parameters

data
Any
required
Data to be loaded to the destination. Can be supplied in several forms:
  • A list or Iterable of any JSON-serializable objects: dlt.run([1, 2, 3], table_name="numbers")
  • Any Iterator or a function that yields (Generator): dlt.run(range(1, 10), table_name="range")
  • A function or list of functions decorated with @dlt.resource: dlt.run([chess_players(title="GM"), chess_games()])
  • A function or list of functions decorated with @dlt.source
Note: dlt handles bytes, datetime, decimal, and uuid objects, so you can load binary data or documents containing dates.
destination
TDestinationReferenceArg
default:"None"
A name of the destination to which dlt will load the data, or a destination module imported from dlt.destination. If not provided, the value passed to dlt.pipeline will be used.
staging
TDestinationReferenceArg
default:"None"
A name of the destination where dlt will stage the data before final loading, or a destination module imported from dlt.destination.
dataset_name
str
default:"None"
A name of the dataset to which the data will be loaded. A dataset is a logical group of tables (e.g., schema in relational databases or folder grouping many files). If not provided, the value passed to dlt.pipeline will be used. If not provided at all, defaults to the pipeline_name.
table_name
str
default:"None"
The name of the table to which the data should be loaded within the dataset. This argument is required for data that is a list/Iterable or an Iterator without a __name__ attribute. The behavior depends on the type of data:
  • Generator functions: The function name is used as the table name; table_name overrides this default
  • @dlt.resource: The resource contains the full table schema, including the table name; table_name overrides this property (use with care!)
  • @dlt.source: The source contains several resources, each with its own table schema; table_name overrides all table names within the source and loads the data into a single table
write_disposition
TWriteDispositionConfig
default:"None"
Controls how to write data to a table. Accepts a shorthand string literal or a configuration dictionary. Allowed shorthand string literals:
  • append: Always add new data at the end of the table (default)
  • replace: Replace existing data with new data
  • skip: Prevent data from loading
  • merge: Deduplicate and merge data based on "primary_key" and "merge_key" hints
Write behavior can be further customized through a configuration dictionary. For example, to obtain an SCD2 table: write_disposition={"disposition": "merge", "strategy": "scd2"}. Note: For dlt.resource, the table schema value will be overwritten. For dlt.source, values in all resources will be overwritten.
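The two accepted forms can be sketched as plain Python values (a minimal illustration; the exact set of dictionary keys follows dlt's write disposition configuration):

```python
# Shorthand string form: always append new rows at the end of the table
simple_disposition = "append"

# Dictionary form: merge with the SCD2 strategy, which preserves a full
# change history instead of overwriting updated rows
scd2_disposition = {
    "disposition": "merge",
    "strategy": "scd2",
}
```

Either value is passed directly as write_disposition=... to dlt.run.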
columns
Sequence[TColumnSchema]
default:"None"
A list of column schemas. Typed dictionary describing column names, data types, write disposition, and performance hints that gives you full control over the created table schema.
schema
Schema
default:"None"
An explicit Schema object in which all table schemas will be grouped. By default, dlt takes the schema from the source (if passed in data argument) or creates a default one itself.
loader_file_format
TLoaderFileFormat
default:"None"
The file format the loader will use to create the load package. Not all file formats are compatible with all destinations. Defaults to the preferred file format of the selected destination. Common formats: jsonl, parquet, insert_values, csv
table_format
TTableFormat
default:"None"
The table format used by the destination to store tables. Can be delta, iceberg, hive, or native. Currently you can select table format on filesystem and Athena destinations.
schema_contract
TSchemaContract
default:"None"
An override for the schema contract settings. This will replace the schema contract settings for all tables in the schema. Controls schema evolution behavior such as allowing new tables, new columns, or data type changes.
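A schema contract can be given either as a single shorthand mode or as a per-entity dictionary. The sketch below is illustrative; the keys and mode names shown (tables, columns, data_type; evolve, freeze, discard_value) are assumed to match dlt's schema contract settings:

```python
# Shorthand form: one mode applied to tables, columns, and data types alike
strict_contract = "freeze"  # raise on any schema change

# Fine-grained dictionary form, one mode per schema entity
mixed_contract = {
    "tables": "evolve",         # allow new tables to be created
    "columns": "discard_value", # drop values arriving for unknown columns
    "data_type": "freeze",      # raise when an existing column's type would change
}
```

Either value is passed as schema_contract=... to dlt.run and replaces the contract settings for all tables in the schema.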
refresh
TRefreshMode
default:"None"
Fully or partially reset sources before loading new data in this run. The following refresh modes are supported:
  • drop_sources: Drop tables and source and resource state for all sources currently being processed (Note: schema history is erased)
  • drop_resources: Drop tables and resource state for all resources being processed. Source level state is not modified (Note: schema history is erased)
  • drop_data: Wipe all data and resource state for all resources being processed. Schema is not modified

Returns

load_info
LoadInfo
Information on loaded data including:
  • List of package ids (loads_ids)
  • Failed job statuses (has_failed_jobs, load_packages)
  • Destination and dataset information
  • Timing and metrics
Note: dlt will not raise an exception if a single job terminally fails. Such information is provided via LoadInfo.

Raises

PipelineStepFailed
Exception
Raised when a problem occurs during extract, normalize, or load steps.

Examples

Load a simple list

import dlt

# Load a list of numbers
load_info = dlt.run(
    [1, 2, 3],
    destination="duckdb",
    dataset_name="my_dataset",
    table_name="numbers"
)
print(load_info)

Load from a generator function

import dlt

def generate_rows():
    for i in range(100):
        yield {"id": i, "value": i * 10}

load_info = dlt.run(
    generate_rows(),
    destination="postgres",
    dataset_name="analytics",
    table_name="generated_data"
)

Load with merge write disposition

import dlt

data = [
    {"id": 1, "name": "Alice", "score": 100},
    {"id": 2, "name": "Bob", "score": 200},
]

load_info = dlt.run(
    data,
    destination="bigquery",
    dataset_name="game_data",
    table_name="players",
    write_disposition="merge",
    columns=[
        {"name": "id", "data_type": "bigint", "primary_key": True},
        {"name": "name", "data_type": "text"},
        {"name": "score", "data_type": "bigint"},
    ]
)

Load a dlt resource

import dlt
from dlt.sources.helpers import requests

@dlt.resource(write_disposition="append")
def api_data():
    response = requests.get("https://api.example.com/data")
    yield response.json()

load_info = dlt.run(
    api_data(),
    destination="snowflake",
    dataset_name="api_integration"
)

Load with refresh to drop existing data

import dlt

load_info = dlt.run(
    my_source(),
    destination="redshift",
    dataset_name="clean_data",
    refresh="drop_sources"  # Drop all tables and state before loading
)

Load to Parquet files with Iceberg format

import dlt

load_info = dlt.run(
    my_data,
    destination="filesystem",
    dataset_name="data_lake",
    table_name="events",
    loader_file_format="parquet",
    table_format="iceberg"
)

LoadInfo Methods

The returned LoadInfo object provides several useful methods:
load_info = dlt.run(...)

# Check for failed jobs
if load_info.has_failed_jobs:
    print("Some jobs failed!")

# Raise exception if any jobs failed
load_info.raise_on_failed_jobs()

# Get detailed information
print(load_info.asstr(verbosity=1))

# Convert to dictionary
info_dict = load_info.asdict()

See Also

  • pipeline - Create a pipeline instance for more control
  • Pipeline.run - The Pipeline instance method version
