FlyteDirectory extends Flyte’s blob system to entire directories. It is represented in FlyteIDL as a multi-part blob — a BlobType with MULTIPART dimensionality — which means Flyte manages the transfer of all files within the directory to and from remote object storage. Common use cases for FlyteDirectory include:
  • Model checkpoints — PyTorch Lightning, TensorFlow, and Hugging Face all write model artifacts as directories
  • Dataset directories — a directory of CSV or Parquet shards that collectively represent one dataset
  • Multi-file outputs — anything where a single task produces a collection of related files

Basic usage

Import FlyteDirectory from flytekit:
from flytekit import task, workflow
from flytekit.types.directory import FlyteDirectory

Downloading files into a FlyteDirectory

The following example downloads a list of URLs pointing to CSV files and returns a FlyteDirectory containing all downloaded files:
import os
import urllib.request
import typing
from flytekit import task, workflow
from flytekit.types.directory import FlyteDirectory


@task
def download_files(csv_urls: typing.List[str]) -> FlyteDirectory:
    # Create a local working directory for downloads
    local_dir = "/tmp/csv_files"
    os.makedirs(local_dir, exist_ok=True)

    for url in csv_urls:
        filename = os.path.basename(url)
        dest = os.path.join(local_dir, filename)
        urllib.request.urlretrieve(url, dest)
        print(f"Downloaded {url} -> {dest}")

    # Return the directory; Flytekit uploads all contents to object storage
    return FlyteDirectory(path=local_dir)
When the task completes, Flytekit uploads every file in local_dir to the configured object store and returns a multi-part blob literal pointing to the remote prefix.
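Conceptually, the upload maps each file's path relative to local_dir onto an object key under a remote prefix. A pure-Python sketch of that mapping (the s3:// prefix and function name are illustrative; Flytekit derives the real prefix from your configured raw output location):

```python
import os
import tempfile


def remote_keys(local_dir: str, remote_prefix: str) -> dict:
    """Map every file under local_dir to its key under remote_prefix."""
    mapping = {}
    for dirpath, _, filenames in os.walk(local_dir):
        for name in filenames:
            local_path = os.path.join(dirpath, name)
            rel = os.path.relpath(local_path, local_dir)
            # Object-store keys use forward slashes regardless of OS
            mapping[local_path] = remote_prefix.rstrip("/") + "/" + rel.replace(os.sep, "/")
    return mapping


# Demo: two files in a scratch directory
with tempfile.TemporaryDirectory() as d:
    for name in ("a.csv", "b.csv"):
        open(os.path.join(d, name), "w").close()
    keys = remote_keys(d, "s3://my-bucket/xyz/")
print(sorted(keys.values()))  # → ['s3://my-bucket/xyz/a.csv', 's3://my-bucket/xyz/b.csv']
```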

Processing files in a FlyteDirectory

Once you have a FlyteDirectory as a task input, call .download() to materialize all files locally. You can then iterate over the directory using standard Python file system operations:
import csv
import math
import os
import typing
from flytekit import task
from flytekit.types.directory import FlyteDirectory


def normalize_columns_in_place(
    local_path: str,
    column_names: typing.List[str],
    columns_to_normalize: typing.List[str],
) -> None:
    """Normalize specified columns of a CSV file using z-score normalization."""
    rows = []
    with open(local_path, newline="") as f:
        reader = csv.DictReader(f, fieldnames=column_names)
        next(reader)  # skip the header row so it isn't treated as data
        for row in reader:
            rows.append(row)

    for col in columns_to_normalize:
        values = [float(row[col]) for row in rows]
        mean = sum(values) / len(values)
        variance = sum((v - mean) ** 2 for v in values) / len(values)
        std = math.sqrt(variance)
        for row in rows:
            row[col] = str((float(row[col]) - mean) / std) if std > 0 else "0.0"

    with open(local_path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=column_names)
        writer.writeheader()  # write the header back out
        writer.writerows(rows)


@task
def normalize_all_files(
    directory: FlyteDirectory,
    all_column_names: typing.List[typing.List[str]],
    columns_to_normalize: typing.List[typing.List[str]],
) -> FlyteDirectory:
    # Download the entire directory to a local path
    local_dir = directory.download()

    # Iterate over each CSV file in the directory
    csv_files = sorted(
        f for f in os.listdir(local_dir) if f.endswith(".csv")
    )

    for i, filename in enumerate(csv_files):
        file_path = os.path.join(local_dir, filename)
        normalize_columns_in_place(
            local_path=file_path,
            column_names=all_column_names[i],
            columns_to_normalize=columns_to_normalize[i],
        )

    # Return the modified local directory; Flytekit uploads the updated files
    return FlyteDirectory(path=local_dir)
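The z-score arithmetic in normalize_columns_in_place is easy to sanity-check in isolation: after normalization a column should have mean 0 and variance 1. A minimal check, no Flyte required:

```python
import math

values = [2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0]
mean = sum(values) / len(values)
variance = sum((v - mean) ** 2 for v in values) / len(values)
std = math.sqrt(variance)
normalized = [(v - mean) / std for v in values]

# The normalized column has mean 0 and unit variance
new_mean = sum(normalized) / len(normalized)
new_var = sum((v - new_mean) ** 2 for v in normalized) / len(normalized)
print(round(new_mean, 10), round(new_var, 10))  # → 0.0 1.0
```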

Composing tasks into a workflow

@workflow
def download_and_normalize_csv_files(
    csv_urls: typing.List[str],
    all_column_names: typing.List[typing.List[str]],
    columns_to_normalize: typing.List[typing.List[str]],
) -> FlyteDirectory:
    directory = download_files(csv_urls=csv_urls)
    return normalize_all_files(
        directory=directory,
        all_column_names=all_column_names,
        columns_to_normalize=columns_to_normalize,
    )

Running the workflow locally

if __name__ == "__main__":
    result = download_and_normalize_csv_files(
        csv_urls=[
            "https://people.sc.fsu.edu/~jburkardt/data/csv/biostats.csv",
            "https://people.sc.fsu.edu/~jburkardt/data/csv/hw_25000.csv",
        ],
        all_column_names=[
            ["Name", "Sex", "Age", "Height", "Weight"],
            ["Index", "Height", "Weight"],
        ],
        columns_to_normalize=[
            ["Age", "Height", "Weight"],
            ["Height", "Weight"],
        ],
    )
    print(f"Normalized directory: {result.path}")

Batch upload and download

You can annotate a FlyteDirectory with BatchSize to control how many files are transferred at a time. This is useful for very large directories where loading all files into memory simultaneously is impractical:
from typing import Annotated

from flytekit import task
from flytekit.types.directory import BatchSize, FlyteDirectory

@task
def process_in_batches(
    directory: Annotated[FlyteDirectory, BatchSize(10)],
) -> Annotated[FlyteDirectory, BatchSize(100)]:
    """
    Flytekit downloads input files in chunks of 10.
    Output files are uploaded in chunks of 100.
    """
    local_dir = directory.download()
    # ... process files in local_dir ...
    return FlyteDirectory(path=local_dir)
When BatchSize(10) is specified on an input, Flytekit fetches 10 files at a time, processes them, and then fetches the next 10. On outputs, it uploads 100 files at a time.
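The transfer pattern BatchSize enables is ordinary chunked iteration. A pure-Python sketch of the idea (this is an illustration, not Flytekit's actual implementation):

```python
import typing


def batched(items: typing.List[str], batch_size: int) -> typing.Iterator[typing.List[str]]:
    """Yield successive chunks of at most batch_size items."""
    for i in range(0, len(items), batch_size):
        yield items[i:i + batch_size]


files = [f"part-{i:03d}.csv" for i in range(25)]
for batch in batched(files, 10):
    # transfer this chunk, then move on to the next
    print(len(batch))  # → 10, then 10, then 5
```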

Specifying directory format

Like FlyteFile, you can scope FlyteDirectory with a format string that describes the protocol or format used by files within the directory:
import typing

from flytekit import task
from flytekit.types.directory import FlyteDirectory

# All files in this directory are Parquet files
ParquetDirectory = FlyteDirectory[typing.TypeVar("parquet")]

# Model artifact directory (e.g., a Hugging Face model)
ModelDirectory = FlyteDirectory[typing.TypeVar("hf-model")]


@task
def save_model(weights: str) -> ModelDirectory:
    # ... write model files to /tmp/model/ ...
    return ModelDirectory(path="/tmp/model")


@task
def load_model(model_dir: ModelDirectory) -> str:
    local = model_dir.download()
    # ... read model files from local ...
    return f"Loaded model from {local}"

Streaming support (experimental)

Flyte 1.5 introduced streaming support for FlyteDirectory via the fsspec library. With streaming, you can walk and copy directory contents without downloading everything first:
Note: Streaming support is experimental; the API may change in future releases.
import os
from flytekit import task
from flytekit.types.directory import FlyteDirectory
from flytekit.types.file import FlyteFile

@task
def copy_directory(source: FlyteDirectory) -> FlyteDirectory:
    dest = FlyteDirectory.new_remote()

    # Walk the remote directory without a full local download.
    # crawl() yields a (base, relative_path) pair for each file.
    for base, relative_path in source.crawl():
        src_file = str(os.path.join(base, relative_path))
        dst_file = dest.new_file(relative_path)

        with FlyteFile(src_file).open("rb") as in_f:
            with dst_file.open("wb") as out_f:
                out_f.write(in_f.read())

    return dest
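A copy loop like the one above that calls in_f.read() pulls each file fully into memory; for very large files, the same streaming handles can be drained in fixed-size chunks instead. A stdlib sketch of that chunked-copy pattern (the buffer size and function name are illustrative):

```python
import io

CHUNK_SIZE = 1024 * 1024  # 1 MiB


def stream_copy(src, dst, chunk_size: int = CHUNK_SIZE) -> int:
    """Copy src to dst in fixed-size chunks; returns total bytes copied."""
    total = 0
    while True:
        chunk = src.read(chunk_size)
        if not chunk:
            break
        dst.write(chunk)
        total += len(chunk)
    return total


# Demo against in-memory buffers
src = io.BytesIO(b"x" * (3 * CHUNK_SIZE + 17))
dst = io.BytesIO()
print(stream_copy(src, dst))  # → 3145745
```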

FlyteDirectory in the IDL

In FlyteIDL, FlyteDirectory is represented as a Blob with MULTIPART dimensionality:
// From flyteidl/core/types.proto
message BlobType {
    enum BlobDimensionality {
        SINGLE = 0;      // FlyteFile
        MULTIPART = 1;   // FlyteDirectory
    }
    string format = 1;         // e.g., "parquet", "" for untyped
    BlobDimensionality dimensionality = 2;
}
The uri field of the Blob literal points to the object storage prefix (e.g., s3://bucket/prefix/) rather than a single object. Flyte’s data management layer handles the listing and transfer of all objects under that prefix when materializing the directory.
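Materializing a directory amounts to listing every object key under that prefix and reproducing the relative layout locally. Roughly, with hypothetical keys and a helper name of our own:

```python
import os


def local_paths(keys, prefix, local_root):
    """Map each object key under prefix to a local file path."""
    paths = {}
    for key in keys:
        rel = key[len(prefix):].lstrip("/")
        paths[key] = os.path.join(local_root, *rel.split("/"))
    return paths


keys = [
    "s3://bucket/prefix/train/part-0.parquet",
    "s3://bucket/prefix/train/part-1.parquet",
    "s3://bucket/prefix/meta.json",
]
mapping = local_paths(keys, "s3://bucket/prefix/", "/tmp/data")
for key, path in mapping.items():
    print(f"{key} -> {path}")
```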

FlyteDirectory vs. FlyteFile

|                      | FlyteFile                   | FlyteDirectory                                    |
| -------------------- | --------------------------- | ------------------------------------------------- |
| IDL representation   | BlobType.SINGLE             | BlobType.MULTIPART                                |
| Represents           | One file                    | All files under a prefix                          |
| .download()          | Downloads a single file     | Downloads all files recursively                   |
| BatchSize annotation | Not applicable              | Controls chunk size during transfer               |
| Typical use case     | CSVs, images, model weights | Checkpoints, dataset shards, multi-file artifacts |
