A task is the fundamental building block and an extension point within Flyte. Tasks have the following characteristics:
Versioned — typically aligned with the git SHA
Strongly typed — all inputs and outputs must be annotated
Declarative — describe what the task does, not how to schedule it
Independently executable — a task can be run on its own without a workflow
Unit-testable — call task functions directly in tests
A Flyte task runs in its own container on a Kubernetes pod. This page focuses on Python function tasks, the most common task type.
Basic task
Apply the @task decorator to any Python function. All inputs and outputs must have type annotations:
from flytekit import task


@task
def slope(x: list[int], y: list[int]) -> float:
    # Least-squares slope of the line that best fits the points (x, y).
    n = len(x)
    sum_xy = sum(xi * yi for xi, yi in zip(x, y))
    sum_x = sum(x)
    sum_y = sum(y)
    sum_x2 = sum(xi**2 for xi in x)
    return (n * sum_xy - sum_x * sum_y) / (n * sum_x2 - sum_x**2)
You can execute a Flyte task like any regular Python function:
result = slope(x=[-3, 0, 3], y=[7, 4, -2])
print(result)  # -1.5
When calling a task function, always use keyword arguments. Positional arguments are not supported.
Run a task with pyflyte
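Use pyflyte run to execute a task from the file that defines it (here task.py). List inputs are passed as JSON strings. Add --remote to run on the configured Flyte cluster instead of locally:
# Run locally
pyflyte run task.py slope --x '[-3,0,3]' --y '[7,4,-2]'
# Run on the configured Flyte cluster
pyflyte run --remote task.py slope --x '[-3,0,3]' --y '[7,4,-2]'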
Multiple outputs
Return a tuple when a task produces multiple outputs. Flyte assigns default names o0, o1, o2, … in positional order:
import typing

from flytekit import task


@task
def stats(values: list[float]) -> typing.Tuple[float, float]:
    mean = sum(values) / len(values)
    variance = sum((v - mean) ** 2 for v in values) / len(values)
    return mean, variance
For human-readable output names, see Named outputs.
Resource requests
Use Resources to request CPU, memory, or GPU for a task:
from flytekit import task, Resources


@task(requests=Resources(cpu="2", mem="2Gi"))
def memory_intensive_task(data: list[float]) -> float:
    return sum(data)


@task(requests=Resources(cpu="4", mem="8Gi", gpu="1"))
def gpu_training_task(epochs: int) -> float:
    # GPU-accelerated training logic
    return 0.95
requests describes the minimum resources Kubernetes should reserve. Use limits to cap the maximum. Both accept the same Resources fields.
Caching
Enable caching so that Flyte reuses a previously computed result instead of re-executing the task when it is called again with the same inputs. Set cache=True and provide a cache_version string:
from flytekit import task
@task(cache=True, cache_version="1.0")
def expensive_computation(n: int) -> int:
    total = 0
    for i in range(n):
        total += i
    return total
Increment cache_version whenever the task’s logic changes and you want to invalidate old cached results.
Retries
Configure automatic retries for tasks that may fail transiently (e.g., network calls):
from flytekit import task
@task(retries=3)
def fetch_data(url: str) -> str:
    import urllib.request

    with urllib.request.urlopen(url) as response:
        return response.read().decode()
Flyte retries the task up to the specified number of times before marking the execution as failed.
Timeouts
Set a maximum execution time for a task using timeout:
from datetime import timedelta

from flytekit import task


@task(timeout=timedelta(hours=2))
def long_running_task(data: list[float]) -> float:
    import time

    time.sleep(1)  # stand-in for long-running work
    return sum(data)
Environment variables
Pass environment variables into the task container using environment:
from flytekit import task
@task(environment={"MODEL_ENV": "production", "LOG_LEVEL": "info"})
def configured_task(x: int) -> int:
    import os

    env = os.environ.get("MODEL_ENV", "development")
    print(f"Running in {env}")
    return x * 2
Combining task options
All task options can be combined on a single decorator:
from datetime import timedelta

from flytekit import task, Resources


@task(
    cache=True,
    cache_version="2.1",
    retries=2,
    timeout=timedelta(minutes=30),
    requests=Resources(cpu="2", mem="4Gi"),
    environment={"BATCH_SIZE": "256"},
)
def production_task(dataset: str) -> float:
    # Task logic here
    return 0.0
Task types
Python function tasks are the most common type. Apply @task to any Python function, and Flyte runs the function body in a container on Kubernetes.
from flytekit import task


@task
def my_task(x: int) -> int:
    return x + 1
Run bash scripts using ShellTask. See Shell tasks for a full guide.
from flytekit.extras.tasks.shell import ShellTask

t = ShellTask(
    name="my_shell_task",
    script="echo hello world",
)
Flyte supports backend plugins for services like AWS Athena, Google BigQuery, SageMaker, Spark, and Ray. These extend the @task interface without requiring custom container code.
Next steps
Workflows: connect tasks together into a directed acyclic graph.
Named outputs: assign meaningful names to task outputs using NamedTuple.