In this quickstart, you’ll build a complete data pipeline that extracts data from the Chess.com API and loads it into a local DuckDB database. By the end, you’ll understand the core concepts and be ready to build your own pipelines.
This tutorial takes approximately 5-10 minutes to complete. You’ll create a working pipeline, run it, and query the results.

What You’ll Build

You’ll create a pipeline that:
  • Fetches player data from the Chess.com public API
  • Automatically infers the schema from the JSON response
  • Loads the data into DuckDB (a fast, embedded SQL database)
  • Can be queried immediately using SQL

Prerequisites

  • Python 3.9 or higher installed on your system
  • Basic familiarity with Python
  • A terminal or command prompt

Step 1: Install dlt

First, install dlt using pip, including the DuckDB extra, since DuckDB is our destination database. (The quotes keep shells like zsh from interpreting the brackets.)
pip install "dlt[duckdb]"
The [duckdb] extra installs the DuckDB adapter. dlt supports 20+ destinations including PostgreSQL, BigQuery, Snowflake, and more. Check the destinations documentation for the full list.

Step 2: Create Your Pipeline Script

Create a new Python file called chess_pipeline.py and add the following code:
chess_pipeline.py
import dlt
from dlt.sources.helpers import requests

# Create a dlt pipeline that will load
# chess player data to the DuckDB destination
pipeline = dlt.pipeline(
    pipeline_name='chess_pipeline',
    destination='duckdb',
    dataset_name='player_data'
)

# Grab some player data from Chess.com API
data = []
for player in ['magnuscarlsen', 'rpragchess']:
    response = requests.get(f'https://api.chess.com/pub/player/{player}')
    response.raise_for_status()
    data.append(response.json())

# Extract, normalize, and load the data
info = pipeline.run(data, table_name='player')

print(info)
Let’s break down what this code does:
  • Create a pipeline: The dlt.pipeline() function creates a pipeline with a name, destination (DuckDB), and dataset name
  • Fetch data: We loop through player usernames and fetch their data from the Chess.com API
  • Load data: The pipeline.run() method automatically infers the schema, normalizes the JSON data, and loads it into the player table
  • Print info: The run info contains metadata about what was loaded

Step 3: Run Your Pipeline

Execute your pipeline script:
python chess_pipeline.py
You should see output indicating the pipeline ran successfully, including information about the loaded data:
Pipeline chess_pipeline completed in 0.5s
1 load package(s) were loaded to destination duckdb and into dataset player_data
The duckdb destination used duckdb:////home/user/chess_pipeline.duckdb location to store data
Load package 1234567890.0 is LOADED and contains no failed jobs
Success! Your pipeline has loaded data into DuckDB. The database file chess_pipeline.duckdb was created in your current directory.

Step 4: Query Your Data

Now let’s query the data you just loaded. Create a new file called query_data.py:
query_data.py
import dlt

# Connect to the same pipeline
pipeline = dlt.pipeline(
    pipeline_name='chess_pipeline',
    destination='duckdb',
    dataset_name='player_data'
)

# Query the data using SQL
with pipeline.sql_client() as client:
    # Execute a query
    with client.execute_query('SELECT * FROM player') as cursor:
        # Fetch all rows
        rows = cursor.fetchall()
        columns = [desc[0] for desc in cursor.description]
        
        # Print results
        print(f"Found {len(rows)} players:")
        print("\nColumns:", columns)
        for row in rows:
            print(dict(zip(columns, row)))
Run the query script:
python query_data.py
You’ll see the player data you loaded, including usernames, titles, followers, and more.
You can also use pandas to work with your data:
import dlt

pipeline = dlt.pipeline(
    pipeline_name='chess_pipeline',
    destination='duckdb',
    dataset_name='player_data'
)

# Get data as a pandas DataFrame
df = pipeline.dataset().player.df()
print(df)

Step 5: Understand What Happened

When you ran your pipeline, dlt automatically:
  1. Inferred the schema - Examined the JSON structure and determined table columns and data types
  2. Created tables - Set up the player table in DuckDB with the appropriate schema
  3. Normalized data - Converted the nested JSON into relational tables (if there were nested structures, dlt would create child tables)
  4. Loaded data - Inserted the player records into the database
  5. Tracked state - Saved pipeline metadata for incremental loading in future runs
All of this happened with just one function call: pipeline.run()!
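To make the normalization step concrete, here is a simplified, pure-Python sketch of how nested JSON can be flattened into relational columns using dlt-style `parent__child` names. This is an illustration of the naming convention only, not dlt's actual implementation (dlt also creates separate child tables for nested lists):

```python
def flatten(record, prefix=''):
    """Flatten nested dicts; nested keys are joined with '__'."""
    flat = {}
    for key, value in record.items():
        name = f'{prefix}__{key}' if prefix else key
        if isinstance(value, dict):
            # Recurse into nested objects, carrying the prefix along
            flat.update(flatten(value, name))
        else:
            flat[name] = value
    return flat

player = {
    'username': 'magnuscarlsen',
    'status': {'is_premium': True, 'title': 'GM'},
}
print(flatten(player))
# {'username': 'magnuscarlsen', 'status__is_premium': True, 'status__title': 'GM'}
```

This is why, after loading, you may see columns like `status__title` in the `player` table rather than a nested JSON blob.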

Next Steps

Congratulations! You’ve built your first dlt pipeline. Here’s what you can explore next:

Core Concepts

Learn about pipelines, sources, resources, and destinations in depth

Incremental Loading

Load only new or changed data instead of full refreshes

Destinations

Explore 20+ supported destinations like BigQuery, Snowflake, and PostgreSQL

Verified Sources

Use pre-built sources for popular APIs and services

Common Patterns

Loading Different Data Types

dlt can load various data types beyond API responses:
import dlt

pipeline = dlt.pipeline(
    destination='duckdb',
    dataset_name='mydata'
)

# Load a list of dictionaries
data = [
    {'id': 1, 'name': 'Alice', 'email': '[email protected]'},
    {'id': 2, 'name': 'Bob', 'email': '[email protected]'}
]
pipeline.run(data, table_name='users')

# Load a pandas DataFrame
import pandas as pd
df = pd.DataFrame({'id': [1, 2], 'value': [100, 200]})
pipeline.run(df, table_name='metrics')

# Load from a generator function
def generate_data():
    for i in range(1000):
        yield {'id': i, 'value': i * 10}

pipeline.run(generate_data(), table_name='generated')
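Generators matter because they are consumed lazily: a large dataset never has to fit in memory at once. The sketch below illustrates chunked consumption of the same generator using only the standard library (dlt does its own buffering internally; this just shows the pattern):

```python
from itertools import islice

def generate_data():
    for i in range(1000):
        yield {'id': i, 'value': i * 10}

def chunks(iterable, size):
    """Yield successive lists of at most `size` items from an iterable."""
    it = iter(iterable)
    while batch := list(islice(it, size)):
        yield batch

# Consume the generator 250 records at a time
batches = list(chunks(generate_data(), 250))
print(len(batches), len(batches[0]))  # 4 250
```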

Using Different Destinations

Switch destinations by changing the destination parameter:
# Load to PostgreSQL
pipeline = dlt.pipeline(
    destination='postgres',
    dataset_name='mydata',
    credentials='postgresql://user:password@localhost:5432/mydb'
)

# Load to BigQuery
pipeline = dlt.pipeline(
    destination='bigquery',
    dataset_name='mydata',
    credentials='path/to/credentials.json'
)

# Load to Snowflake
pipeline = dlt.pipeline(
    destination='snowflake',
    dataset_name='mydata'
)

Incremental Loading

Load only new data on subsequent runs:
import dlt
from dlt.sources.helpers import requests

@dlt.resource(
    table_name='events',
    write_disposition='append',
    primary_key='event_id'
)
def get_events(updated_at=dlt.sources.incremental('updated_at')):
    # This will automatically track the last 'updated_at' value
    response = requests.get(
        'https://api.example.com/events',
        params={'since': updated_at.last_value}
    )
    yield response.json()

pipeline = dlt.pipeline(
    destination='duckdb',
    dataset_name='events_data'
)

# First run loads all data
# Subsequent runs only load new events
pipeline.run(get_events())
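Under the hood, `dlt.sources.incremental` remembers the highest `updated_at` value seen and hands it back on the next run. Here is a simplified pure-Python sketch of that bookkeeping, purely to illustrate the idea (dlt persists this state for you automatically):

```python
# Cursor state that survives between runs (dlt stores this in pipeline state)
state = {'last_value': None}

def get_new_events(events):
    """Yield only records newer than the stored cursor, advancing it as we go."""
    for event in events:
        if state['last_value'] is None or event['updated_at'] > state['last_value']:
            state['last_value'] = event['updated_at']
            yield event

# First run: everything is new
run1 = list(get_new_events([
    {'event_id': 1, 'updated_at': '2024-01-01'},
    {'event_id': 2, 'updated_at': '2024-01-02'},
]))
# Second run: only the record newer than the stored cursor is yielded
run2 = list(get_new_events([
    {'event_id': 2, 'updated_at': '2024-01-02'},
    {'event_id': 3, 'updated_at': '2024-01-03'},
]))
print(len(run1), len(run2))  # 2 1
```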

Troubleshooting

Make sure you’ve installed dlt in your current Python environment:
pip install dlt[duckdb]
If using a virtual environment, ensure it’s activated.
The Chess.com API is public and doesn’t require authentication. If you get connection errors, check your network connection and retry; the dlt requests helper automatically retries transient failures.
If you get a “database is locked” error, make sure:
  • You’ve closed any previous connections to the database
  • No other process is accessing the .duckdb file
  • You’re using with pipeline.sql_client() as client: to ensure connections are properly closed
If dlt infers the wrong data types:
  • Provide explicit hints using the columns parameter
  • Define a custom schema
  • See the schema documentation for details

Learn More

Join Slack Community

Get help from thousands of dlt users and the core team

Browse Examples

Explore real-world pipeline examples and use cases

Full Documentation

Deep dive into all dlt features and capabilities

GitHub Repository

View source code, report issues, and contribute
