Quick Start
Load a simple list of dictionaries:

import dlt

data = [
    {"id": 1, "name": "Alice", "email": "[email protected]"},
    {"id": 2, "name": "Bob", "email": "[email protected]"},
]

pipeline = dlt.pipeline(
    pipeline_name="simple_load",
    destination="duckdb",
    dataset_name="users",
)
load_info = pipeline.run(data, table_name="users")
print(load_info)
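To verify the load, you can query the DuckDB file the pipeline wrote. This is a minimal sketch, assuming the duckdb destination's defaults: a simple_load.duckdb file in the working directory, with tables placed in a schema named after dataset_name.

import duckdb

# the duckdb destination writes to <pipeline_name>.duckdb by default
conn = duckdb.connect("simple_load.duckdb")
# tables live in a schema named after the dataset ("users" here)
print(conn.sql("SELECT * FROM users.users").df())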
Load from API
Fetch and load data from any HTTP API (dlt's requests helper is a drop-in replacement for requests with built-in retries):

import dlt
from dlt.sources.helpers import requests
# Fetch data from API
response = requests.get("https://api.chess.com/pub/player/magnuscarlsen")
response.raise_for_status()
pipeline = dlt.pipeline(
    pipeline_name="chess_api",
    destination="duckdb",
    dataset_name="chess_data",
)

load_info = pipeline.run(
    response.json(),
    table_name="players"
)
print(load_info)
Load Multiple Records
Fetch and load multiple records from an API:

import dlt
from dlt.sources.helpers import requests
pipeline = dlt.pipeline(
    pipeline_name="chess_pipeline",
    destination="duckdb",
    dataset_name="player_data",
)
# Fetch data for multiple players
data = []
for player in ["magnuscarlsen", "rpragchess", "hikaru"]:
    response = requests.get(f"https://api.chess.com/pub/player/{player}")
    response.raise_for_status()
    data.append(response.json())
load_info = pipeline.run(data, table_name="players")
print(load_info)
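If you prefer not to hold all records in memory first, you can also pass a generator to pipeline.run and yield each profile as it is fetched. A minimal sketch using the same chess.com endpoints and the pipeline defined above:

def fetch_players():
    # yield each player's profile as soon as it is downloaded
    for player in ["magnuscarlsen", "rpragchess", "hikaru"]:
        response = requests.get(f"https://api.chess.com/pub/player/{player}")
        response.raise_for_status()
        yield response.json()

load_info = pipeline.run(fetch_players(), table_name="players")
print(load_info)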
Load from Pandas DataFrame
import dlt
import pandas as pd
# Read CSV into DataFrame
csv_url = (
    "https://raw.githubusercontent.com/owid/owid-datasets/master/datasets/"
    "Natural%20disasters%20from%201900%20to%202019%20-%20EMDAT%20(2020)/"
    "Natural%20disasters%20from%201900%20to%202019%20-%20EMDAT%20(2020).csv"
)
df = pd.read_csv(csv_url)

pipeline = dlt.pipeline(
    pipeline_name="from_csv",
    destination="duckdb",
    dataset_name="disasters",
)
load_info = pipeline.run(df, table_name="natural_disasters")
print(load_info)
Load from SQL Database
Load data directly using SQLAlchemy:

import dlt
import sqlalchemy as sa
engine = sa.create_engine(
    "mysql+pymysql://[email protected]:4497/Rfam"
)

with engine.connect() as conn:
    # Stream data in batches
    query = "SELECT * FROM genome LIMIT 1000"
    rows = conn.execution_options(yield_per=100).exec_driver_sql(query)

    pipeline = dlt.pipeline(
        pipeline_name="from_database",
        destination="duckdb",
        dataset_name="genome_data",
    )

    # Convert rows to dictionaries and load while the connection is open
    data = map(lambda row: dict(row._mapping), rows)
    load_info = pipeline.run(data, table_name="genome")

print(load_info)
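Recent dlt versions also ship a ready-made sql_database source that reflects tables and handles batching for you. A minimal sketch, assuming the built-in dlt.sources.sql_database module is available (install dlt with the sql_database extra) and a placeholder connection string:

import dlt
from dlt.sources.sql_database import sql_database

# placeholder credentials; replace with your own connection string
connection_string = "mysql+pymysql://user:password@host:3306/Rfam"

# reflect the database and select only the table we need
source = sql_database(connection_string).with_resources("genome")

pipeline = dlt.pipeline(
    pipeline_name="from_database_source",
    destination="duckdb",
    dataset_name="genome_data",
)
load_info = pipeline.run(source)
print(load_info)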
Using Resources
Create reusable data resources:

import dlt
from dlt.sources.helpers import requests
@dlt.resource(write_disposition="replace")
def github_issues(repository="dlt-hub/dlt"):
    """Fetch GitHub issues"""
    url = f"https://api.github.com/repos/{repository}/issues"
    response = requests.get(url, params={"state": "open", "per_page": 100})
    response.raise_for_status()
    yield response.json()
# Use the resource
pipeline = dlt.pipeline(
    pipeline_name="github",
    destination="duckdb",
    dataset_name="github_data",
)
load_info = pipeline.run(github_issues())
print(load_info)
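The write_disposition argument controls what happens on repeated runs: "append" (the default) adds new rows, "replace" overwrites the table, and "merge" upserts on a primary key. A minimal sketch of a merge resource with made-up inline data:

@dlt.resource(primary_key="id", write_disposition="merge")
def issue_snapshots():
    # rows with an existing "id" are updated instead of duplicated on each run
    yield [{"id": 1, "state": "open"}, {"id": 2, "state": "closed"}]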
Generator Functions
Stream data efficiently using generators:

import dlt
from dlt.sources.helpers import requests
@dlt.resource
def paginated_api(base_url: str, endpoint: str):
    """Fetch paginated data from an API"""
    page = 1
    while True:
        response = requests.get(
            f"{base_url}/{endpoint}",
            params={"page": page, "per_page": 100}
        )
        response.raise_for_status()
        data = response.json()
        if not data:
            break
        yield data
        page += 1
pipeline = dlt.pipeline(
    pipeline_name="paginated",
    destination="duckdb",
    dataset_name="api_data",
)

load_info = pipeline.run(
    paginated_api(
        base_url="https://api.example.com",
        endpoint="users"
    )
)
print(load_info)
Using Sources
Group multiple resources into a source:

import dlt
from dlt.sources.helpers import requests
@dlt.source
def github_source(repository: str = "dlt-hub/dlt"):
    """GitHub source with multiple resources"""

    @dlt.resource(primary_key="id")
    def issues():
        url = f"https://api.github.com/repos/{repository}/issues"
        response = requests.get(url)
        response.raise_for_status()
        yield response.json()

    @dlt.resource(primary_key="id")
    def pull_requests():
        url = f"https://api.github.com/repos/{repository}/pulls"
        response = requests.get(url)
        response.raise_for_status()
        yield response.json()

    return issues, pull_requests
# Load all resources from source
pipeline = dlt.pipeline(
    pipeline_name="github",
    destination="duckdb",
    dataset_name="github_data",
)
load_info = pipeline.run(github_source())
print(load_info)
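You can also load only a subset of a source's resources with with_resources. A minimal sketch reusing the source and pipeline above:

# load only the issues resource from the source
load_info = pipeline.run(github_source().with_resources("issues"))
print(load_info)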
Nested Data
dlt automatically handles nested structures:

import dlt
data = [
    {
        "id": 1,
        "organization": "Tech Innovations Inc.",
        "address": {
            "building": "r&d",
            "room": 7890,
        },
        "inventory": [
            {"name": "Plasma ray", "inventory_nr": 2411},
            {"name": "Self-aware Roomba", "inventory_nr": 268},
        ],
    }
]

pipeline = dlt.pipeline(
    pipeline_name="nested_data",
    destination="duckdb",
    dataset_name="organizations",
)
load_info = pipeline.run(data, table_name="org")
print(load_info)
# Creates tables: org, org__inventory
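Nested dictionaries are flattened into columns (for example address__building), while nested lists become child tables that dlt links back to the parent with generated keys. A minimal sketch of joining parent and child in the DuckDB file, assuming the default nested_data.duckdb location:

import duckdb

conn = duckdb.connect("nested_data.duckdb")
# child rows carry _dlt_parent_id pointing at the parent row's _dlt_id
print(conn.sql("""
    SELECT o.organization, i.name, i.inventory_nr
    FROM organizations.org AS o
    JOIN organizations.org__inventory AS i
      ON i._dlt_parent_id = o._dlt_id
""").df())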
Using REST Client Helpers
Simplify API pagination with built-in helpers:

import dlt
from dlt.sources.helpers.rest_client import paginate
from dlt.sources.helpers.rest_client.auth import BearerTokenAuth
from dlt.sources.helpers.rest_client.paginators import HeaderLinkPaginator
@dlt.resource(write_disposition="replace")
def github_issues(api_token=dlt.secrets.value):
    """Paginate through GitHub issues automatically"""
    url = "https://api.github.com/repos/dlt-hub/dlt/issues"
    auth = BearerTokenAuth(api_token) if api_token else None
    for page in paginate(
        url,
        auth=auth,
        paginator=HeaderLinkPaginator(),
        params={"state": "open", "per_page": 100}
    ):
        yield page
pipeline = dlt.pipeline(
    pipeline_name="github_paginated",
    destination="duckdb",
    dataset_name="github_data",
)
load_info = pipeline.run(github_issues())
print(load_info)
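The api_token argument is injected from dlt's configuration (for example .dlt/secrets.toml or environment variables). You can also pass it explicitly; a minimal sketch that reads a hypothetical GITHUB_TOKEN environment variable and hands it to the resource:

import os

# assumption: your GitHub token is exported as GITHUB_TOKEN in the shell
token = os.environ.get("GITHUB_TOKEN")
load_info = pipeline.run(github_issues(api_token=token))
print(load_info)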
Arrow/Parquet Data
Load PyArrow tables directly:

import dlt
import pyarrow as pa
import pyarrow.parquet as pq
# Read Parquet file as Arrow table
table = pq.read_table("data/users.parquet")
pipeline = dlt.pipeline(
    pipeline_name="arrow_load",
    destination="duckdb",
    dataset_name="user_data",
)

load_info = pipeline.run(
    table,
    table_name="users",
    loader_file_format="parquet"
)
print(load_info)
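If you do not have a Parquet file at hand, you can build an Arrow table in memory instead; a minimal sketch with made-up rows, reusing the pipeline above:

# construct a small Arrow table directly instead of reading data/users.parquet
table = pa.table({"id": [1, 2], "name": ["Alice", "Bob"]})
load_info = pipeline.run(table, table_name="users", loader_file_format="parquet")
print(load_info)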
Complete Example: Multi-Source Pipeline
import dlt
import pandas as pd
from dlt.sources.helpers import requests
def load_all_data():
    pipeline = dlt.pipeline(
        pipeline_name="multi_source",
        destination="duckdb",
        dataset_name="analytics",
    )

    # Load from API
    api_response = requests.get(
        "https://api.chess.com/pub/player/magnuscarlsen"
    )
    api_response.raise_for_status()
    api_data = api_response.json()

    # Load from Pandas
    df = pd.read_csv("data/users.csv")

    # Load from Python list
    events = [
        {"event": "login", "user_id": 1, "timestamp": "2024-01-01"},
        {"event": "logout", "user_id": 1, "timestamp": "2024-01-01"},
    ]

    # Load all at once
    load_info = pipeline.run(
        [
            dlt.resource(api_data, name="chess_players"),
            dlt.resource(df, name="users"),
            dlt.resource(events, name="events"),
        ]
    )
    print(load_info)

if __name__ == "__main__":
    load_all_data()
Next Steps
Resources & Sources: learn about resources and sources.
Incremental Loading: add incremental loading to your pipelines.