Filesystem Source

The filesystem source reads files from various storage locations including cloud storage (AWS S3, Google Cloud Storage, Azure Blob Storage) and local filesystems. It provides built-in readers for common file formats and supports custom file processing.

Quick Start

Load CSV files from an S3 bucket:
import dlt
from dlt.sources.filesystem import readers

# Load CSV files from S3
csv_reader = readers(
    bucket_url="s3://my-bucket/data/",
    file_glob="*.csv",
).read_csv()

pipeline = dlt.pipeline(
    pipeline_name="s3_csv_pipeline",
    destination="duckdb",
    dataset_name="s3_data"
)

load_info = pipeline.run(csv_reader)
print(load_info)

Supported Storage Locations

AWS S3

bucket_url="s3://bucket-name/path/"

Google Cloud Storage

bucket_url="gs://bucket-name/path/"

Azure Blob Storage

bucket_url="az://container-name/path/"

Local Filesystem

bucket_url="file:///local/path/"

SFTP

bucket_url="sftp://user@host/path/"

Google Drive

bucket_url="gdrive://folder-id/"

Authentication

Configure credentials for cloud storage providers:
from dlt.sources.credentials import AwsCredentials

credentials = AwsCredentials(
    aws_access_key_id=dlt.secrets["aws_access_key_id"],
    aws_secret_access_key=dlt.secrets["aws_secret_access_key"],
)

csv_reader = readers(
    bucket_url="s3://my-bucket/data/",
    credentials=credentials,
).read_csv()
Or use environment variables:
export AWS_ACCESS_KEY_ID="your_access_key"
export AWS_SECRET_ACCESS_KEY="your_secret_key"

Built-in File Readers

The readers source provides built-in support for common file formats:

CSV Files

from dlt.sources.filesystem import readers

# Read CSV files with default settings
csv_data = readers(
    bucket_url="s3://my-bucket/",
    file_glob="**/*.csv",  # Recursive pattern
).read_csv()

# Customize CSV reading with pandas kwargs
csv_data = readers(
    bucket_url="s3://my-bucket/",
    file_glob="*.csv",
).read_csv(
    chunksize=10000,
    sep=";",
    encoding="utf-8",
    dtype={"id": str},
)

pipeline.run(csv_data)

Parquet Files

# Read Parquet files
parquet_data = readers(
    bucket_url="s3://my-bucket/",
    file_glob="**/*.parquet",
).read_parquet(chunksize=50000)

pipeline.run(parquet_data)

JSONL Files

# Read JSON Lines files
jsonl_data = readers(
    bucket_url="s3://my-bucket/",
    file_glob="**/*.jsonl",
).read_jsonl(chunksize=10000)

pipeline.run(jsonl_data)

DuckDB CSV Reader

For faster CSV processing, use the DuckDB reader:
# Use DuckDB for fast CSV reading
csv_duckdb = readers(
    bucket_url="s3://my-bucket/",
    file_glob="*.csv",
).read_csv_duckdb(
    chunksize=100000,
    # Pass DuckDB-specific options
    header=True,
    delim=",",
)

pipeline.run(csv_duckdb)

File Filtering

Use glob patterns to filter files:
# Match all CSV files in all subdirectories
file_glob="**/*.csv"

Using the Filesystem Resource Directly

For custom file processing, use the filesystem resource:
from dlt.sources.filesystem import filesystem

@dlt.resource
def process_files():
    """Yield records parsed from .txt files in an S3 bucket.

    Lists matching files with the ``filesystem`` resource, reads each
    file's raw bytes, and delegates parsing to ``custom_parser`` — a
    placeholder you replace with your own parsing function.
    """
    files = filesystem(
        bucket_url="s3://my-bucket/data/",
        file_glob="*.txt",
    )

    # The filesystem resource yields files in batches (pages), not one
    # at a time, so two loops are needed to reach individual files.
    for file_batch in files:
        for file_item in file_batch:
            # Read the entire file content into memory as bytes.
            content = file_item.read_bytes()

            # custom_parser is user-supplied; it should return one or
            # more records derived from the raw content.
            processed_data = custom_parser(content)

            yield processed_data

pipeline.run(process_files())

File Metadata

Access file metadata from FileItem objects:
from dlt.sources.filesystem import filesystem

@dlt.resource
def file_metadata():
    """Yield one metadata record per file without downloading contents.

    Matches every object under the bucket (``**/*``) and emits the
    FileItem metadata fields shown below.
    """
    files = filesystem(
        bucket_url="s3://my-bucket/",
        file_glob="**/*",
    )

    # Files arrive in batches; flatten to one record per file.
    for file_batch in files:
        for file_item in file_batch:
            yield {
                "file_url": file_item["file_url"],  # full URI of the file
                "file_name": file_item["file_name"],
                "mime_type": file_item["mime_type"],
                "size_in_bytes": file_item["size_in_bytes"],
                "modification_date": file_item["modification_date"],
            }

pipeline.run(file_metadata())

Incremental Loading

Load only new or modified files:
from dlt.sources.filesystem import readers

# Load files incrementally based on modification date
csv_data = readers(
    bucket_url="s3://my-bucket/data/",
    file_glob="**/*.csv",
    incremental=dlt.sources.incremental(
        cursor_path="modification_date",
        initial_value="2024-01-01T00:00:00Z"
    ),
).read_csv()

pipeline.run(csv_data)
On subsequent runs, only files modified after the last run will be processed.

Complete Example: S3 to DuckDB

A comprehensive example loading multiple file formats from S3:
import dlt
from dlt.sources.filesystem import readers, filesystem
from dlt.sources.credentials import AwsCredentials

@dlt.source
def s3_data_source():
    """Combine CSV, Parquet, and JSONL readers over one S3 bucket.

    Yields three reader resources sharing the same AWS credentials.
    Only the CSV resource is incremental: on repeat runs it re-reads
    just files modified after the last run's cursor.
    """

    # AWS credentials pulled from dlt secrets (secrets.toml / env vars)
    credentials = AwsCredentials(
        aws_access_key_id=dlt.secrets["aws_access_key_id"],
        aws_secret_access_key=dlt.secrets["aws_secret_access_key"],
    )

    # CSV files: incremental on modification_date so unchanged files
    # are skipped on subsequent runs.
    yield readers(
        bucket_url="s3://my-bucket/csv/",
        file_glob="**/*.csv",
        credentials=credentials,
        incremental=dlt.sources.incremental(
            cursor_path="modification_date",
            initial_value="2024-01-01T00:00:00Z"
        ),
    ).read_csv(
        chunksize=10000,
        dtype={"id": str}  # keep IDs as strings to preserve leading zeros
    )

    # Parquet files: larger chunks are fine since Parquet is columnar.
    yield readers(
        bucket_url="s3://my-bucket/parquet/",
        file_glob="**/*.parquet",
        credentials=credentials,
    ).read_parquet(chunksize=50000)

    # JSON Lines files
    yield readers(
        bucket_url="s3://my-bucket/jsonl/",
        file_glob="**/*.jsonl",
        credentials=credentials,
    ).read_jsonl(chunksize=10000)

# Create and run pipeline
pipeline = dlt.pipeline(
    pipeline_name="s3_pipeline",
    destination="duckdb",
    dataset_name="s3_data"
)

load_info = pipeline.run(s3_data_source())
print(load_info)

Custom File Processing

Implement custom logic for specific file formats:
import json
from dlt.sources.filesystem import filesystem

@dlt.resource
def process_custom_format():
    """Yield records parsed from ``*.custom`` files, one per data line.

    Reads each file as UTF-8 text, splits it into lines, skips blank
    lines, and parses each remaining line with ``parse_custom_line``.
    """
    files = filesystem(
        bucket_url="s3://my-bucket/custom/",
        file_glob="*.custom",
    )

    # Files arrive in batches from the filesystem resource.
    for file_batch in files:
        for file_item in file_batch:
            # Read raw bytes and decode as UTF-8 text.
            content = file_item.read_bytes().decode("utf-8")

            # One record per non-blank line.
            lines = content.split("\n")
            for line in lines:
                if line.strip():
                    record = parse_custom_line(line)
                    yield record

def parse_custom_line(line: str) -> dict:
    """Parse one pipe-delimited line into a record dict.

    Expects at least three ``|``-separated fields; the third must be an
    integer literal. Extra fields beyond the third are ignored.
    """
    fields = line.split("|")
    return {
        "field1": fields[0],
        "field2": fields[1],
        "field3": int(fields[2]),
    }

pipeline.run(process_custom_format())

Advanced Configuration

Customize fsspec Options

Pass additional options to the underlying fsspec filesystem:
csv_data = readers(
    bucket_url="s3://my-bucket/",
    file_glob="*.csv",
    kwargs={"use_ssl": True},  # fsspec options
    client_kwargs={"region_name": "us-west-2"},  # boto3 client options
).read_csv()

Extract File Content

Load file content directly into the data:
files_with_content = filesystem(
    bucket_url="s3://my-bucket/",
    file_glob="*.txt",
    extract_content=True,  # Adds file_content field
)

for file_batch in files_with_content:
    for file_item in file_batch:
        content = file_item["file_content"]
        # Process content

Control Batch Size

Adjust the number of files processed per batch:
files = filesystem(
    bucket_url="s3://my-bucket/",
    file_glob="*.csv",
    files_per_page=50,  # Process 50 files at a time
)

Best Practices

Balance memory usage and performance with appropriate chunk sizes:
# For large CSV files, use smaller chunks
csv_data = readers(
    bucket_url="s3://...",
    file_glob="*.csv",
).read_csv(chunksize=10000)

# For Parquet, larger chunks are often better
parquet_data = readers(
    bucket_url="s3://...",
    file_glob="*.parquet",
).read_parquet(chunksize=100000)
Always use incremental loading when files are continuously added:
csv_data = readers(
    bucket_url="s3://...",
    file_glob="**/*.csv",
    incremental=dlt.sources.incremental(
        cursor_path="modification_date"
    ),
).read_csv()
Use specific glob patterns to minimize file scanning:
# ✅ Good: Specific pattern
file_glob="data/2024/**/*.csv"

# ❌ Bad: Too broad
file_glob="**/*"
Never hardcode credentials. Use environment variables or secrets.toml:
# secrets.toml
[sources.aws]
aws_access_key_id = "your_access_key"
aws_secret_access_key = "your_secret_key"

Troubleshooting

If you see permission or authentication errors, verify that credentials are correctly configured:
# Test S3 access
from dlt.common.storages.fsspec_filesystem import fsspec_filesystem

fs, _ = fsspec_filesystem("s3://my-bucket/")
print(fs.ls("."))  # List files to test access
If no files are found, check your glob pattern and bucket URL:
# Debug: List all files
files = filesystem(
    bucket_url="s3://my-bucket/",
    file_glob="**/*",  # Match everything
)

for file_batch in files:
    for file_item in file_batch:
        print(file_item["file_url"])
If you run into memory errors with large files, reduce the chunk size or process files individually:
# Smaller chunks
csv_data = readers(
    bucket_url="s3://...",
    file_glob="*.csv",
).read_csv(chunksize=1000)  # Smaller chunks

Next Steps

Explore related topics such as destination configuration, schema contracts, and performance tuning in the dlt documentation.