The filesystem source loads files from local or cloud storage, including CSV, JSONL, Parquet, and custom formats. It works with Amazon S3, Google Cloud Storage, Azure Blob Storage, and the local filesystem.

Quick Start

Load CSV files from a directory:
import dlt
from dlt.sources.filesystem import readers

pipeline = dlt.pipeline(
    pipeline_name="csv_loader",
    destination="duckdb",
    dataset_name="file_data",
)

# Load CSV files with automatic schema detection
csv_files = readers(
    bucket_url="data/csv_files",
    file_glob="*.csv"
).read_csv()

load_info = pipeline.run(csv_files)
print(load_info)

Reading Different File Formats

from dlt.sources.filesystem import readers

csv_data = readers(
    bucket_url="data/csv",
    file_glob="*.csv"
).read_csv()

pipeline.run(csv_data)
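
The readers helper also exposes dedicated methods for other formats. A short sketch reusing the read_jsonl and read_parquet readers shown later on this page (the directory paths are placeholders):
from dlt.sources.filesystem import readers

# JSONL files, read in chunks
jsonl_data = readers(
    bucket_url="data/jsonl",
    file_glob="*.jsonl"
).read_jsonl(chunksize=10000)

# Parquet files
parquet_data = readers(
    bucket_url="data/parquet",
    file_glob="*.parquet"
).read_parquet()

pipeline.run([jsonl_data, parquet_data])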

Cloud Storage

AWS S3

from dlt.sources.filesystem import readers

# S3 bucket
s3_data = readers(
    bucket_url="s3://my-bucket/data/",
    file_glob="*.csv"
).read_csv()

pipeline.run(s3_data)
Configure credentials in .dlt/secrets.toml:
[sources.filesystem.credentials]
aws_access_key_id = "YOUR_ACCESS_KEY"
aws_secret_access_key = "YOUR_SECRET_KEY"
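
Alternatively, the same values can be supplied through environment variables before the pipeline runs. A minimal sketch, assuming dlt's standard convention of upper-cased, double-underscore-separated configuration paths:
import os

# Assumes dlt's environment-variable naming convention for config sections
os.environ["SOURCES__FILESYSTEM__CREDENTIALS__AWS_ACCESS_KEY_ID"] = "YOUR_ACCESS_KEY"
os.environ["SOURCES__FILESYSTEM__CREDENTIALS__AWS_SECRET_ACCESS_KEY"] = "YOUR_SECRET_KEY"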

Google Cloud Storage

# GCS bucket
gcs_data = readers(
    bucket_url="gs://my-bucket/data/",
    file_glob="*.parquet"
).read_parquet()

pipeline.run(gcs_data)
Configure credentials in .dlt/secrets.toml:
[sources.filesystem.credentials]
project_id = "my-project"
client_email = "[email protected]"
private_key = "-----BEGIN PRIVATE KEY-----\n..."

Azure Blob Storage

# Azure blob storage
azure_data = readers(
    bucket_url="az://my-container/data/",
    file_glob="*.jsonl"
).read_jsonl()

pipeline.run(azure_data)
Configure credentials in .dlt/secrets.toml:
[sources.filesystem.credentials]
azure_storage_account_name = "myaccount"
azure_storage_account_key = "YOUR_ACCOUNT_KEY"

Merge CSV Files

Load and merge CSV files based on a key column:
import dlt
from dlt.sources.filesystem import readers

pipeline = dlt.pipeline(
    pipeline_name="csv_merge",
    destination="duckdb",
    dataset_name="met_data",
)

# Load CSV files and merge on 'date' column
met_files = readers(
    bucket_url="samples",
    file_glob="met_csv/A801/*.csv"
).read_csv()

# Configure merge behavior
met_files.apply_hints(
    write_disposition="merge",
    merge_key="date"
)

load_info = pipeline.run(met_files.with_name("met_csv"))
print(load_info)

Incremental File Loading

Track which files have been loaded to avoid reprocessing:
import dlt
from dlt.sources.filesystem import filesystem, read_csv

pipeline = dlt.pipeline(
    pipeline_name="incremental_files",
    destination="duckdb",
    dataset_name="file_tracker",
)

# Track files by modification time
new_files = filesystem(
    bucket_url="data/csv",
    file_glob="csv/*"
)

# Add incremental on modification_date
new_files.apply_hints(
    incremental=dlt.sources.incremental("modification_date")
)

load_info = pipeline.run(
    (new_files | read_csv()).with_name("csv_files")
)
print(load_info)

# Second run - only new/modified files are loaded
load_info = pipeline.run(
    (new_files | read_csv()).with_name("csv_files")
)
print(load_info)  # No new files
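
To skip older files on the very first run, you can give the incremental an initial_value cutoff. A minimal sketch; the cutoff date below is a placeholder:
import pendulum
from dlt.sources.filesystem import filesystem, read_csv

# Only list files modified after the cutoff (placeholder date)
backfill_files = filesystem(
    bucket_url="data/csv",
    file_glob="csv/*"
)
backfill_files.apply_hints(
    incremental=dlt.sources.incremental(
        "modification_date",
        initial_value=pendulum.datetime(2024, 1, 1)
    )
)

pipeline.run((backfill_files | read_csv()).with_name("csv_files"))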

Custom File Processing

Create custom transformers for specialized file formats:
import dlt
from typing import Iterator
from dlt.sources import TDataItems
from dlt.sources.filesystem import FileItemDict, filesystem

@dlt.transformer
def read_excel(
    items: Iterator[FileItemDict],
    sheet_name: str
) -> Iterator[TDataItems]:
    """Read Excel files using pandas"""
    import pandas as pd
    
    for file_obj in items:
        with file_obj.open() as file:
            yield pd.read_excel(file, sheet_name).to_dict(orient="records")

# Use the custom transformer
freshman_xls = filesystem(
    bucket_url="samples",
    file_glob="../custom/freshman_kgs.xlsx"
) | read_excel("freshman_table")

load_info = dlt.run(
    freshman_xls.with_name("freshman"),
    destination="duckdb",
    dataset_name="freshman_data",
)
print(load_info)
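
The same transformer pattern works for any format you can parse in Python. As another sketch (the log file layout and glob below are hypothetical), a transformer that yields one record per line of plain-text files:
@dlt.transformer
def read_text_lines(items: Iterator[FileItemDict]) -> Iterator[TDataItems]:
    """Yield one record per line of each text file"""
    for file_obj in items:
        with file_obj.open() as file:
            data = file.read()
            # Files may be opened in binary mode; decode if needed
            if isinstance(data, bytes):
                data = data.decode("utf-8")
            for line_no, line in enumerate(data.splitlines()):
                yield {
                    "file_name": file_obj["file_name"],
                    "line_number": line_no,
                    "content": line,
                }

# Hypothetical path; adjust to your layout
log_lines = filesystem(
    bucket_url="samples",
    file_glob="logs/*.txt"
) | read_text_lines()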

Copy Files While Loading

Download files locally while tracking them in the database:
import os
import dlt
from dlt.sources.filesystem import FileItemDict, filesystem

pipeline = dlt.pipeline(
    pipeline_name="file_copier",
    destination="duckdb",
    dataset_name="file_metadata",
)

def _copy(item: FileItemDict) -> FileItemDict:
    """Download file and return metadata"""
    dest_file = os.path.join("_storage", item["relative_path"])
    os.makedirs(os.path.dirname(dest_file), exist_ok=True)
    
    # Download file
    item.fsspec.download(item["file_url"], dest_file)
    
    return item

# Add copy step to filesystem source
downloader = filesystem(
    bucket_url="samples",
    file_glob="**"
).add_map(_copy)

# Load file metadata to 'listing' table
load_info = pipeline.run(
    downloader.with_name("listing"),
    write_disposition="replace"
)
print(load_info)

DuckDB Fast CSV Reading

Use DuckDB’s native CSV reader for maximum performance:
from dlt.sources.filesystem import readers

pipeline = dlt.pipeline(
    pipeline_name="fast_csv",
    destination="duckdb",
    dataset_name="csv_data",
)

# Use DuckDB's native CSV reader
csv_files = readers(
    bucket_url="samples",
    file_glob="met_csv/A801/*.csv"
).read_csv_duckdb(
    chunk_size=1000,
    header=True
)

load_info = pipeline.run(csv_files)
print(load_info)

Compressed Files

Automatically handle compressed files:
from dlt.sources.filesystem import readers

pipeline = dlt.pipeline(
    pipeline_name="compressed_files",
    destination="duckdb",
    dataset_name="taxi_data",
)

# Automatically decompress .gz files
compressed_files = readers(
    bucket_url="samples",
    file_glob="gzip/*"
).read_csv_duckdb()

load_info = pipeline.run(compressed_files)
print(load_info)

File Glob Patterns

Use glob patterns to filter files:
# Single directory
readers(bucket_url="data", file_glob="*.csv")

# Recursive search
readers(bucket_url="data", file_glob="**/*.json")

# Wildcard in directory names
readers(bucket_url="data", file_glob="2024-*/*.csv")

# Specific subdirectory
readers(bucket_url="data", file_glob="exports/2024/**/*.parquet")
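
Glob patterns select files by path only. To filter on file metadata instead (size, modification date, and so on), you can attach a standard resource filter to the file listing. A sketch, assuming the size_in_bytes field of the listed file items:
from dlt.sources.filesystem import filesystem, read_csv

# Skip empty files before they reach the reader
non_empty = filesystem(
    bucket_url="data",
    file_glob="**/*.csv"
).add_filter(lambda item: item["size_in_bytes"] > 0)

pipeline.run((non_empty | read_csv()).with_name("csv_files"))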

Complete Example: Multi-Format Load

import dlt
from dlt.sources.filesystem import readers

def load_all_files():
    pipeline = dlt.pipeline(
        pipeline_name="multi_format",
        destination="duckdb",
        dataset_name="data_lake",
    )
    
    # Load JSONL files
    jsonl_data = readers(
        bucket_url="s3://my-bucket/data",
        file_glob="**/*.jsonl"
    ).read_jsonl(chunksize=10000)
    
    # Load Parquet files
    parquet_data = readers(
        bucket_url="s3://my-bucket/data",
        file_glob="**/*.parquet"
    ).read_parquet()
    
    # Load CSV files with merge
    csv_data = readers(
        bucket_url="s3://my-bucket/data",
        file_glob="**/*.csv"
    ).read_csv()
    csv_data.apply_hints(
        write_disposition="merge",
        merge_key="id"
    )
    
    # Load all together
    load_info = pipeline.run([
        jsonl_data.with_name("events"),
        parquet_data.with_name("analytics"),
        csv_data.with_name("reference_data"),
    ])
    
    print(load_info)

if __name__ == "__main__":
    load_all_files()

Next Steps

Incremental Loading: Track processed files automatically

Schema Evolution: Handle changing file schemas
