Filesystem Source
The filesystem source reads files from various storage locations including cloud storage (AWS S3, Google Cloud Storage, Azure Blob Storage) and local filesystems. It provides built-in readers for common file formats and supports custom file processing.
Quick Start
Load CSV files from an S3 bucket:
import dlt
from dlt.sources.filesystem import readers
# Load CSV files from S3
csv_reader = readers(
bucket_url = "s3://my-bucket/data/" ,
file_glob = "*.csv" ,
).read_csv()
pipeline = dlt.pipeline(
pipeline_name = "s3_csv_pipeline" ,
destination = "duckdb" ,
dataset_name = "s3_data"
)
load_info = pipeline.run(csv_reader)
print (load_info)
Supported Storage Locations
AWS S3 bucket_url = "s3://bucket-name/path/"
Google Cloud Storage bucket_url = "gs://bucket-name/path/"
Azure Blob Storage bucket_url = "az://container-name/path/"
Local Filesystem bucket_url = "file:///local/path/"
SFTP bucket_url = "sftp://user@host/path/"
Google Drive bucket_url = "gdrive://folder-id/"
Authentication
Configure credentials for cloud storage providers:
AWS S3
Google Cloud Storage
Azure Blob Storage
from dlt.sources.credentials import AwsCredentials
credentials = AwsCredentials(
aws_access_key_id = dlt.secrets[ "aws_access_key_id" ],
aws_secret_access_key = dlt.secrets[ "aws_secret_access_key" ],
)
csv_reader = readers(
bucket_url = "s3://my-bucket/data/" ,
credentials = credentials,
).read_csv()
Or use environment variables: export AWS_ACCESS_KEY_ID="your_access_key"
export AWS_SECRET_ACCESS_KEY="your_secret_key"
from dlt.sources.credentials import GcpServiceAccountCredentials
credentials = GcpServiceAccountCredentials(
project_id = "my-project" ,
private_key = dlt.secrets[ "gcp_private_key" ],
client_email = dlt.secrets[ "gcp_client_email" ],
)
csv_reader = readers(
bucket_url = "gs://my-bucket/data/" ,
credentials = credentials,
).read_csv()
Or use service account JSON: export GOOGLE_APPLICATION_CREDENTIALS="/path/to/service-account.json"
from dlt.sources.credentials import AzureCredentials
credentials = AzureCredentials(
azure_storage_account_name = dlt.secrets[ "account_name" ],
azure_storage_account_key = dlt.secrets[ "account_key" ],
)
csv_reader = readers(
bucket_url = "az://my-container/data/" ,
credentials = credentials,
).read_csv()
Built-in File Readers
The readers source provides built-in support for common file formats:
CSV Files
from dlt.sources.filesystem import readers
# Read CSV files with default settings
csv_data = readers(
bucket_url = "s3://my-bucket/" ,
file_glob = "**/*.csv" , # Recursive pattern
).read_csv()
# Customize CSV reading with pandas kwargs
csv_data = readers(
bucket_url = "s3://my-bucket/" ,
file_glob = "*.csv" ,
).read_csv(
chunksize = 10000 ,
sep = ";" ,
encoding = "utf-8" ,
dtype = { "id" : str },
)
pipeline.run(csv_data)
Parquet Files
# Read Parquet files
parquet_data = readers(
bucket_url = "s3://my-bucket/" ,
file_glob = "**/*.parquet" ,
).read_parquet( chunksize = 50000 )
pipeline.run(parquet_data)
JSONL Files
# Read JSON Lines files
jsonl_data = readers(
bucket_url = "s3://my-bucket/" ,
file_glob = "**/*.jsonl" ,
).read_jsonl( chunksize = 10000 )
pipeline.run(jsonl_data)
DuckDB CSV Reader
For faster CSV processing, use the DuckDB reader:
# Use DuckDB for fast CSV reading
csv_duckdb = readers(
bucket_url = "s3://my-bucket/" ,
file_glob = "*.csv" ,
).read_csv_duckdb(
chunksize = 100000 ,
# Pass DuckDB-specific options
header = True ,
delim = "," ,
)
pipeline.run(csv_duckdb)
File Filtering
Use glob patterns to filter files:
All CSV Files (Recursive)
Specific Directory
Multiple Extensions
Date Pattern
# Match all CSV files in all subdirectories
file_glob = "**/*.csv"
Using the Filesystem Resource Directly
For custom file processing, use the filesystem resource:
from dlt.sources.filesystem import filesystem
@dlt.resource
def process_files ():
"""Custom file processing logic"""
files = filesystem(
bucket_url = "s3://my-bucket/data/" ,
file_glob = "*.txt" ,
)
for file_batch in files:
for file_item in file_batch:
# Read file content
content = file_item.read_bytes()
# Process content
processed_data = custom_parser(content)
yield processed_data
pipeline.run(process_files())
Access file metadata from FileItem objects:
from dlt.sources.filesystem import filesystem
@dlt.resource
def file_metadata ():
"""Extract file metadata"""
files = filesystem(
bucket_url = "s3://my-bucket/" ,
file_glob = "**/*" ,
)
for file_batch in files:
for file_item in file_batch:
yield {
"file_url" : file_item[ "file_url" ],
"file_name" : file_item[ "file_name" ],
"mime_type" : file_item[ "mime_type" ],
"size_in_bytes" : file_item[ "size_in_bytes" ],
"modification_date" : file_item[ "modification_date" ],
}
pipeline.run(file_metadata())
Incremental Loading
Load only new or modified files:
from dlt.sources.filesystem import readers
# Load files incrementally based on modification date
csv_data = readers(
bucket_url = "s3://my-bucket/data/" ,
file_glob = "**/*.csv" ,
incremental = dlt.sources.incremental(
cursor_path = "modification_date" ,
initial_value = "2024-01-01T00:00:00Z"
),
).read_csv()
pipeline.run(csv_data)
On subsequent runs, only files modified after the last run will be processed.
Complete Example: S3 to DuckDB
A comprehensive example loading multiple file formats from S3:
import dlt
from dlt.sources.filesystem import readers, filesystem
from dlt.sources.credentials import AwsCredentials
@dlt.source
def s3_data_source ():
"""Load multiple file formats from S3"""
# AWS credentials from secrets
credentials = AwsCredentials(
aws_access_key_id = dlt.secrets[ "aws_access_key_id" ],
aws_secret_access_key = dlt.secrets[ "aws_secret_access_key" ],
)
# Load CSV files with incremental loading
yield readers(
bucket_url = "s3://my-bucket/csv/" ,
file_glob = "**/*.csv" ,
credentials = credentials,
incremental = dlt.sources.incremental(
cursor_path = "modification_date" ,
initial_value = "2024-01-01T00:00:00Z"
),
).read_csv(
chunksize = 10000 ,
dtype = { "id" : str }
)
# Load Parquet files
yield readers(
bucket_url = "s3://my-bucket/parquet/" ,
file_glob = "**/*.parquet" ,
credentials = credentials,
).read_parquet( chunksize = 50000 )
# Load JSONL files
yield readers(
bucket_url = "s3://my-bucket/jsonl/" ,
file_glob = "**/*.jsonl" ,
credentials = credentials,
).read_jsonl( chunksize = 10000 )
# Create and run pipeline
pipeline = dlt.pipeline(
pipeline_name = "s3_pipeline" ,
destination = "duckdb" ,
dataset_name = "s3_data"
)
load_info = pipeline.run(s3_data_source())
print (load_info)
Custom File Processing
Implement custom logic for specific file formats:
import json
from dlt.sources.filesystem import filesystem
@dlt.resource
def process_custom_format ():
"""Process custom file format"""
files = filesystem(
bucket_url = "s3://my-bucket/custom/" ,
file_glob = "*.custom" ,
)
for file_batch in files:
for file_item in file_batch:
# Read and decode file
content = file_item.read_bytes().decode( "utf-8" )
# Custom parsing logic
lines = content.split("\n")
for line in lines:
if line.strip():
# Parse custom format
record = parse_custom_line(line)
yield record
def parse_custom_line ( line : str ) -> dict :
"""Custom parsing logic"""
# Implement your parsing logic here
parts = line.split( "|" )
return {
"field1" : parts[ 0 ],
"field2" : parts[ 1 ],
"field3" : int (parts[ 2 ]),
}
pipeline.run(process_custom_format())
Advanced Configuration
Customize fsspec Options
Pass additional options to the underlying fsspec filesystem:
csv_data = readers(
bucket_url = "s3://my-bucket/" ,
file_glob = "*.csv" ,
kwargs = { "use_ssl" : True }, # fsspec options
client_kwargs = { "region_name" : "us-west-2" }, # boto3 client options
).read_csv()
Load file content directly into the data:
files_with_content = filesystem(
bucket_url = "s3://my-bucket/" ,
file_glob = "*.txt" ,
extract_content = True , # Adds file_content field
)
for file_batch in files_with_content:
for file_item in file_batch:
content = file_item[ "file_content" ]
# Process content
Control Batch Size
Adjust the number of files processed per batch:
files = filesystem(
bucket_url = "s3://my-bucket/" ,
file_glob = "*.csv" ,
files_per_page = 50 , # Process 50 files at a time
)
Best Practices
Use appropriate chunk sizes
Balance memory usage and performance with appropriate chunk sizes: # For large CSV files, use smaller chunks
csv_data = readers(
bucket_url = "s3://..." ,
file_glob = "*.csv" ,
).read_csv( chunksize = 10000 )
# For Parquet, larger chunks are often better
parquet_data = readers(
bucket_url = "s3://..." ,
file_glob = "*.parquet" ,
).read_parquet( chunksize = 100000 )
Use incremental loading for growing datasets
Always use incremental loading when files are continuously added: csv_data = readers(
bucket_url = "s3://..." ,
file_glob = "**/*.csv" ,
incremental = dlt.sources.incremental(
cursor_path = "modification_date"
),
).read_csv()
Use specific glob patterns to minimize file scanning: # ✅ Good: Specific pattern
file_glob = "data/2024/**/*.csv"
# ❌ Bad: Too broad
file_glob = "**/*"
Never hardcode credentials. Use environment variables or secrets.toml: # secrets.toml
[sources.aws]
aws_access_key_id = "your_access_key"
aws_secret_access_key = "your_secret_key"
Troubleshooting
Ensure credentials are correctly configured: # Test S3 access
from dlt.common.storages.fsspec_filesystem import fsspec_filesystem
fs, _ = fsspec_filesystem( "s3://my-bucket/" )
print (fs.ls( "." )) # List files to test access
Check your glob pattern and bucket URL: # Debug: List all files
files = filesystem(
bucket_url = "s3://my-bucket/" ,
file_glob = "**/*" , # Match everything
)
for file_batch in files:
for file_item in file_batch:
print (file_item[ "file_url" ])
Memory issues with large files
Reduce chunk size or process files individually: # Smaller chunks
csv_data = readers(
bucket_url = "s3://..." ,
file_glob = "*.csv" ,
).read_csv( chunksize = 1000 ) # Smaller chunks
Next Steps