FlyteDirectory extends Flyte’s blob system to entire directories. It is represented in FlyteIDL as a multi-part blob — a BlobType with MULTIPART dimensionality — which means Flyte manages the transfer of all files within the directory to and from remote object storage.
Common use cases for FlyteDirectory include:
- Model checkpoints — PyTorch Lightning, TensorFlow, and Hugging Face all write model artifacts as directories
- Dataset directories — a directory of CSV or Parquet shards that collectively represent one dataset
- Multi-file outputs — anything where a single task produces a collection of related files
Basic usage
Import FlyteDirectory from flytekit.types.directory:
```python
from flytekit import task, workflow
from flytekit.types.directory import FlyteDirectory
```
Downloading files into a FlyteDirectory
The following example downloads a list of URLs pointing to CSV files and returns a FlyteDirectory containing all downloaded files:
```python
import os
import typing
import urllib.request

from flytekit import task, workflow
from flytekit.types.directory import FlyteDirectory


@task
def download_files(csv_urls: typing.List[str]) -> FlyteDirectory:
    # Create a local working directory for downloads
    local_dir = "/tmp/csv_files"
    os.makedirs(local_dir, exist_ok=True)

    for url in csv_urls:
        filename = os.path.basename(url)
        dest = os.path.join(local_dir, filename)
        urllib.request.urlretrieve(url, dest)
        print(f"Downloaded {url} -> {dest}")

    # Return the directory; Flytekit uploads all contents to object storage
    return FlyteDirectory(path=local_dir)
```
When the task completes, Flytekit uploads every file in local_dir to the configured object store and returns a multi-part blob literal pointing to the remote prefix.
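One detail of the task above worth noting: `os.path.basename` on a raw URL keeps any query string or fragment. A small stdlib helper (hypothetical, not part of flytekit) derives a clean local filename by parsing the URL first:

```python
import os
from urllib.parse import urlparse


def filename_from_url(url: str) -> str:
    # Parse the URL so query strings and fragments are dropped,
    # then take the final path component as the local filename.
    return os.path.basename(urlparse(url).path)


print(filename_from_url("https://example.com/data/biostats.csv?sig=abc"))
```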
Processing files in a FlyteDirectory
Once you have a FlyteDirectory as a task input, call .download() to materialize all files locally. You can then iterate over the directory using standard Python file system operations:
```python
import csv
import math
import os
import typing

from flytekit import task
from flytekit.types.directory import FlyteDirectory


def normalize_columns_in_place(
    local_path: str,
    column_names: typing.List[str],
    columns_to_normalize: typing.List[str],
) -> None:
    """Normalize specified columns of a CSV file using z-score normalization."""
    with open(local_path, newline="") as f:
        reader = csv.DictReader(f, fieldnames=column_names)
        next(reader)  # skip the header row, since fieldnames are supplied explicitly
        rows = list(reader)

    for col in columns_to_normalize:
        values = [float(row[col]) for row in rows]
        mean = sum(values) / len(values)
        variance = sum((v - mean) ** 2 for v in values) / len(values)
        std = math.sqrt(variance)
        for row in rows:
            row[col] = str((float(row[col]) - mean) / std) if std > 0 else "0.0"

    with open(local_path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=column_names)
        writer.writeheader()  # rewrite the header row along with the data
        writer.writerows(rows)
```
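The per-column arithmetic can be sanity-checked in isolation, without Flyte or any files. This standalone `zscores` helper (an illustrative assumption, not part of flytekit) mirrors the math in the helper above; for the two values 20 and 40, the mean is 30 and the population standard deviation is 10:

```python
import math


def zscores(values):
    # z-score: (v - mean) / std, using the population std as in the helper above
    mean = sum(values) / len(values)
    std = math.sqrt(sum((v - mean) ** 2 for v in values) / len(values))
    return [(v - mean) / std if std > 0 else 0.0 for v in values]


print(zscores([20.0, 40.0]))  # → [-1.0, 1.0]
```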
```python
@task
def normalize_all_files(
    directory: FlyteDirectory,
    all_column_names: typing.List[typing.List[str]],
    columns_to_normalize: typing.List[typing.List[str]],
) -> FlyteDirectory:
    # Download the entire directory to a local path
    local_dir = directory.download()

    # Iterate over each CSV file in the directory
    csv_files = sorted(f for f in os.listdir(local_dir) if f.endswith(".csv"))
    for i, filename in enumerate(csv_files):
        file_path = os.path.join(local_dir, filename)
        normalize_columns_in_place(
            local_path=file_path,
            column_names=all_column_names[i],
            columns_to_normalize=columns_to_normalize[i],
        )

    # Return the modified local directory; Flytekit uploads the updated files
    return FlyteDirectory(path=local_dir)
```
Composing tasks into a workflow
```python
@workflow
def download_and_normalize_csv_files(
    csv_urls: typing.List[str],
    all_column_names: typing.List[typing.List[str]],
    columns_to_normalize: typing.List[typing.List[str]],
) -> FlyteDirectory:
    directory = download_files(csv_urls=csv_urls)
    return normalize_all_files(
        directory=directory,
        all_column_names=all_column_names,
        columns_to_normalize=columns_to_normalize,
    )
```
Running the workflow locally
```python
if __name__ == "__main__":
    result = download_and_normalize_csv_files(
        csv_urls=[
            "https://people.sc.fsu.edu/~jburkardt/data/csv/biostats.csv",
            "https://people.sc.fsu.edu/~jburkardt/data/csv/hw_25000.csv",
        ],
        all_column_names=[
            ["Name", "Sex", "Age", "Height", "Weight"],
            ["Index", "Height", "Weight"],
        ],
        columns_to_normalize=[
            ["Age", "Height", "Weight"],
            ["Height", "Weight"],
        ],
    )
    print(f"Normalized directory: {result.path}")
```
Batch upload and download
You can annotate a FlyteDirectory with BatchSize to control how many files are downloaded or uploaded at a time. This is useful for very large directories where transferring and buffering all files at once is impractical:
```python
from typing import Annotated

from flytekit import task
from flytekit.types.directory import BatchSize, FlyteDirectory


@task
def process_in_batches(
    directory: Annotated[FlyteDirectory, BatchSize(10)],
) -> Annotated[FlyteDirectory, BatchSize(100)]:
    """
    Flytekit downloads input files in chunks of 10.
    Output files are uploaded in chunks of 100.
    """
    local_dir = directory.download()
    # ... process files in local_dir ...
    return FlyteDirectory(path=local_dir)
```
When BatchSize(10) is specified on an input, Flytekit transfers the directory's files in chunks of 10 during download rather than all at once. On outputs, it uploads 100 files at a time.
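flytekit's transfer internals are not shown here, but the chunking behavior that BatchSize describes can be sketched with a plain generator (a stdlib illustration, not flytekit code):

```python
from itertools import islice
from typing import Iterable, Iterator, List


def batched(items: Iterable[str], size: int) -> Iterator[List[str]]:
    # Yield successive fixed-size chunks, mirroring how a BatchSize(n)
    # annotation bounds the number of files in flight at once.
    it = iter(items)
    while chunk := list(islice(it, size)):
        yield chunk


files = [f"part-{i}.csv" for i in range(25)]
print([len(b) for b in batched(files, 10)])  # → [10, 10, 5]
```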
As with FlyteFile, you can parameterize FlyteDirectory with a format string that describes the format of the files within the directory:
```python
import typing

from flytekit import task
from flytekit.types.directory import FlyteDirectory

# All files in this directory are Parquet files
ParquetDirectory = FlyteDirectory[typing.TypeVar("parquet")]

# Model artifact directory (e.g., a Hugging Face model)
ModelDirectory = FlyteDirectory[typing.TypeVar("hf-model")]


@task
def save_model(weights: str) -> ModelDirectory:
    # ... write model files to /tmp/model/ ...
    return ModelDirectory(path="/tmp/model")


@task
def load_model(model_dir: ModelDirectory) -> str:
    local = model_dir.download()
    # ... read model files from local ...
    return f"Loaded model from {local}"
```
Streaming support (experimental)
Flyte 1.5 introduced streaming support for FlyteDirectory via the fsspec library. With streaming, you can walk and copy directory contents without downloading everything first:
Streaming support is experimental. The API may change in future releases.
```python
import os

from flytekit import task
from flytekit.types.directory import FlyteDirectory
from flytekit.types.file import FlyteFile


@task
def copy_directory(source: FlyteDirectory) -> FlyteDirectory:
    dest = FlyteDirectory.new_remote("output_dir")

    # Walk the remote directory without a full local download.
    # crawl() yields (base, relative path) pairs for each file under the prefix.
    for base, rel_path in source.crawl():
        src_file = os.path.join(base, rel_path)
        dst_file = os.path.join(dest.remote_directory, rel_path)
        with FlyteFile(src_file).open("rb") as in_f:
            with FlyteFile(dst_file).open("wb") as out_f:
                out_f.write(in_f.read())
    return dest
```
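One caveat in the loop above: `in_f.read()` buffers each whole file in memory. For local files, the bounded-memory alternative is `shutil.copyfileobj`, which copies in fixed-size chunks; the same chunking idea applies to the fsspec-backed file objects that `open()` returns. A stdlib-only sketch:

```python
import os
import shutil
import tempfile

# Set up a source file of 1,000,000 bytes in a throwaway directory
src_dir = tempfile.mkdtemp()
dst_dir = tempfile.mkdtemp()
with open(os.path.join(src_dir, "a.bin"), "wb") as f:
    f.write(b"x" * 1_000_000)

# Copy in 64 KiB chunks so memory stays bounded regardless of file size
with open(os.path.join(src_dir, "a.bin"), "rb") as in_f, \
     open(os.path.join(dst_dir, "a.bin"), "wb") as out_f:
    shutil.copyfileobj(in_f, out_f, length=64 * 1024)

print(os.path.getsize(os.path.join(dst_dir, "a.bin")))  # → 1000000
```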
FlyteDirectory in the IDL
In FlyteIDL, FlyteDirectory is represented as a Blob with MULTIPART dimensionality:
```protobuf
// From flyteidl/core/types.proto
message BlobType {
  enum BlobDimensionality {
    SINGLE = 0;     // FlyteFile
    MULTIPART = 1;  // FlyteDirectory
  }
  string format = 1;  // e.g., "parquet"; "" for untyped
  BlobDimensionality dimensionality = 2;
}
```
The uri field of the Blob literal points to the object storage prefix (e.g., s3://bucket/prefix/) rather than a single object. Flyte’s data management layer handles the listing and transfer of all objects under that prefix when materializing the directory.
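The prefix semantics can be illustrated with a toy in-memory object store (purely illustrative, not flytekit code): a "directory" is nothing more than the set of keys sharing a common prefix, and materializing a multipart blob amounts to listing and transferring each of them.

```python
# A toy object store: keys are full paths; a "directory" is a shared prefix
store = {
    "s3://bucket/prefix/a.csv": b"...",
    "s3://bucket/prefix/sub/b.csv": b"...",
    "s3://bucket/other/c.csv": b"...",
}


def list_prefix(store, prefix):
    # List every object under the prefix, including "subdirectories"
    return sorted(k for k in store if k.startswith(prefix))


print(list_prefix(store, "s3://bucket/prefix/"))
```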
FlyteDirectory vs. FlyteFile
| | FlyteFile | FlyteDirectory |
|---|---|---|
| IDL representation | BlobType.SINGLE | BlobType.MULTIPART |
| Represents | One file | All files under a prefix |
| .download() | Downloads a single file | Downloads all files recursively |
| BatchSize annotation | Not applicable | Controls chunk size during transfer |
| Typical use case | CSVs, images, model weights | Checkpoints, dataset shards, multi-file artifacts |