A dynamic workflow is a workflow whose directed acyclic graph (DAG) is computed at run time rather than at compile time. Use the @dynamic decorator to define one. Think of a dynamic workflow as a combination of a task and a workflow. Flyte executes the body of a dynamic workflow inside a Kubernetes pod, which produces a workflow plan — the futures file — that Flyte’s propeller then schedules and runs.
Local execution works with @dynamic because Flytekit treats it as a task that runs with native Python inputs.

How dynamic workflows differ from static workflows

|                  | @workflow       | @dynamic                   |
|------------------|-----------------|----------------------------|
| DAG construction | Compile time    | Run time                   |
| Input values     | Promises (lazy) | Materialized (real values) |
| Return value     | Concrete output | Promise object             |
| State storage    | etcd (CRD)      | Blobstore (offloaded)      |
Within a @dynamic context, every call to a @task or derivative returns a Promise rather than an actual value. You cannot directly inspect task outputs inside a dynamic workflow — if you need to operate on them, move that logic into a separate task.

When to use dynamic workflows

- Runtime logic: modify workflow structure or parameters based on data known only at execution time.
- Feature engineering: decide feature extraction parameters on the fly based on incoming data characteristics.
- AutoML pipelines: build pipelines that adjust their structure based on intermediate model evaluation results.
- Hyperparameter tuning: fan out to a variable number of training runs determined by runtime conditions.

Basic example: counting common characters

This example uses a dynamic workflow to count common characters between two strings. The loop count is unknown until runtime — a perfect use case for @dynamic. Import the required library:
import typing
Define helper tasks:
from flytekit import dynamic, task, workflow


@task
def return_index(character: str) -> int:
    """Returns the index of a character where A-Z/a-z maps to 0-25."""
    if character.islower():
        return ord(character) - ord("a")
    else:
        return ord(character) - ord("A")


@task
def update_list(freq_list: typing.List[int], index: int) -> typing.List[int]:
    """Increments the frequency count at the given index."""
    freq_list[index] += 1
    return freq_list


@task
def derive_count(
    freq1: typing.List[int], freq2: typing.List[int]
) -> int:
    """Counts characters that appear in both frequency lists."""
    count = 0
    for i in range(26):
        count += min(freq1[i], freq2[i])
    return count
Define the dynamic workflow:
@dynamic
def count_characters(s1: str, s2: str) -> int:
    # Initialize empty frequency lists for both strings
    freq1 = [0] * 26
    freq2 = [0] * 26

    # Iterate through each character of s1 and populate frequency list
    for char in s1:
        index = return_index(character=char)
        freq1 = update_list(freq_list=freq1, index=index)

    # Iterate through each character of s2 and populate frequency list
    for char in s2:
        index = return_index(character=char)
        freq2 = update_list(freq_list=freq2, index=index)

    # Determine the count of common characters
    return derive_count(freq1=freq1, freq2=freq2)
Together, the two loops iterate len(s1) + len(s2) times — a value known only at runtime. Flyte executes this body to generate a compiled DAG of individual task nodes, then schedules those nodes. Wire a workflow around the dynamic function:
@workflow
def dynamic_wf(s1: str = "Pear", s2: str = "Earth") -> int:
    return count_characters(s1=s1, s2=s2)


if __name__ == "__main__":
    print(f"Common characters: {dynamic_wf(s1='Pear', s2='Earth')}")
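As a sanity check on the logic (independent of Flyte), the same count can be computed in plain Python with a multiset intersection — this helper is for verification only and is not part of the Flyte example:

```python
from collections import Counter


def common_chars(s1: str, s2: str) -> int:
    # Intersect the lowercased character counts of both strings,
    # mirroring the min(freq1[i], freq2[i]) sum in derive_count
    return sum((Counter(s1.lower()) & Counter(s2.lower())).values())
```

For "Pear" and "Earth", the shared characters are e, a, and r, giving a count of 3 — matching the dynamic workflow's output.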

Advanced example: merge sort with recursion

Merge sort demonstrates recursion with dynamic workflows. Flyte imposes a depth limit to protect system stability, but controlled recursion works well.
@task
def merge(sorted_list1: typing.List[int], sorted_list2: typing.List[int]) -> typing.List[int]:
    result = []
    i = j = 0
    while i < len(sorted_list1) and j < len(sorted_list2):
        if sorted_list1[i] < sorted_list2[j]:
            result.append(sorted_list1[i])
            i += 1
        else:
            result.append(sorted_list2[j])
            j += 1
    result += sorted_list1[i:]
    result += sorted_list2[j:]
    return result


@task
def sort_locally(numbers: typing.List[int]) -> typing.List[int]:
    return sorted(numbers)


@dynamic
def merge_sort_remotely(
    numbers: typing.List[int], numbers_count: int
) -> typing.List[int]:
    # Split the list into two halves
    half = numbers_count // 2
    sorted_left = merge_sort(
        numbers=numbers[:half], numbers_count=half
    )
    sorted_right = merge_sort(
        numbers=numbers[half:], numbers_count=numbers_count - half
    )
    return merge(sorted_list1=sorted_left, sorted_list2=sorted_right)


from flytekit import conditional


@workflow
def merge_sort(
    numbers: typing.List[int], numbers_count: int
) -> typing.List[int]:
    # numbers_count is a promise inside a @workflow, so a plain Python
    # `if` cannot branch on it; use Flyte's conditional construct instead
    return (
        conditional("terminal_case")
        .if_(numbers_count <= 5)
        .then(sort_locally(numbers=numbers))
        .else_()
        .then(merge_sort_remotely(numbers=numbers, numbers_count=numbers_count))
    )
By adding @dynamic to merge_sort_remotely, the function becomes a plan of execution that generates a Flyte workflow with four distinct nodes. Each recursive invocation runs on potentially different hosts, with Flyte managing data references and execution order.
@dynamic is essential here because the number of recursive calls is unknown at compile time. The dynamic workflow calls a static workflow, which calls the dynamic workflow again, creating a recursive and flexible execution structure.
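To reason about how close a given input comes to Flyte's depth limit, the nesting depth of merge_sort_remotely calls can be computed with a small helper (an illustration added here, not part of the example):

```python
def dynamic_depth(n: int, threshold: int = 5) -> int:
    """Depth of nested merge_sort_remotely invocations for a list of length n.

    Mirrors the recursion structure above: lists at or below the
    threshold sort locally (depth 0); larger lists split in half.
    """
    if n <= threshold:
        return 0
    half = n // 2
    return 1 + max(dynamic_depth(half, threshold),
                   dynamic_depth(n - half, threshold))
```

For the 10-element list used below, the depth is just 1; it grows logarithmically, so even large inputs stay within a modest recursion depth.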

Why dynamic workflows reduce etcd pressure

Static workflow CRDs and node states are stored in etcd, which has hard size limits. Dynamic workflows offload the entire workflow specification — node/task definitions and connections — to the blobstore. Only node execution statuses remain in etcd, keeping the CRD small.

Dynamic workflows vs. map tasks

Dynamic workflows carry overhead for large fan-out operations because they store metadata for every node in the workflow. For pure parallelism over a list of inputs, prefer map tasks, which use a bitset compression algorithm and avoid per-node metadata overhead.

Run on the Flyte cluster

pyflyte run --remote \
  https://raw.githubusercontent.com/flyteorg/flytesnacks/69dbe4840031a85d79d9ded25f80397c6834752d/examples/advanced_composition/advanced_composition/dynamic_workflow.py \
  dynamic_wf --s1 "Pear" --s2 "Earth"
pyflyte run --remote \
  https://raw.githubusercontent.com/flyteorg/flytesnacks/69dbe4840031a85d79d9ded25f80397c6834752d/examples/advanced_composition/advanced_composition/dynamic_workflow.py \
  merge_sort --numbers '[1813, 3105, 3260, 2634, 383, 7037, 3291, 2403, 315, 7164]' --numbers_count 10
