Files are among the most common data primitives in data and ML workflows. FlyteFile provides a typed, managed wrapper around individual files that Flyte serializes as a Blob literal backed by a BlobType with SINGLE dimensionality. When a task declares FlyteFile as an input or output:
  • Inputs: Flytekit translates the remote URI into a FlyteFile object. The file is not downloaded until you call .download() or open the file’s path.
  • Outputs: When the task returns a FlyteFile, Flytekit uploads the local file to blob storage (S3, GCS, or Azure Blob) and creates a blob literal pointing to the uploaded location.
This lazy transfer model avoids unnecessary copies of large files and lets tasks operate on remote URIs directly via streaming.

Basic usage

Import FlyteFile from flytekit.types.file:
from flytekit import task, workflow
from flytekit.types.file import FlyteFile

FlyteFile as input and output

The following example downloads CSV files, normalizes specified columns using z-score normalization, and returns the result as a new CSV file:
import csv
import math
import os
import typing
from flytekit import task, workflow
from flytekit.types.file import FlyteFile

# Type alias: CSV files specifically
CSVFile = FlyteFile[typing.TypeVar("csv")]


@task
def normalize_columns(
    csv_url: FlyteFile,
    column_names: typing.List[str],
    columns_to_normalize: typing.List[str],
    output_location: str = "",
) -> FlyteFile:
    # Calling .download() fetches the file from remote storage to a local temp path
    local_path = csv_url.download()

    # Read rows using the local path
    rows = []
    with open(local_path) as f:
        reader = csv.DictReader(f, fieldnames=column_names)
        next(reader)  # skip the header row, since fieldnames is supplied explicitly
        for row in reader:
            rows.append(row)

    # Compute mean and standard deviation for each column to normalize
    for col in columns_to_normalize:
        values = [float(row[col]) for row in rows]
        mean = sum(values) / len(values)
        variance = sum((v - mean) ** 2 for v in values) / len(values)
        std = math.sqrt(variance)
        for row in rows:
            row[col] = str((float(row[col]) - mean) / std) if std > 0 else "0.0"

    # Write normalized output to a local file
    out_path = os.path.join(os.path.dirname(local_path), "normalized.csv")
    with open(out_path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=column_names)
        writer.writeheader()
        writer.writerows(rows)

    # If output_location is specified, Flyte uploads to that path instead of a random object store location
    return FlyteFile(
        out_path,
        remote_path=output_location if output_location else None,
    )
When the task completes, Flytekit automatically uploads out_path to the configured object store and records the URI in the blob literal.
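As a sanity check, the z-score step used above can be exercised on its own with plain Python (no Flyte required); the sample values here are made up for illustration:

```python
import math


def z_scores(values: list[float]) -> list[float]:
    """Normalize values to zero mean and unit (population) standard deviation."""
    mean = sum(values) / len(values)
    std = math.sqrt(sum((v - mean) ** 2 for v in values) / len(values))
    if std == 0:
        # Constant columns carry no information; map them all to 0.0
        return [0.0 for _ in values]
    return [(v - mean) / std for v in values]


# Symmetric input: expect a symmetric result centered on 0
print(z_scores([2.0, 4.0, 6.0]))
```

The result has mean 0 and (population) standard deviation 1, which is exactly what the normalize_columns task computes per column.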

Composing tasks into a workflow

@workflow
def normalize_csv_file(
    csv_url: FlyteFile,
    column_names: typing.List[str],
    columns_to_normalize: typing.List[str],
    output_location: str = "",
) -> FlyteFile:
    return normalize_columns(
        csv_url=csv_url,
        column_names=column_names,
        columns_to_normalize=columns_to_normalize,
        output_location=output_location,
    )

Running the workflow locally

if __name__ == "__main__":
    # FlyteFile accepts a local path or a remote URI (s3://, gs://, etc.)
    result = normalize_csv_file(
        csv_url=FlyteFile("https://people.sc.fsu.edu/~jburkardt/data/csv/biostats.csv"),
        column_names=["Name", "Sex", "Age", "Height", "Weight"],
        columns_to_normalize=["Age", "Height", "Weight"],
    )
    print(f"Normalized file written to: {result.path}")

Specifying file format

FlyteFile can be parameterized with a format string, which is recorded as the format field of the BlobType in the IDL. The format is optional and defaults to an empty string if not specified:
import typing
from flytekit.types.file import FlyteFile

# Explicitly typed as a JPEG file
JPEGFile = FlyteFile[typing.TypeVar("jpeg")]

# Explicitly typed as a Parquet file
ParquetFile = FlyteFile[typing.TypeVar("parquet")]
Predefined format aliases are available in flytekit.types.file. Using a format alias enables type validation between tasks — Flyte will raise a type mismatch error at compile time if you pass a JPEG file to a task that expects Parquet.
from flytekit.types.file import FlyteFile, PNGImageFile, PythonPickledFile, HDF5EncodedFile

@task
def process_image(image: PNGImageFile) -> PNGImageFile:
    # image.download() materializes the file locally
    local = image.download()
    # ... process the PNG ...
    return PNGImageFile(path=local)

How download works

FlyteFile uses lazy evaluation. Until you explicitly access the file contents, the data is not transferred:
from flytekit.types.file import FlyteFile

@task
def inspect_file(f: FlyteFile) -> str:
    # Before download, the file contents have not been transferred;
    # the original object-store URI is available via remote_source
    print(f"Remote source: {f.remote_source}")

    # Calling download() copies the file to a local temp directory
    local_path = f.download()

    # Now f.path points to the local copy
    with open(local_path) as fh:
        first_line = fh.readline()

    return first_line
After download(), the path attribute points to the local copy, so you can also pass f.path directly to open().

Streaming support (experimental)

Flyte 1.5 introduced streaming support for FlyteFile via the fsspec library. Streaming enables efficient, on-demand access to remote files without downloading them fully to local storage first.
Streaming support is experimental. The API may change in future releases.
The following example removes specific columns from a CSV file using streaming, reading and writing directly from and to remote storage:
import csv
import typing
from flytekit import task
from flytekit.types.file import FlyteFile

@task
def drop_columns(
    csv_file: FlyteFile,
    columns_to_drop: typing.List[str],
) -> FlyteFile:
    # Open the remote file for a streaming read; no full download occurs
    with csv_file.open("r") as in_f:
        reader = csv.DictReader(in_f)
        field_names = [name for name in reader.fieldnames if name not in columns_to_drop]

        # new_remote_file allocates a writable location in the object store
        out_file = FlyteFile.new_remote_file("filtered.csv")
        with out_file.open("w") as out_f:
            writer = csv.DictWriter(out_f, fieldnames=field_names)
            writer.writeheader()
            for row in reader:
                writer.writerow({k: v for k, v in row.items() if k in field_names})

    return out_file
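The column-dropping logic itself can be checked locally with in-memory text streams standing in for the remote file handles (stdlib only; the column names below are made up for illustration):

```python
import csv
import io


def drop_csv_columns(in_f, out_f, columns_to_drop):
    """Stream rows from in_f to out_f, omitting the given columns."""
    reader = csv.DictReader(in_f)
    field_names = [name for name in reader.fieldnames if name not in columns_to_drop]
    writer = csv.DictWriter(out_f, fieldnames=field_names)
    writer.writeheader()
    for row in reader:
        writer.writerow({k: v for k, v in row.items() if k in field_names})


# StringIO objects play the role of the opened remote streams
src = io.StringIO("name,age,ssn\nAda,36,123\nAlan,41,456\n")
dst = io.StringIO()
drop_csv_columns(src, dst, ["ssn"])
print(dst.getvalue())
```

Because the function only reads and writes file-like objects, the same body works unchanged whether the handles come from io.StringIO or from FlyteFile.open().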

Controlling the output location

By default, Flyte writes output files to a random path in the configured object store. To write to a specific location, pass a remote_path to the FlyteFile constructor:
from flytekit import task
from flytekit.types.file import FlyteFile

@task
def produce_report(data: str, destination: str) -> FlyteFile:
    local_path = "/tmp/report.txt"
    with open(local_path, "w") as f:
        f.write(data)
    # Flyte will upload the local file to `destination` in your object store
    return FlyteFile(local_path, remote_path=destination)

Type validation

If you have the python-magic package installed, Flyte can validate the actual byte-level format of a file against its declared type annotation.
brew install libmagic   # macOS; on Linux, install libmagic via your package manager
pip install python-magic
Type validation based on python-magic is only supported on macOS and Linux.

FlyteFile in the IDL

In FlyteIDL, FlyteFile is represented as a Blob scalar with a BlobType of SINGLE dimensionality:
// From flyteidl/core/literals.proto
message Blob {
    BlobMetadata metadata = 1;
    string uri = 3;
}

// From flyteidl/core/types.proto
message BlobType {
    enum BlobDimensionality {
        SINGLE = 0;      // FlyteFile
        MULTIPART = 1;   // FlyteDirectory
    }
    string format = 1;         // e.g., "csv", "jpeg", "parquet"
    BlobDimensionality dimensionality = 2;
}
The format field maps to the type parameter on FlyteFile[T]. When no format is specified, the field is left empty and Flyte uses a binary blob type.
