By default, Flyte assigns sequential names to outputs: o0, o1, o2, and so on, where o is the standard prefix and the number indicates the positional index.
When a task returns multiple outputs, these default names make it hard to know what each output represents. Using NamedTuple lets you assign meaningful names to each output.
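To make the contrast concrete, the following plain-Python sketch compares positional default names with NamedTuple field names. It does not call flytekit. The helper `default_output_names` is a hypothetical illustration of the `o` plus index scheme described above, not a Flyte API, and `RegressionOutput` is an illustrative name.

```python
from typing import NamedTuple


def default_output_names(count: int) -> list[str]:
    """Hypothetical helper: mimic the o0, o1, ... naming scheme."""
    return [f"o{i}" for i in range(count)]


# A NamedTuple with two named fields, declared with the functional syntax.
RegressionOutput = NamedTuple(
    "RegressionOutput",
    [("slope", float), ("intercept", float)],
)

positional_names = default_output_names(len(RegressionOutput._fields))
named_fields = list(RegressionOutput._fields)

print(positional_names)  # ['o0', 'o1']
print(named_fields)      # ['slope', 'intercept']
```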
Define named outputs with NamedTuple
Import the required dependencies:
from typing import NamedTuple
from flytekit import task, workflow
Define a NamedTuple and use it as the return type annotation for your task:
LinearRegressionOutput = NamedTuple(
    "LinearRegressionOutput",
    [("slope", float), ("intercept", float)],
)


@task
def compute_regression(x: list[int], y: list[int]) -> LinearRegressionOutput:
    # Closed-form least-squares fit of y = m * x + b.
    n = len(x)
    sum_xy = sum(xi * yi for xi, yi in zip(x, y))
    sum_x = sum(x)
    sum_y = sum(y)
    sum_x2 = sum(xi ** 2 for xi in x)
    m = (n * sum_xy - sum_x * sum_y) / (n * sum_x2 - sum_x ** 2)
    b = (sum_y - m * sum_x) / n
    return LinearRegressionOutput(slope=m, intercept=b)
Declare NamedTuple types explicitly at module level rather than inline. Inline definitions such as NamedTuple("slope_value", [("slope", float)]) inside the return annotation can trigger errors in static analysis tools such as mypy.
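As a sketch of what a module-level declaration can look like, the class-based `NamedTuple` syntax below is a standard-library alternative to the functional form. In plain Python the two produce the same fields. Whether flytekit treats both forms identically is an assumption here and worth confirming against your flytekit version. The names `FunctionalOutput` and `ClassOutput` are illustrative.

```python
from typing import NamedTuple

# Functional form, declared once at module level.
FunctionalOutput = NamedTuple(
    "FunctionalOutput",
    [("slope", float), ("intercept", float)],
)


# Class-based form with the same fields.
class ClassOutput(NamedTuple):
    slope: float
    intercept: float


same_fields = FunctionalOutput._fields == ClassOutput._fields
print(same_fields)                  # True
print(ClassOutput(1.0, 2.0).slope)  # 1.0
```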
Named outputs per task
You can define a separate NamedTuple for each task:
SlopeOutput = NamedTuple("SlopeOutput", [("slope", float)])
InterceptOutput = NamedTuple("InterceptOutput", [("intercept", float)])


@task
def slope(x: list[int], y: list[int]) -> SlopeOutput:
    # Least-squares slope of y against x.
    n = len(x)
    sum_xy = sum(xi * yi for xi, yi in zip(x, y))
    sum_x = sum(x)
    sum_y = sum(y)
    sum_x2 = sum(xi ** 2 for xi in x)
    m = (n * sum_xy - sum_x * sum_y) / (n * sum_x2 - sum_x ** 2)
    return SlopeOutput(slope=m)


@task
def intercept(x: list[int], y: list[int], slope: float) -> InterceptOutput:
    # The fitted line passes through (mean_x, mean_y).
    mean_x = sum(x) / len(x)
    mean_y = sum(y) / len(y)
    b = mean_y - slope * mean_x
    return InterceptOutput(intercept=b)
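The two tasks implement the closed-form least-squares formulas. The plain-Python sketch below checks the arithmetic on sample data chosen for illustration, and shows that the mean-based intercept agrees with the sum-based form `(sum_y - m * sum_x) / n` used in `compute_regression`. It does not use flytekit, and the function names `fit_slope` and `fit_intercept` are hypothetical.

```python
def fit_slope(x: list[int], y: list[int]) -> float:
    """Least-squares slope, same formula as the slope task."""
    n = len(x)
    sum_xy = sum(xi * yi for xi, yi in zip(x, y))
    sum_x, sum_y = sum(x), sum(y)
    sum_x2 = sum(xi ** 2 for xi in x)
    return (n * sum_xy - sum_x * sum_y) / (n * sum_x2 - sum_x ** 2)


def fit_intercept(x: list[int], y: list[int], slope: float) -> float:
    """Mean-based intercept, same formula as the intercept task."""
    return sum(y) / len(y) - slope * (sum(x) / len(x))


x, y = [1, 2, 3], [2, 4, 6]  # sample points on the line y = 2x
m = fit_slope(x, y)
b_mean = fit_intercept(x, y, m)
b_sum = (sum(y) - m * sum(x)) / len(x)  # sum-based intercept

print(m, b_mean, b_sum)  # 2.0 0.0 0.0
```

Note that the slope denominator is zero when every value in `x` is identical, so the division raises `ZeroDivisionError` for such input.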
Access named outputs in a workflow
To use a named output inside a workflow, access its field by name. Because a NamedTuple output is a tuple rather than a bare value, you must reference the field explicitly:
LinearRegressionResult = NamedTuple(
    "LinearRegressionResult",
    [("slope", float), ("intercept", float)],
)


@workflow
def simple_wf_with_named_outputs(
    x: list[int] = [-3, 0, 3],
    y: list[int] = [7, 4, -2],
) -> LinearRegressionResult:
    slope_val = slope(x=x, y=y)
    # Pass the slope field, not the whole SlopeOutput tuple.
    intercept_val = intercept(x=x, y=y, slope=slope_val.slope)
    return LinearRegressionResult(
        slope=slope_val.slope,
        intercept=intercept_val.intercept,
    )
Notice that slope_val.slope dereferences the slope field from the SlopeOutput NamedTuple. This dereferencing step is required any time you pass a named output to another task or return it from a workflow.
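To see why the field access matters, here is a plain-Python sketch (no flytekit) showing that a single-field NamedTuple is still a tuple wrapping the value, not the value itself. Inside a Flyte workflow the task call is assumed to return a placeholder for the eventual output rather than a concrete tuple, so treat this as an analogy for the typing, not a reproduction of Flyte's runtime behavior.

```python
from typing import NamedTuple

SlopeOutput = NamedTuple("SlopeOutput", [("slope", float)])

out = SlopeOutput(slope=-1.5)

print(isinstance(out, tuple))  # True: the output is a 1-tuple
print(isinstance(out, float))  # False: it is not the float itself
print(out.slope)               # -1.5, access by field name
print(out[0])                  # -1.5, access by position
```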
Run the workflow
if __name__ == "__main__":
    result = simple_wf_with_named_outputs()
    print(result)  # LinearRegressionResult(slope=-1.5, intercept=3.0)
    print(result.slope)  # -1.5
    print(result.intercept)  # 3.0
Complete example
from typing import NamedTuple

from flytekit import task, workflow

SlopeOutput = NamedTuple("SlopeOutput", [("slope", float)])
InterceptOutput = NamedTuple("InterceptOutput", [("intercept", float)])
LinearRegressionResult = NamedTuple(
    "LinearRegressionResult",
    [("slope", float), ("intercept", float)],
)


@task
def slope(x: list[int], y: list[int]) -> SlopeOutput:
    # Least-squares slope of y against x.
    n = len(x)
    sum_xy = sum(xi * yi for xi, yi in zip(x, y))
    sum_x = sum(x)
    sum_y = sum(y)
    sum_x2 = sum(xi ** 2 for xi in x)
    m = (n * sum_xy - sum_x * sum_y) / (n * sum_x2 - sum_x ** 2)
    return SlopeOutput(slope=m)


@task
def intercept(x: list[int], y: list[int], slope: float) -> InterceptOutput:
    # The fitted line passes through (mean_x, mean_y).
    mean_x = sum(x) / len(x)
    mean_y = sum(y) / len(y)
    b = mean_y - slope * mean_x
    return InterceptOutput(intercept=b)


@workflow
def simple_wf_with_named_outputs(
    x: list[int] = [-3, 0, 3],
    y: list[int] = [7, 4, -2],
) -> LinearRegressionResult:
    slope_val = slope(x=x, y=y)
    # Pass the slope field, not the whole SlopeOutput tuple.
    intercept_val = intercept(x=x, y=y, slope=slope_val.slope)
    return LinearRegressionResult(
        slope=slope_val.slope,
        intercept=intercept_val.intercept,
    )


if __name__ == "__main__":
    result = simple_wf_with_named_outputs()
    print(result)