Python has no native type that represents a dataframe in the abstract. Individual libraries — pandas, Spark, Polars, Arrow, Pandera — each define their own types, which makes it difficult to write Flyte tasks that are dataframe-library-agnostic and even harder to enforce schema correctness at the data pipeline level.
StructuredDataset fills this gap. It acts as a typed wrapper around any 2D tabular dataset and provides:
- Automatic serialization and deserialization — no boilerplate needed to convert between Python objects and storage formats
- Format flexibility — Parquet (default), CSV, BigQuery, and any custom format you implement
- Column-level type checking — declare the columns you expect, and Flyte validates them at runtime
- Cross-library interoperability — write a task returning a pandas DataFrame, read it as an Arrow table in the next task
Basic usage
The simplest case requires no changes beyond importing pandas. Flytekit detects the pandas.DataFrame return type and automatically converts it to a StructuredDataset literal backed by a Parquet file in object storage:
```python
import pandas as pd
from flytekit import task, workflow


@task
def generate_pandas_df() -> pd.DataFrame:
    return pd.DataFrame(
        {
            "Name": ["Tom", "Joseph", "Kieran"],
            "Age": [20, 22, 23],
            "Height": [160, 178, 175],
        }
    )


@workflow
def dataframe_wf() -> pd.DataFrame:
    return generate_pandas_df()
```
When this task runs, Flytekit:
- Detects the `pd.DataFrame` return annotation
- Uses the default encoder (Parquet) to serialize the DataFrame to bytes
- Uploads the Parquet file to the configured object store
- Records a `StructuredDataset` literal with the file URI
When a downstream task receives the pd.DataFrame input, the default decoder reads the Parquet file and reconstructs the DataFrame.
To extract a subset of columns and validate their types, annotate `StructuredDataset` with column names and types using `typing.Annotated` and `kwtypes`:

```python
import typing

import pandas as pd
from flytekit import kwtypes, task
from flytekit.types.structured.structured_dataset import StructuredDataset

# Declare columns and their Flyte LiteralType mappings
SupersetSD = typing.Annotated[
    StructuredDataset,
    kwtypes(Name=str, Age=int, Height=int),
]
SubsetSD = typing.Annotated[
    StructuredDataset,
    kwtypes(Age=int),
]


@task
def filter_to_age_column(sd: SupersetSD) -> SubsetSD:
    # open() with a dataframe type triggers deserialization
    df = sd.open(pd.DataFrame).all()
    # Return only the "Age" column, matching the SubsetSD annotation
    return StructuredDataset(dataframe=df[["Age"]])
```
If the columns in the actual data do not match the declared column types, Flyte raises a runtime type error. For example, passing a dataset without the declared Age column will fail.
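Conceptually, the runtime check verifies that every declared column is present in the actual data. A hedged sketch of that rule in plain pandas (a simplified stand-in, not flytekit's actual validation code):

```python
import pandas as pd


def check_columns(df: pd.DataFrame, declared: list) -> None:
    """Simplified stand-in for Flyte's runtime column check."""
    for name in declared:
        if name not in df.columns:
            raise TypeError(f"missing declared column: {name}")


df = pd.DataFrame({"Name": ["Tom"], "Age": [20], "Height": [160]})
check_columns(df, ["Age"])  # passes: the declared subset is present

try:
    check_columns(pd.DataFrame({"Name": ["Tom"]}), ["Age"])
    error_message = None
except TypeError as exc:
    error_message = str(exc)
```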
Working with multiple dataframe libraries
StructuredDataset supports conversion between different dataframe types. Use .open(target_type).all() to deserialize into any registered type:
```python
import pandas as pd
import pyarrow as pa
from flytekit import task, workflow
from flytekit.types.structured.structured_dataset import StructuredDataset


@task
def produce_dataframe() -> pd.DataFrame:
    return pd.DataFrame({"x": [1, 2, 3], "y": [4.0, 5.0, 6.0]})


@task
def consume_as_arrow(sd: StructuredDataset) -> int:
    # Convert the Parquet-backed StructuredDataset into a PyArrow table
    table: pa.Table = sd.open(pa.Table).all()
    return table.num_rows


@workflow
def cross_library_wf() -> int:
    df = produce_dataframe()
    return consume_as_arrow(sd=df)
```
Flytekit handles the conversion transparently. As long as an encoder and decoder are registered for the target type, you can open a StructuredDataset as any supported library type.
By default, Flytekit serializes DataFrames as Parquet files. To use CSV or another format, register the appropriate handler and annotate the return type:
```python
import typing

import pandas as pd
from flytekit import task
from flytekit.types.structured import register_csv_handlers
from flytekit.types.structured.structured_dataset import StructuredDataset

# Register the built-in CSV handlers
register_csv_handlers()

# Annotate the output with the CSV format
CSVDataset = typing.Annotated[StructuredDataset, "csv"]


@task
def produce_csv_dataset() -> CSVDataset:
    df = pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [20, 22]})
    return StructuredDataset(dataframe=df)


@task
def consume_csv_dataset(sd: CSVDataset) -> pd.DataFrame:
    return sd.open(pd.DataFrame).all()
```
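The CSV handler's serialization step is, at its core, the familiar pandas round trip; a local sketch of what gets written to and read from storage:

```python
import io

import pandas as pd

df = pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [20, 22]})

# Encode: DataFrame -> CSV text (what the CSV handler writes to storage)
buf = io.StringIO()
df.to_csv(buf, index=False)

# Decode: CSV text -> DataFrame
buf.seek(0)
restored = pd.read_csv(buf)
```

Note that, unlike Parquet, CSV carries no type metadata; pandas re-infers dtypes on read, which happens to round-trip cleanly here.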
The uri argument
StructuredDataset accepts a uri to read from or write to a specific location in object storage, BigQuery, or any supported backend:
Writing to BigQuery

```python
import pandas as pd
from flytekit import task
from flytekit.types.structured.structured_dataset import StructuredDataset


@task
def pandas_to_bq() -> StructuredDataset:
    df = pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [20, 22]})
    return StructuredDataset(
        dataframe=df, uri="bq://<PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>"
    )
```

Before running this, you need:
- A GCP account and service account — create them in the GCP console.
- Credentials — set the `GOOGLE_APPLICATION_CREDENTIALS` environment variable to the path of your service account key file.
- A BigQuery dataset — create a project and dataset in BigQuery where Flyte will create tables.
Reading from BigQuery

```python
import pandas as pd
from flytekit import task
from flytekit.types.structured.structured_dataset import StructuredDataset


@task
def bq_to_pandas(sd: StructuredDataset) -> pd.DataFrame:
    return sd.open(pd.DataFrame).all()
```
Flyte creates a table inside the BigQuery dataset when executing a query. No format annotation is needed — the stock BigQuery encoder is registered with an empty format, which the StructuredDatasetTransformerEngine treats as a generic encoder compatible with any format.
Returning multiple dataframes to different locations
```python
import typing

import pandas as pd
from flytekit import task
from flytekit.types.structured.structured_dataset import StructuredDataset


@task
def produce_two_datasets() -> typing.Tuple[StructuredDataset, StructuredDataset]:
    df1 = pd.DataFrame({"a": [1, 2, 3]})
    df2 = pd.DataFrame({"b": [4, 5, 6]})
    return (
        StructuredDataset(dataframe=df1, uri="bq://project:flyte.table"),
        StructuredDataset(dataframe=df2, uri="gs://auxiliary-bucket/data"),
    )
```
How encoders and decoders work
Each structured dataset plugin implements either an encoder (Python object → bytes → storage) or a decoder (storage → bytes → Python object). Flytekit selects which plugin to invoke based on three keys:
- Byte format — Parquet, CSV, BigQuery, etc.
- Storage location — S3, GCS, BigQuery, local
- Python type — `pd.DataFrame`, `pa.Table`, `pyspark.DataFrame`, etc.
These three keys together uniquely identify the plugin. If a more specific encoder for the given format is not found, Flytekit falls back to a generic encoder registered with an empty format string.
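The lookup and its fallback can be sketched as a plain dictionary keyed on those three values. All names here are hypothetical; flytekit's real registry lives inside `StructuredDatasetTransformerEngine` and is more involved:

```python
# Hypothetical registry: (python_type, protocol, format) -> handler name.
# An empty format string marks the generic fallback handler.
registry = {
    ("pd.DataFrame", "s3", "parquet"): "PandasToParquetEncodingHandler",
    ("pd.DataFrame", "s3", ""): "GenericPandasEncodingHandler",
}


def find_handler(python_type: str, protocol: str, fmt: str) -> str:
    # Exact match on all three keys first...
    key = (python_type, protocol, fmt)
    if key in registry:
        return registry[key]
    # ...then fall back to the generic empty-format handler
    return registry[(python_type, protocol, "")]


exact = find_handler("pd.DataFrame", "s3", "parquet")
fallback = find_handler("pd.DataFrame", "s3", "csv")
```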
Building a custom encoder and decoder
The following example builds a NumPy encoder and decoder, enabling np.ndarray as a valid StructuredDataset type stored as Parquet:
NumPy encoder
```python
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq

from flytekit import FlyteContext
from flytekit.models import literals
from flytekit.models.literals import StructuredDatasetMetadata
from flytekit.types.structured.structured_dataset import (
    PARQUET,
    StructuredDataset,
    StructuredDatasetEncoder,
)


class NumpyEncodingHandler(StructuredDatasetEncoder):
    def __init__(self):
        # Handle np.ndarray for any storage protocol, in Parquet format
        super().__init__(np.ndarray, None, PARQUET)

    def encode(
        self,
        ctx: FlyteContext,
        structured_dataset: StructuredDataset,
        structured_dataset_type,
    ) -> literals.StructuredDataset:
        array: np.ndarray = structured_dataset.dataframe
        # Convert the 2D NumPy array to a dict of columns for Arrow
        col_dict = {str(i): array[:, i] for i in range(array.shape[1])}
        table = pa.table(col_dict)
        # Write to Parquet at the remote URI
        uri = structured_dataset.uri or ctx.file_access.get_random_remote_path()
        with ctx.file_access.get_filesystem_for_path(uri).open(uri, "wb") as f:
            pq.write_table(table, f)
        # Encoders must return the IDL literal, not the user-facing wrapper
        return literals.StructuredDataset(
            uri=uri,
            metadata=StructuredDatasetMetadata(
                structured_dataset_type=structured_dataset_type
            ),
        )
```
NumPy decoder
```python
import numpy as np
import pyarrow.parquet as pq

from flytekit import FlyteContext
from flytekit.types.structured.structured_dataset import (
    PARQUET,
    StructuredDatasetDecoder,
)


class NumpyDecodingHandler(StructuredDatasetDecoder):
    def __init__(self):
        super().__init__(np.ndarray, None, PARQUET)

    def decode(
        self,
        ctx: FlyteContext,
        flyte_value,
        current_task_metadata,
    ) -> np.ndarray:
        uri = flyte_value.uri
        with ctx.file_access.get_filesystem_for_path(uri).open(uri, "rb") as f:
            table = pq.read_table(f)
        return table.to_pandas().to_numpy()
```
NumPy renderer
A renderer is any object with a `to_html` method; no base class is required:

```python
import numpy as np


class NumpyRenderer:
    """Renders the shape and dtype of a NumPy array in the Flyte Deck UI."""

    def to_html(self, array: np.ndarray) -> str:
        return f"<pre>Shape: {array.shape}\nDtype: {array.dtype}</pre>"
```
Registering encoder, decoder, and renderer
```python
import numpy as np
from flytekit.types.structured.structured_dataset import StructuredDatasetTransformerEngine

StructuredDatasetTransformerEngine.register(NumpyEncodingHandler())
StructuredDatasetTransformerEngine.register(NumpyDecodingHandler())
StructuredDatasetTransformerEngine.register_renderer(np.ndarray, NumpyRenderer())
```
Using the custom type in tasks
```python
import numpy as np
from flytekit import task, workflow


@task
def generate_array() -> np.ndarray:
    return np.array([[1, 2, 3], [4, 5, 6]], dtype=float)


@task
def consume_array(array: np.ndarray) -> int:
    return array.shape[0] * array.shape[1]


@workflow
def numpy_wf() -> int:
    arr = generate_array()
    return consume_array(array=arr)
```
Note: PyArrow raises an `Expected bytes, got a 'int' object` error when the DataFrame contains integer columns. Cast integer columns to float before encoding, or build the arrays with `pa.array()` and an explicit type.
Nested column types
StructuredDataset supports nested field structures, similar to Avro, Parquet, and BigQuery nested records. This requires flytekit ≥ 1.11.0:
```python
import pandas as pd
from flytekit import task
from flytekit.types.structured.structured_dataset import StructuredDataset


@task
def nested_columns_example() -> StructuredDataset:
    # Nested structure: each row contains a dict in the "metadata" column
    df = pd.DataFrame(
        {
            "id": [1, 2, 3],
            "metadata": [
                {"source": "a", "score": 0.9},
                {"source": "b", "score": 0.7},
                {"source": "c", "score": 0.8},
            ],
        }
    )
    return StructuredDataset(dataframe=df)
```
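Once read back, the nested column behaves like any pandas object column; for example, it can be expanded into flat columns with `pd.json_normalize` (plain pandas, nothing Flyte-specific):

```python
import pandas as pd

df = pd.DataFrame(
    {
        "id": [1, 2, 3],
        "metadata": [
            {"source": "a", "score": 0.9},
            {"source": "b", "score": 0.7},
            {"source": "c", "score": 0.8},
        ],
    }
)

# Expand the nested dicts in "metadata" into flat columns
flat = pd.json_normalize(df["metadata"])
sources = list(flat["source"])
```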
StructuredDataset in the IDL
StructuredDataset is defined in FlyteIDL as a StructuredDatasetType carrying an ordered list of typed columns, a storage format string, and optional third-party schema bytes:
```protobuf
// From flyteidl/core/types.proto
message StructuredDatasetType {
  message DatasetColumn {
    string name = 1;
    LiteralType literal_type = 2;
  }
  repeated DatasetColumn columns = 1;  // Declared schema columns
  string format = 2;                   // "parquet", "csv", "bigquery", etc.
  string external_schema_type = 3;     // Optional: e.g., "arrow"
  bytes external_schema_bytes = 4;     // Optional: serialized Arrow schema
}

// From flyteidl/core/literals.proto
message StructuredDataset {
  string uri = 1;                          // Storage location
  StructuredDatasetMetadata metadata = 2;  // Runtime column type info
}
```
The columns field enables compile-time and runtime schema validation. If you do not declare any columns, Flyte accepts any schema. If you declare columns, Flyte checks that the declared subset is present in the actual data.