A map task runs a single task over a list of inputs within one workflow node, eliminating the need to create individual nodes for each item. This delivers substantial performance improvements for homogeneous, data-parallel workloads. Starting with flytekit 1.12.0, the default map_task implementation uses ArrayNode, which provides full Flyte functionality for subtask executions including caching, checkpointing, and workflow recovery.

When to use map tasks

Batch processing

Apply the same transformation to every item in a dataset without writing per-item workflow nodes.

Hyperparameter search

Train a model with each combination of hyperparameters concurrently.

Concurrent data loading

Download or preprocess multiple data shards in parallel.

Basic map task

Import the required library and define a mappable task:
import typing
from flytekit import map_task, task, workflow
@task
def detect_anomalies(data_point: int) -> bool:
    return data_point > 90


@workflow
def map_workflow(data: typing.List[int] = [10, 12, 11, 10, 13, 12, 100, 11, 12, 10]) -> typing.List[bool]:
    return map_task(detect_anomalies)(data_point=data)
map_task accepts a single @task and returns a callable that takes a list as input and returns a list of results.
Avoid calling other tasks inside a mappable task. Flyte cannot accurately register tasks that call other tasks, and you forfeit performance benefits, which are especially significant for map tasks.
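A safe refactor is to keep shared per-item logic in a plain Python function and call it from the mapped task's body. A Flyte-free sketch of the pattern (the helper name and threshold are illustrative):

```python
def is_anomalous(data_point: int, threshold: int = 90) -> bool:
    # Plain helper: reusable logic lives here, not in a second @task,
    # so the mapped task never invokes another task at run time.
    return data_point > threshold


def detect_anomalies(data_point: int) -> bool:
    # In real code this function would carry the @task decorator;
    # it calls the plain helper directly.
    return is_anomalous(data_point)


print([detect_anomalies(x) for x in [10, 100, 12]])  # [False, True, False]
```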

Controlling concurrency and fault tolerance

You can configure concurrency and min_success_ratio on a map task:
  • concurrency — maximum number of subtasks running in parallel at any time. If the input list is larger than this value, Flyte runs multiple batches serially. Defaults to unbounded concurrency.
  • min_success_ratio — minimum fraction of subtasks that must succeed before the map task is marked as successful. Allows tolerating a small number of failures.
@workflow
def map_workflow_with_additional_params(
    data: typing.List[int] = [10, 12, 11, 10, 13, 12, 100, 11, 12, 10]
) -> typing.List[typing.Optional[bool]]:
    return map_task(detect_anomalies, concurrency=1, min_success_ratio=0.75)(data_point=data)
When min_success_ratio is set below 1.0, the elements of the returned list must be typed Optional: because failures are tolerated, positions corresponding to failed subtasks are left unpopulated.
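The semantics can be sketched in plain Python (no Flyte API involved; the failure simulation and the 0.75 ratio are illustrative): subtasks run with bounded parallelism, failed positions become None, and the run as a whole succeeds only if enough subtasks finished.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List, Optional


def detect_anomalies(data_point: int) -> bool:
    if data_point < 0:  # simulate a subtask failure
        raise ValueError("bad input")
    return data_point > 90


def sketch_map(inputs: List[int], concurrency: int, min_success_ratio: float) -> List[Optional[bool]]:
    def safe(x: int) -> Optional[bool]:
        try:
            return detect_anomalies(x)
        except ValueError:
            return None  # a failed subtask leaves its slot unpopulated

    # concurrency bounds how many subtasks are in flight at once
    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        results = list(pool.map(safe, inputs))

    succeeded = sum(r is not None for r in results)
    if succeeded / len(inputs) < min_success_ratio:
        raise RuntimeError("map task failed: too many subtask failures")
    return results


print(sketch_map([10, -1, 100, 12], concurrency=2, min_success_ratio=0.75))
# [False, None, True, False]
```

Here 3 of 4 subtasks succeed (ratio 0.75), so the map task passes while the failed position stays None.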

Overriding resources per subtask

Use with_overrides to customize resource requests for individual subtask executions:
from flytekit import Resources


@workflow
def map_workflow_with_resource_overrides(
    data: typing.List[int] = [10, 12, 11, 10, 13, 12, 100, 11, 12, 10]
) -> typing.List[bool]:
    return map_task(detect_anomalies)(data_point=data).with_overrides(
        requests=Resources(mem="2Gi")
    )

Setting task metadata

Use TaskMetadata to configure caching, retries, and other properties on the map task:
from flytekit import TaskMetadata


@workflow
def map_workflow_with_metadata(
    data: typing.List[int] = [10, 12, 11, 10, 13, 12, 100, 11, 12, 10]
) -> typing.List[bool]:
    return map_task(
        detect_anomalies,
        metadata=TaskMetadata(cache=True, cache_version="0.1", retries=1)
    )(data_point=data)
When cache and cache_version are set in TaskMetadata for a map task, cache hits occur at the individual subtask level. If one input item changes, only that item’s task re-runs — all other subtasks are served from cache.
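The per-item behavior can be sketched with a plain dictionary keyed by (cache_version, input); the cache store and helper names here are illustrative, not Flyte internals:

```python
from typing import Dict, List, Tuple

CACHE: Dict[Tuple[str, int], bool] = {}
CALLS: List[int] = []  # records which items actually executed


def detect_anomalies(data_point: int) -> bool:
    CALLS.append(data_point)
    return data_point > 90


def cached_map(inputs: List[int], cache_version: str) -> List[bool]:
    out = []
    for x in inputs:
        key = (cache_version, x)  # the cache hit is per item, not per list
        if key not in CACHE:
            CACHE[key] = detect_anomalies(x)
        out.append(CACHE[key])
    return out


cached_map([10, 100, 12], "0.1")  # all three items execute
cached_map([10, 100, 99], "0.1")  # only the changed item (99) executes
print(CALLS)  # [10, 100, 12, 99]
```

Bumping cache_version would invalidate every key at once, forcing all items to re-run.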

Mapping over multiple inputs

Partial binding with functools.partial

A map task accepts only one varying input. To map over one input while keeping others fixed, use functools.partial:
import functools


@task
def multi_input_task(quantity: int, price: float, shipping: float) -> float:
    return quantity * price + shipping
@workflow
def multiple_inputs_map_workflow(
    quantities: typing.List[int] = [1, 2, 3, 4, 5]
) -> typing.List[float]:
    partial_task = functools.partial(multi_input_task, price=9.99, shipping=5.0)
    return map_task(partial_task)(quantity=quantities)

Binding task outputs to partials

You can also bind the output of an upstream task to a partial:
@task
def get_price() -> float:
    return 9.99


@task
def get_shipping() -> float:
    return 5.0


@workflow
def map_workflow_partial_with_task_output(
    quantities: typing.List[int] = [1, 2, 3, 4, 5]
) -> typing.List[float]:
    price = get_price()
    shipping = get_shipping()
    partial_task = functools.partial(multi_input_task, price=price, shipping=shipping)
    return map_task(partial_task)(quantity=quantities)

Providing multiple lists

You can pass multiple lists directly to a map task. Each list maps element-wise to its corresponding parameter:
@workflow
def map_workflow_with_lists(
    quantities: typing.List[int] = [1, 2, 3],
    prices: typing.List[float] = [9.99, 4.99, 14.99],
) -> typing.List[float]:
    return map_task(multi_input_task)(
        quantity=quantities, price=prices, shipping=5.0
    )
You cannot provide a list as an input to a partial task. Use multiple list inputs directly on map_task instead.
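Element-wise mapping over several lists behaves like Python's zip: the i-th output pairs the i-th element of each list. A plain-Python sketch of the semantics, with shipping held fixed as in the example above:

```python
from typing import List


def multi_input_task(quantity: int, price: float, shipping: float) -> float:
    return quantity * price + shipping


def sketch_map_lists(quantities: List[int], prices: List[float], shipping: float) -> List[float]:
    # zip pairs the lists element-wise, mirroring the map task's behavior
    return [multi_input_task(q, p, shipping) for q, p in zip(quantities, prices)]


print(sketch_map_lists([1, 2, 3], [9.99, 4.99, 14.99], 5.0))
```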

ArrayNode enhancements

The ArrayNode-backed map_task (default since flytekit 1.12.0) provides improvements over the original Kubernetes array plugin:
  • Extends mapping beyond Kubernetes tasks to Python tasks, container tasks, and pod tasks.
  • Supports both cache serialization and cache overwriting for subtask executions.
  • Subtasks can checkpoint their state, improving reliability on spot or preemptible instances.
  • Subtasks remain recoverable during the workflow recovery process.
  • Running subtasks are aborted appropriately when a sibling subtask fails beyond the min_success_ratio.

Run on the Flyte cluster

pyflyte run --remote \
  https://raw.githubusercontent.com/flyteorg/flytesnacks/69dbe4840031a85d79d9ded25f80397c6834752d/examples/advanced_composition/advanced_composition/map_task.py \
  map_workflow
