Use the @dynamic decorator to build workflows whose structure is determined at run time.
A dynamic workflow is a workflow whose directed acyclic graph (DAG) is computed at run time rather than at compile time. Use the @dynamic decorator to define one.
Think of a dynamic workflow as a combination of a task and a workflow. Flyte executes the body of a dynamic workflow inside a Kubernetes pod. That run produces a workflow plan, known as the futures file, which FlytePropeller then schedules and runs.
Local execution works with @dynamic because Flytekit treats it as a task that runs with native Python inputs.
How dynamic workflows differ from static workflows
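| Aspect           | @workflow       | @dynamic                   |
|------------------|-----------------|----------------------------|
| DAG construction | Compile time    | Run time                   |
| Input values     | Promises (lazy) | Materialized (real values) |
| Return value     | Concrete output | Promise object             |
| State storage    | etcd (CRD)      | Blobstore (offloaded)      |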
Within a @dynamic context, every call to a @task or derivative returns a Promise rather than an actual value. You cannot directly inspect task outputs inside a dynamic workflow — if you need to operate on them, move that logic into a separate task.
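To see why task outputs cannot be inspected while the plan is being built, a toy model may help. The sketch below is plain Python, not Flytekit: ToyPromise and ToyPlan are hypothetical names invented for this illustration, and Flyte's real Promise objects and planning machinery are considerably more involved.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Tuple


@dataclass(frozen=True)
class ToyPromise:
    """Hypothetical stand-in for a task output that has not been computed yet."""

    node_id: str


class ToyPlan:
    """Hypothetical plan builder: calling a task records a node instead of running it."""

    def __init__(self) -> None:
        self.nodes: List[Tuple[str, Callable[..., Any], Dict[str, Any]]] = []

    def call(self, fn: Callable[..., Any], **kwargs: Any) -> ToyPromise:
        node_id = f"n{len(self.nodes)}"
        self.nodes.append((node_id, fn, kwargs))
        return ToyPromise(node_id)

    def run(self) -> Dict[str, Any]:
        """Executes recorded nodes in order, replacing promises with real values."""
        results: Dict[str, Any] = {}
        for node_id, fn, kwargs in self.nodes:
            resolved = {
                name: results[value.node_id] if isinstance(value, ToyPromise) else value
                for name, value in kwargs.items()
            }
            results[node_id] = fn(**resolved)
        return results


def double(x: int) -> int:
    return 2 * x


def add_one(x: int) -> int:
    return x + 1


plan = ToyPlan()
doubled = plan.call(double, x=21)  # a ToyPromise; no value exists yet
# `doubled + 1` would raise TypeError here, so the addition goes into its own node
final = plan.call(add_one, x=doubled)
results = plan.run()  # real values exist only once the plan is executed
```

In this model, `doubled` is only a reference to a future result, which mirrors the advice above: any logic that needs the actual value belongs in a downstream task.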
This example uses a dynamic workflow to count common characters between two strings. The loop count is unknown until run time, which makes it a good fit for @dynamic.
Import the required library:
import typing
Define helper tasks:
from flytekit import dynamic, task, workflow


@task
def return_index(character: str) -> int:
    """Returns the index of a character where A-Z/a-z maps to 0-25."""
    if character.islower():
        return ord(character) - ord("a")
    else:
        return ord(character) - ord("A")


@task
def update_list(freq_list: typing.List[int], index: int) -> typing.List[int]:
    """Increments the frequency count at the given index."""
    freq_list[index] += 1
    return freq_list


@task
def derive_count(freq1: typing.List[int], freq2: typing.List[int]) -> int:
    """Counts characters that appear in both frequency lists."""
    count = 0
    for i in range(26):
        count += min(freq1[i], freq2[i])
    return count
Define the dynamic workflow:
@dynamic
def count_characters(s1: str, s2: str) -> int:
    # Initialize empty frequency lists for both strings
    freq1 = [0] * 26
    freq2 = [0] * 26

    # Iterate through each character of s1 and populate frequency list
    for char in s1:
        index = return_index(character=char)
        freq1 = update_list(freq_list=freq1, index=index)

    # Iterate through each character of s2 and populate frequency list
    for char in s2:
        index = return_index(character=char)
        freq2 = update_list(freq_list=freq2, index=index)

    # Determine the count of common characters
    return derive_count(freq1=freq1, freq2=freq2)
The two loops run len(s1) + len(s2) iterations in total, a value that is only known at run time. Flyte executes this body to generate a compiled DAG of individual task nodes, then schedules those nodes.
To run the dynamic function, call count_characters from a regular @workflow that forwards s1 and s2 as keyword arguments.
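For checking results, it can help to have the same computation in plain Python alongside a rough count of how many task nodes the dynamic body would emit. The sketch below does not use Flytekit. The function names are hypothetical, the inputs are assumed to contain only ASCII letters (as return_index also assumes), and the node count simply tallies the task calls visible in count_characters.

```python
def common_character_count(s1: str, s2: str) -> int:
    """Plain-Python equivalent of count_characters; assumes ASCII letters only."""
    freq1 = [0] * 26
    freq2 = [0] * 26
    for char in s1:
        freq1[ord(char.lower()) - ord("a")] += 1
    for char in s2:
        freq2[ord(char.lower()) - ord("a")] += 1
    return sum(min(a, b) for a, b in zip(freq1, freq2))


def planned_task_nodes(s1: str, s2: str) -> int:
    """Task calls made by the dynamic body: two per character, plus derive_count."""
    return 2 * (len(s1) + len(s2)) + 1


count = common_character_count("Pear", "Earth")  # shared letters: e, a, r
nodes = planned_task_nodes("Pear", "Earth")
print(count, nodes)
```

Because the node count grows linearly with the input length, even short strings produce a sizeable plan, which is worth keeping in mind when choosing between @dynamic and other constructs.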
Merge sort demonstrates recursion with dynamic workflows. Flyte imposes a depth limit to protect system stability, but controlled recursion works well.
from flytekit import conditional


@task
def merge(sorted_list1: typing.List[int], sorted_list2: typing.List[int]) -> typing.List[int]:
    """Merges two sorted lists into one sorted list."""
    result = []
    i = j = 0
    while i < len(sorted_list1) and j < len(sorted_list2):
        if sorted_list1[i] < sorted_list2[j]:
            result.append(sorted_list1[i])
            i += 1
        else:
            result.append(sorted_list2[j])
            j += 1
    # Append whatever remains in either list
    result += sorted_list1[i:]
    result += sorted_list2[j:]
    return result


@task
def sort_locally(numbers: typing.List[int]) -> typing.List[int]:
    return sorted(numbers)


@dynamic
def merge_sort_remotely(numbers: typing.List[int], numbers_count: int) -> typing.List[int]:
    # Inputs are real values inside @dynamic, so the list can be sliced directly
    half = numbers_count // 2
    sorted_left = merge_sort(numbers=numbers[:half], numbers_count=half)
    sorted_right = merge_sort(numbers=numbers[half:], numbers_count=numbers_count - half)
    return merge(sorted_list1=sorted_left, sorted_list2=sorted_right)


@workflow
def merge_sort(numbers: typing.List[int], numbers_count: int) -> typing.List[int]:
    # Workflow inputs are promises, so branch with conditional() instead of a Python if
    return (
        conditional("terminal_case")
        .if_(numbers_count <= 5)  # Base case: sort locally for small lists
        .then(sort_locally(numbers=numbers))
        .else_()
        .then(merge_sort_remotely(numbers=numbers, numbers_count=numbers_count))
    )
Adding @dynamic to merge_sort_remotely turns the function into a plan of execution that generates a Flyte workflow with three distinct nodes: two recursive merge_sort calls and one merge call. Each node may run on a different host, with Flyte managing data references and execution order.
@dynamic is essential here because the number of recursive calls is unknown at compile time. The dynamic workflow calls a static workflow, which calls the dynamic workflow again, creating a recursive and flexible execution structure.
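Since Flyte limits recursion depth, it may be useful to estimate how deep a given input would recurse before submitting it. The following is a plain-Python sketch rather than Flytekit code. merge_sort_reference is a hypothetical helper that mirrors the split-at-half logic and the threshold of 5 from the example above, and it uses heapq.merge in place of the merge task.

```python
import heapq
from typing import List, Tuple


def merge_sort_reference(
    numbers: List[int], threshold: int = 5, depth: int = 0
) -> Tuple[List[int], int, int]:
    """Returns (sorted list, deepest dynamic nesting reached, number of dynamic calls)."""
    # Base case: corresponds to sort_locally in the Flyte example
    if len(numbers) <= threshold:
        return sorted(numbers), depth, 0

    # Recursive case: corresponds to one merge_sort_remotely invocation
    half = len(numbers) // 2
    left, left_depth, left_calls = merge_sort_reference(numbers[:half], threshold, depth + 1)
    right, right_depth, right_calls = merge_sort_reference(numbers[half:], threshold, depth + 1)
    merged = list(heapq.merge(left, right))
    return merged, max(left_depth, right_depth), 1 + left_calls + right_calls


data = list(range(20, 0, -1))
ordered, max_depth, dynamic_calls = merge_sort_reference(data)
print(max_depth, dynamic_calls)
```

Depth grows roughly with the logarithm of the input length divided by the threshold, so raising the threshold is one way to keep recursion shallow.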
Static workflow CRDs and node states are stored in etcd, which has hard size limits. Dynamic workflows offload the entire workflow specification — node/task definitions and connections — to the blobstore. Only node execution statuses remain in etcd, keeping the CRD small.
Dynamic workflows carry overhead for large fan-out operations because they store metadata for every node in the workflow. For pure parallelism over a list of inputs, prefer map tasks, which use a bitset compression algorithm and avoid per-node metadata overhead.
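The intuition behind the bitset can be shown with a toy tracker that spends one bit per subtask instead of one metadata record per node. This is only an illustration of the general idea. CompletionBitset is a hypothetical class, and nothing here reflects how Flyte actually encodes map task state.

```python
class CompletionBitset:
    """Hypothetical tracker: one bit per subtask instead of one record per node."""

    def __init__(self, size: int) -> None:
        self.size = size
        self.bits = 0

    def mark_done(self, index: int) -> None:
        if not 0 <= index < self.size:
            raise IndexError(index)
        self.bits |= 1 << index

    def is_done(self, index: int) -> bool:
        return bool((self.bits >> index) & 1)

    def all_done(self) -> bool:
        return self.bits == (1 << self.size) - 1

    def payload_bytes(self) -> int:
        """Bytes needed to store one bit per subtask."""
        return (self.size + 7) // 8


tracker = CompletionBitset(size=10_000)
for i in range(0, 10_000, 2):
    tracker.mark_done(i)  # mark every even-numbered subtask as finished
print(tracker.payload_bytes(), tracker.all_done())
```

Tracking 10,000 subtasks this way needs on the order of a kilobyte, whereas a per-node record for each subtask would be far larger.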