A task is the fundamental building block and an extension point within Flyte. Tasks have the following characteristics:
  • Versioned — typically aligned with the git SHA
  • Strongly typed — all inputs and outputs must be annotated
  • Declarative — describe what the task does, not how to schedule it
  • Independently executable — a task can be run on its own without a workflow
  • Unit-testable — call task functions directly in tests
A Flyte task runs in its own container in a Kubernetes pod. This page focuses on Python function tasks, the most common task type.

Basic task

Apply the @task decorator to any Python function. All inputs and outputs must have type annotations:
task.py
from flytekit import task


@task
def slope(x: list[int], y: list[int]) -> float:
    n = len(x)
    sum_xy = sum(xi * yi for xi, yi in zip(x, y))
    sum_x = sum(x)
    sum_y = sum(y)
    sum_x2 = sum(xi ** 2 for xi in x)
    return (n * sum_xy - sum_x * sum_y) / (n * sum_x2 - sum_x ** 2)
You can execute a Flyte task like any regular Python function:
result = slope(x=[-3, 0, 3], y=[7, 4, -2])
print(result)  # -1.5
When calling a task function, always use keyword arguments. Positional arguments are not supported.
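Because a task can be called like a regular function, a unit test can consist of plain assertions. The sketch below repeats the slope task and checks two hand-computed cases. The test function names are illustrative, and the try/except fallback is an assumption added only so the snippet also runs where flytekit is not installed.

```python
try:
    from flytekit import task
except ImportError:
    # Fallback stub (not part of flytekit): leaves the function undecorated
    # so this sketch still runs where flytekit is not installed.
    def task(fn):
        return fn


@task
def slope(x: list[int], y: list[int]) -> float:
    n = len(x)
    sum_xy = sum(xi * yi for xi, yi in zip(x, y))
    sum_x = sum(x)
    sum_y = sum(y)
    sum_x2 = sum(xi ** 2 for xi in x)
    return (n * sum_xy - sum_x * sum_y) / (n * sum_x2 - sum_x ** 2)


def test_slope_matches_documented_example():
    # Same inputs as the example call above; keyword arguments only.
    assert slope(x=[-3, 0, 3], y=[7, 4, -2]) == -1.5


def test_slope_of_exact_line():
    # Points on y = 2x + 1 should give a slope of exactly 2.
    assert slope(x=[0, 1, 2], y=[1, 3, 5]) == 2.0
```

A test runner such as pytest would typically collect these functions by their test_ prefix.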

Run a task with pyflyte

pyflyte run task.py slope --x '[-3,0,3]' --y '[7,4,-2]'
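The list inputs in the command above are written as compact JSON and quoted for the shell. As a minimal sketch, the same command line can be assembled from Python values using only the standard library. The helper name pyflyte_run_command is hypothetical, and the sketch assumes list or numeric inputs like those shown here.

```python
import json
import shlex


def pyflyte_run_command(script: str, task_name: str, **inputs) -> str:
    """Build a shell-quoted `pyflyte run` command line (illustrative helper)."""
    args = ["pyflyte", "run", script, task_name]
    for name, value in inputs.items():
        # Compact JSON mirrors the list syntax used in the command above.
        args += [f"--{name}", json.dumps(value, separators=(",", ":"))]
    # shlex.join quotes any argument containing shell-special characters.
    return shlex.join(args)


command = pyflyte_run_command("task.py", "slope", x=[-3, 0, 3], y=[7, 4, -2])
print(command)  # pyflyte run task.py slope --x '[-3,0,3]' --y '[7,4,-2]'
```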

Multiple outputs

Return a tuple when a task produces multiple outputs. Flyte assigns default names o0, o1, o2, … in positional order:
import typing

from flytekit import task


@task
def stats(values: list[float]) -> typing.Tuple[float, float]:
    mean = sum(values) / len(values)
    variance = sum((v - mean) ** 2 for v in values) / len(values)
    return mean, variance
For human-readable output names, see Named outputs.
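To make the default naming rule concrete, the plain-Python sketch below pairs each position of the returned tuple with o0, o1, and so on. It only illustrates the naming convention described above and is not how flytekit builds its interface; default_output_names is a hypothetical helper, and stats is left undecorated so the snippet runs without flytekit.

```python
def stats(values: list[float]) -> tuple[float, float]:
    mean = sum(values) / len(values)
    variance = sum((v - mean) ** 2 for v in values) / len(values)
    return mean, variance


def default_output_names(count: int) -> list[str]:
    """Return o0, o1, ... for `count` positional outputs (illustrative helper)."""
    return [f"o{i}" for i in range(count)]


outputs = stats([1.0, 2.0, 3.0])
# Pair each positional output with its default name.
named = dict(zip(default_output_names(len(outputs)), outputs))
print(named["o0"])  # 2.0 (the mean); named["o1"] holds the variance
```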

Resource requests

Use Resources to request CPU, memory, or GPU for a task:
from flytekit import task, Resources


@task(requests=Resources(cpu="2", mem="2Gi"))
def memory_intensive_task(data: list[float]) -> float:
    return sum(data)


@task(requests=Resources(cpu="4", mem="8Gi", gpu="1"))
def gpu_training_task(epochs: int) -> float:
    # GPU-accelerated training logic
    return 0.95
requests describes the minimum resources Kubernetes should reserve. Use limits to cap the maximum. Both accept the same Resources fields.
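A minimal sketch of combining the two follows: requests sets what is reserved and limits sets the ceiling. The task name and resource values are illustrative, and the fallback stubs in the except branch are an assumption added only so the snippet runs where flytekit is not installed.

```python
try:
    from flytekit import Resources, task
except ImportError:
    # Fallback stubs (not part of flytekit) so the sketch runs without it.
    from dataclasses import dataclass
    from typing import Optional

    @dataclass
    class Resources:
        cpu: Optional[str] = None
        mem: Optional[str] = None
        gpu: Optional[str] = None

    def task(**_options):
        # Ignore the options and return the function unchanged.
        return lambda fn: fn


@task(
    requests=Resources(cpu="1", mem="1Gi"),  # minimum reserved for the pod
    limits=Resources(cpu="2", mem="2Gi"),    # maximum the container may use
)
def bounded_task(data: list[float]) -> float:
    return sum(data)


print(bounded_task(data=[1.0, 2.5]))  # 3.5
```

When run locally like this, the resource settings have no effect; they matter only once the task is scheduled on Kubernetes.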

Caching

Enable caching to skip re-execution when inputs have not changed. Set cache=True and provide a cache_version string:
from flytekit import task


@task(cache=True, cache_version="1.0")
def expensive_computation(n: int) -> int:
    total = 0
    for i in range(n):
        total += i
    return total
Increment cache_version whenever the task’s logic changes and you want to invalidate old cached results.
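The effect of bumping cache_version can be pictured with a toy in-memory cache whose key combines the task name, the version string, and the inputs. This is only an analogy in plain Python: the real cache lives in the Flyte backend and its key is built differently. The names run_cached and executions are hypothetical.

```python
import json
from typing import Any, Callable

_cache: dict[str, Any] = {}
executions: list[str] = []  # records each real (non-cached) execution


def run_cached(fn: Callable[..., Any], cache_version: str, **inputs: Any) -> Any:
    # Toy cache key: task name + cache_version + inputs.
    key = json.dumps([fn.__name__, cache_version, inputs], sort_keys=True)
    if key not in _cache:
        executions.append(key)
        _cache[key] = fn(**inputs)
    return _cache[key]


def expensive_computation(n: int) -> int:
    return sum(range(n))


run_cached(expensive_computation, "1.0", n=1000)  # executes
run_cached(expensive_computation, "1.0", n=1000)  # same key: served from cache
run_cached(expensive_computation, "1.1", n=1000)  # new version: executes again
print(len(executions))  # 2
```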

Retries

Configure automatic retries for tasks that may fail transiently (e.g., network calls):
from flytekit import task


@task(retries=3)
def fetch_data(url: str) -> str:
    import urllib.request
    with urllib.request.urlopen(url) as response:
        return response.read().decode()
Flyte retries the task up to the specified number of times before marking the execution as failed.
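The retry behavior can be sketched as a plain loop. This is not Flyte's implementation, which runs in the backend; it assumes that the initial attempt is not counted as a retry, so retries=3 allows at most four attempts. The names run_with_retries and flaky_fetch are hypothetical.

```python
from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_retries(fn: Callable[[], T], retries: int) -> T:
    """Call fn once, then retry up to `retries` more times on failure."""
    for attempt in range(retries + 1):
        try:
            return fn()
        except Exception:
            # Out of retries: surface the last error to the caller.
            if attempt == retries:
                raise
    raise ValueError("retries must be non-negative")


attempts: list[int] = []


def flaky_fetch() -> str:
    # Fails on the first two calls, then succeeds.
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionError("transient failure")
    return "payload"


result = run_with_retries(flaky_fetch, retries=3)
print(result, len(attempts))  # payload 3
```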

Timeouts

Set a maximum execution time for a task using timeout:
from datetime import timedelta
from flytekit import task


@task(timeout=timedelta(hours=2))
def long_running_task(data: list[float]) -> float:
    import time
    time.sleep(1)
    return sum(data)

Environment variables

Pass environment variables into the task container using environment:
from flytekit import task


@task(environment={"MODEL_ENV": "production", "LOG_LEVEL": "info"})
def configured_task(x: int) -> int:
    import os
    env = os.environ.get("MODEL_ENV", "development")
    print(f"Running in {env}")
    return x * 2

Combining task options

All task options can be combined on a single decorator:
from datetime import timedelta
from flytekit import task, Resources


@task(
    cache=True,
    cache_version="2.1",
    retries=2,
    timeout=timedelta(minutes=30),
    requests=Resources(cpu="2", mem="4Gi"),
    environment={"BATCH_SIZE": "256"},
)
def production_task(dataset: str) -> float:
    # Task logic here
    return 0.0

Task types

Python function tasks are the most common type. Apply @task to any Python function; the function body runs in a container on Kubernetes.
from flytekit import task

@task
def my_task(x: int) -> int:
    return x + 1
Run bash scripts using ShellTask. See Shell tasks for a full guide.
from flytekit.extras.tasks.shell import ShellTask

t = ShellTask(
    name="my_shell_task",
    script="echo hello world",
)
Flyte supports backend plugins for services like AWS Athena, Google BigQuery, SageMaker, Spark, and Ray. These extend the @task interface without requiring custom container code.

Next steps

Workflows

Connect tasks together into a directed acyclic graph.

Named outputs

Assign meaningful names to task outputs using NamedTuple.
