Workflows link multiple tasks together into a meaningful execution pipeline. They are written as Python functions decorated with @workflow, but it is important to understand the distinction between task code and workflow code.
- Task code executes at run-time on a Kubernetes cluster (or in a query engine, hosted service, etc.).
- Workflow code does not perform computations. It executes at registration time to construct an execution graph. The workflow body is a domain-specific language (DSL) for building DAGs.
Defining a workflow
Import task and workflow from flytekit and compose tasks inside the workflow function:
```python
from math import sqrt
from typing import List

from flytekit import task, workflow


@task
def mean(values: List[float]) -> float:
    return sum(values) / len(values)


@task
def standard_deviation(values: List[float], mu: float) -> float:
    # Population standard deviation: average the squared deviations over n
    variance = sum([(x - mu) ** 2 for x in values]) / len(values)
    return sqrt(variance)


@task
def standard_scale(values: List[float], mu: float, sigma: float) -> List[float]:
    return [(x - mu) / sigma for x in values]


@workflow
def standard_scale_workflow(values: List[float]) -> List[float]:
    mu = mean(values=values)
    sigma = standard_deviation(values=values, mu=mu)
    return standard_scale(values=values, mu=mu, sigma=sigma)
```
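The arithmetic the three tasks perform can be sanity-checked in plain Python, independent of Flyte. This sketch uses the population standard deviation (squared deviations averaged over `len(values)`); the standardized values should have zero mean and unit standard deviation:

```python
from math import sqrt

values = [1.0, 2.0, 3.0, 4.0, 5.0]

mu = sum(values) / len(values)                                   # 3.0
sigma = sqrt(sum((x - mu) ** 2 for x in values) / len(values))   # sqrt(2), ~1.414
scaled = [(x - mu) / sigma for x in values]

# After scaling, the values are centered at zero
print(mu, sigma)
print(scaled)
```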
Just like tasks, workflows are strongly typed — inputs and outputs carry type annotations.
To run the workflow locally:
```shell
pyflyte run workflow.py standard_scale_workflow --values '[1.0,2.0,3.0,4.0,5.0]'
```
To run on a remote Flyte cluster:
```shell
pyflyte run --remote workflow.py standard_scale_workflow --values '[1.0,2.0,3.0,4.0,5.0]'
```
Workflows build execution graphs
The @workflow decorator compiles the function body into a DAG. When Flyte parses standard_scale_workflow, it does not execute the tasks — it records the data dependencies between them:
```
values ──► mean ──mu──► standard_deviation ──sigma──► standard_scale ──► output
```

(`values` also flows directly into `standard_deviation` and `standard_scale`, and `mu` into `standard_scale`.)
In this workflow, standard_deviation consumes mu, the output of mean, so it cannot start until mean completes. Tasks that share no data dependencies, by contrast, are automatically run in parallel when executed on a cluster.
Node executions are triggered as soon as their inputs are available. When five independent nodes exist in a workflow, Flyte runs all five in parallel subject to resource constraints.
Workflows deal with promises
Inside a workflow body, the return values of task calls are promises — placeholders for values that have not yet been computed. You can pass promises into downstream tasks, but you cannot inspect their values directly within the workflow.
```python
@workflow
def standard_scale_workflow_with_print(values: List[float]) -> List[float]:
    mu = mean(values=values)  # mu is a Promise, not a float!
    print(mu)  # prints the Promise object, not the actual value
    sigma = standard_deviation(values=values, mu=mu)
    return standard_scale(values=values, mu=mu, sigma=sigma)
```
This means the following constraints apply inside workflow functions:
- Do not use non-deterministic operations like random.random() or datetime.now(); they execute once, at registration time, and their values are frozen into the graph.
- Do not branch on task outputs with if/else; use the Flyte conditional construct instead.
- Only pass promises into tasks, workflows, and other Flyte constructs.
Strong typing catches errors early
Because both tasks and workflows are strongly typed, Flyte can catch type mismatches at compile time — before any code runs on a cluster. For example:
```python
@task
def buggy_scale(values: List[float], mu: float, sigma: float) -> float:
    # Bug: returns a scalar (float) instead of List[float]
    return sum([(x - mu) / sigma for x in values])


# The mismatch is raised when the workflow body is compiled at
# definition time -- before any task code runs
try:
    @workflow
    def buggy_workflow(values: List[float]) -> List[float]:
        mu = mean(values=values)
        sigma = standard_deviation(values=values, mu=mu)
        return buggy_scale(values=values, mu=mu, sigma=sigma)  # type error!
except Exception as e:
    print(e)  # reports the List[float] vs. float mismatch
```
Subworkflows
A workflow can embed another workflow as a node in its execution graph. The inner workflow is called a subworkflow. Subworkflows are strongly typed and invoked just like tasks:
```python
import random


@task
def generate_data(num_samples: int, seed: int) -> List[float]:
    # random is fine here: task code runs at execution time, not registration time
    random.seed(seed)
    return [random.random() for _ in range(num_samples)]


@workflow
def workflow_with_subworkflow(num_samples: int, seed: int) -> List[float]:
    data = generate_data(num_samples=num_samples, seed=seed)
    return standard_scale_workflow(values=data)  # subworkflow call
```
A subworkflow executes within the context of the parent workflow’s execution. Its nodes appear in the parent’s execution timeline in FlyteConsole.
Specifying dependencies without passing data
Use the >> right-shift operator to declare ordering dependencies between tasks or subworkflows when you do not need to pass data between them:
```python
@workflow
def ordered_wf():
    # task1, task2, and subworkflow are assumed to be defined elsewhere
    p1 = task1()
    p2 = task2()
    p3 = subworkflow()
    p1 >> p2  # task2 runs after task1, but receives no data from it
    p2 >> p3  # subworkflow runs after task2
```
Binding values with functools.partial
You can use functools.partial to bind default or constant values to task parameters when calling them from a workflow:
```python
import functools

from flytekit import task, workflow


@task
def add(x: float, y: float) -> float:
    return x + y


@workflow
def add_with_default_wf(x: float) -> float:
    add_one = functools.partial(add, y=1.0)
    return add_one(x=x)
```
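The binding itself is ordinary Python semantics: functools.partial pre-fills keyword arguments and returns a new callable. A plain-Python sketch, with no Flyte involved:

```python
import functools


def add(x: float, y: float) -> float:
    return x + y


# Pre-bind y=1.0; the resulting callable only needs x
add_one = functools.partial(add, y=1.0)

print(add_one(x=2.5))  # 3.5
```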
Workflow versioning
Like tasks, workflows are versioned and immutable once registered. A specific workflow version (identified by project, domain, name, and version) cannot be changed. This guarantees that you can always re-run a historical workflow version with identical behavior.
Tasks referenced in a workflow version are also immutable — they are tied to specific task versions at registration time.
Running workflows with FlyteRemote
For programmatic execution from a Python environment:
```python
from flytekit.configuration import Config
from flytekit.remote import FlyteRemote

remote = FlyteRemote(
    config=Config.auto(),
    default_project="flytesnacks",
    default_domain="development",
)

# Option 1: execute a locally imported workflow
from workflows.example import standard_scale_workflow

execution = remote.execute(
    standard_scale_workflow,
    inputs={"values": [1.0, 2.0, 3.0, 4.0, 5.0]},
)

# Option 2: fetch a workflow from the cluster by name
flyte_wf = remote.fetch_workflow(name="workflows.example.standard_scale_workflow")
execution = remote.execute(flyte_wf, inputs={"values": [1.0, 2.0, 3.0]})

# Wait for completion and access outputs
completed = remote.wait(execution)
print(completed.outputs)
```