A launch plan links a partial or complete list of inputs required to trigger a workflow, along with optional runtime overrides such as notifications and schedules. Launch plans are the only mechanism for invoking workflow executions in Flyte.

Why launch plans?

Launch plans serve several purposes:
  • Run the same workflow multiple times with different predefined inputs.
  • Schedule workflows to run on a cron cadence or at a fixed interval.
  • Share a workflow with pre-bound inputs so another user can trigger it with one click.
  • Lock specific inputs so they cannot be overridden at execution time.
  • Associate different notification settings with the same workflow.

Default launch plan

Every workflow automatically gets a default launch plan when it is registered. The default launch plan has the same name as the workflow and can bind default workflow inputs and runtime options defined in the project’s flytekit configuration.
Users rarely need to interact with the default launch plan directly. It is generated automatically and is always available.

Creating a launch plan

Import LaunchPlan from flytekit and use LaunchPlan.get_or_create to define a named launch plan:
from typing import List
from flytekit import task, workflow, LaunchPlan


@task
def mean(values: List[float]) -> float:
    return sum(values) / len(values)


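@task
def standard_deviation(values: List[float], mu: float) -> float:
    from math import sqrt

    # Population standard deviation: root of the mean squared deviation
    variance = sum((x - mu) ** 2 for x in values) / len(values)
    return sqrt(variance)


@task
def standard_scale(values: List[float], mu: float, sigma: float) -> List[float]:
    # Scale with the sigma passed in by the caller rather than overwriting it
    return [(x - mu) / sigma for x in values]


@workflow
def standard_scale_workflow(values: List[float]) -> List[float]:
    mu = mean(values=values)
    sigma = standard_deviation(values=values, mu=mu)
    return standard_scale(values=values, mu=mu, sigma=sigma)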


# Create a launch plan with default inputs
standard_scale_lp = LaunchPlan.get_or_create(
    standard_scale_workflow,
    name="standard_scale_lp",
    default_inputs={"values": [3.0, 4.0, 5.0]},
)

Running a launch plan locally

You can invoke a launch plan like a regular Python function. Called without arguments, it runs with its default_inputs; keyword arguments override them:
# Uses default_inputs: values=[3.0, 4.0, 5.0]
standard_scale_lp()

# Override the defaults
standard_scale_lp(values=[float(x) for x in range(20, 30)])

Input types

Launch plans support two categories of inputs:

Default inputs

Provide default values for workflow inputs. These can be overridden at execution time, either by the caller or in the FlyteConsole UI.

Fixed inputs

Lock specific input values so they cannot be changed at execution time. If a caller attempts to override a fixed input, the execution request fails.
from flytekit import LaunchPlan

# Default inputs — can be overridden
lp_with_defaults = LaunchPlan.get_or_create(
    standard_scale_workflow,
    name="lp_defaults",
    default_inputs={"values": [1.0, 2.0, 3.0]},
)

# Fixed inputs — cannot be overridden
lp_with_fixed = LaunchPlan.get_or_create(
    standard_scale_workflow,
    name="lp_fixed",
    fixed_inputs={"values": [1.0, 2.0, 3.0]},
)
A single launch plan can combine both kinds, binding some workflow inputs as overridable defaults and others as fixed values.
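The difference between the two categories can be pictured with a small plain-Python model. This is a conceptual sketch only, not Flyte's implementation: the function resolve_inputs and the input names values and mode are hypothetical, and the real check happens on the Flyte side when an execution is requested.

```python
from typing import Any, Dict, Optional


def resolve_inputs(
    default_inputs: Dict[str, Any],
    fixed_inputs: Dict[str, Any],
    overrides: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Toy model of how default and fixed inputs combine at execution time."""
    overrides = overrides or {}
    # Fixed inputs are locked: any attempt to override one is rejected
    locked = set(overrides) & set(fixed_inputs)
    if locked:
        raise ValueError(f"cannot override fixed inputs: {sorted(locked)}")
    # Caller overrides replace defaults; fixed values are always applied
    return {**default_inputs, **overrides, **fixed_inputs}


defaults = {"values": [1.0, 2.0, 3.0]}
fixed = {"mode": "production"}

print(resolve_inputs(defaults, fixed))
print(resolve_inputs(defaults, fixed, {"values": [9.0]}))
```

In this model, passing {"mode": "test"} as an override raises a ValueError, mirroring the failed execution request described above.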

Scheduling

Attach a schedule to a launch plan to run a workflow automatically at a specific cadence.

Cron schedule

from flytekit import LaunchPlan, CronSchedule


nightly_lp = LaunchPlan.get_or_create(
    standard_scale_workflow,
    name="nightly_scale_lp",
    schedule=CronSchedule(schedule="0 0 * * *"),  # daily at midnight
    default_inputs={"values": [1.0, 2.0, 3.0, 4.0, 5.0]},
)
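As a reading aid for the expression above, a standard cron expression has five space-separated fields. The sketch below simply labels them; it is plain Python rather than part of flytekit, the helper name describe_cron is hypothetical, and it does not validate field values.

```python
from typing import Dict

# Field order of a standard five-field cron expression
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def describe_cron(expression: str) -> Dict[str, str]:
    """Label each field of a five-field cron expression."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected {len(CRON_FIELDS)} fields, got {len(parts)}")
    return dict(zip(CRON_FIELDS, parts))


print(describe_cron("0 0 * * *"))
# → {'minute': '0', 'hour': '0', 'day_of_month': '*', 'month': '*', 'day_of_week': '*'}
```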

Fixed-rate schedule

from datetime import timedelta
from flytekit import LaunchPlan, FixedRate


every_hour_lp = LaunchPlan.get_or_create(
    standard_scale_workflow,
    name="hourly_scale_lp",
    schedule=FixedRate(duration=timedelta(hours=1)),
    default_inputs={"values": [1.0, 2.0, 3.0]},
)

Passing the scheduled kick-off time

If your workflow takes a datetime input (e.g., to control which time window of data to process), you can forward the cron kick-off time automatically:
from datetime import datetime
from flytekit import workflow, LaunchPlan, CronSchedule


@workflow
def process_data_wf(kickoff_time: datetime):
    # read data based on kickoff_time
    ...


process_data_lp = LaunchPlan.get_or_create(
    process_data_wf,
    name="process_data_lp",
    schedule=CronSchedule(
        schedule="0 * * * *",              # every hour
        kickoff_time_input_arg="kickoff_time",  # pass the cron time to the workflow
    ),
)
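One way a workflow might use the kick-off time is to derive the time window of data to read. The helper below is a hypothetical illustration in plain Python, assuming an hourly schedule and a half-open window covering the hour that ended at the kick-off; neither the function name nor the window convention comes from Flyte.

```python
from datetime import datetime, timedelta, timezone
from typing import Tuple


def hourly_window(kickoff_time: datetime) -> Tuple[datetime, datetime]:
    """Return the [start, end) window for the hour ending at kickoff_time."""
    # Truncate to the top of the hour so a slightly late kick-off
    # still maps to the same window
    end = kickoff_time.replace(minute=0, second=0, microsecond=0)
    start = end - timedelta(hours=1)
    return start, end


start, end = hourly_window(datetime(2024, 1, 1, 13, 0, tzinfo=timezone.utc))
print(start.isoformat(), end.isoformat())
# → 2024-01-01T12:00:00+00:00 2024-01-01T13:00:00+00:00
```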

Activating and deactivating schedules

Registering a launch plan does not start its schedule; you must activate the launch plan before the schedule runs. Schedules can be activated and deactivated without re-registering.
from flytekit.remote import FlyteRemote
from flytekit.configuration import Config

remote = FlyteRemote(config=Config.auto())

# Activate
launchplan_id = remote.fetch_launch_plan(name="process_data_lp").id
remote.client.update_launch_plan(launchplan_id, "ACTIVE")

# Deactivate
remote.client.update_launch_plan(launchplan_id, "INACTIVE")
Only one version of a launch plan with a given name can be active at a time. Activating a new version automatically deactivates the previous one.
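The one-active-version rule can be modeled with a small in-memory registry. This is a toy illustration in plain Python, not how Flyte stores launch plan state; the class LaunchPlanStates and the version strings are hypothetical.

```python
from typing import Dict, Optional


class LaunchPlanStates:
    """Toy model: at most one active version per launch plan name."""

    def __init__(self) -> None:
        self._active: Dict[str, str] = {}

    def activate(self, name: str, version: str) -> Optional[str]:
        """Activate a version and return the version it displaced, if any."""
        previous = self._active.get(name)
        self._active[name] = version
        return previous if previous != version else None

    def deactivate(self, name: str, version: str) -> None:
        """Deactivate a version; a no-op if that version is not active."""
        if self._active.get(name) == version:
            del self._active[name]

    def active_version(self, name: str) -> Optional[str]:
        return self._active.get(name)


states = LaunchPlanStates()
states.activate("process_data_lp", "v1")
displaced = states.activate("process_data_lp", "v2")
print(displaced, states.active_version("process_data_lp"))
# → v1 v2
```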

Embedding launch plans in workflows

Launch plans can be used inside a workflow definition, similar to subworkflows:
@task
def generate_data(num_samples: int, seed: int) -> List[float]:
    import random
    random.seed(seed)
    return [random.random() for _ in range(num_samples)]


@workflow
def workflow_with_launchplan(num_samples: int, seed: int) -> List[float]:
    data = generate_data(num_samples=num_samples, seed=seed)
    return standard_scale_lp(values=data)
The key difference from a subworkflow is that a launch plan used inside a workflow kicks off a new, independent workflow execution on the cluster with its own execution name, whereas a subworkflow executes within the parent workflow’s context.

Executing launch plans with FlyteRemote

from flytekit.remote import FlyteRemote
from flytekit.configuration import Config


remote = FlyteRemote(config=Config.auto())

# Option 1: execute a locally defined launch plan
execution = remote.execute(standard_scale_lp, inputs={})

# Option 2: fetch from cluster by name
flyte_lp = remote.fetch_launch_plan(name="standard_scale_lp")
execution = remote.execute(flyte_lp, inputs={})

# Wait for completion
completed = remote.wait(execution)
print(completed.outputs)
