Skip to main content
Available since Flyte 1.3.0 Some workflows need to pause mid-execution and wait for input that isn’t available at launch time — a human approval, an annotation decision, or a value produced by an external system. Flyte provides three workflow node types for this:
NodePurpose
sleep(duration)Pause for a fixed duration before continuing
wait_for_input(name, timeout, expected_type)Pause until an external signal provides a typed value
approve(upstream_item, name, timeout)Pause until an explicit approval signal is received
These functions can only be used inside @workflow-decorated functions, @dynamic-decorated functions, or imperative workflows.

Use cases

Model deployment review

Train n models, generate a comparison report, then wait for a human to approve before deploying to production.

Data labeling

Iterate through an image dataset and pause for a human annotator to label each image before continuing.

Active learning

Train a model, identify uncertain examples, wait for a human to label them, then retrain with the new labels.

Pausing with sleep

The simplest gate node sleeps for a specified duration before the next node runs:
import typing
from datetime import timedelta
from flytekit import sleep, task, workflow


@task
def add_one(x: int) -> int:
    return x + 1


@workflow
def sleep_wf(x: int = 0) -> int:
    sleeping = sleep(timedelta(seconds=10))
    result = add_one(x=x)
    sleeping >> result
    return result
The >> operator enforces ordering without data flow: result will not start until after the 10-second sleep completes.
See Chaining Flyte entities to learn more about the >> chaining operator.

Waiting for a typed input with wait_for_input

wait_for_input pauses execution until an external caller (human or machine) provides a value of the specified type through the Flyte API or UI:
from flytekit import wait_for_input


@task
def create_report(data: typing.List[float]) -> dict:
    return {"mean": sum(data) / len(data), "count": len(data)}


@task
def finalize_report(report: dict, title: str) -> dict:
    return {**report, "title": title}


@workflow
def reporting_wf(data: typing.List[float] = [1.0, 2.0, 3.0]) -> dict:
    report = create_report(data=data)

    # Pause here and wait up to 1 hour for someone to provide a title string
    title_input = wait_for_input(
        "title-input",
        timeout=timedelta(hours=1),
        expected_type=str,
    )

    return finalize_report(report=report, title=title_input)
What happens at runtime:
  1. create_report runs and produces the raw report.
  2. The workflow pauses at title-input. It will wait up to 1 hour.
  3. When a string is supplied through the Flyte UI or API, finalize_report runs with the provided title.

Gating continuation with approve

approve passes a promise through as-is after receiving an explicit approval signal. Use it to block progress until someone confirms a result is acceptable:
from flytekit import approve


@task
def publish_report(report: dict) -> str:
    return f"Published: {report.get('title', 'Untitled')}"


@workflow
def approval_wf(data: typing.List[float] = [1.0, 2.0, 3.0]) -> str:
    report = create_report(data=data)

    # Block publishing until "approve-final-report" receives an approval
    approved_report = approve(report, "approve-final-report", timeout=timedelta(hours=2))

    return publish_report(report=approved_report)

Using the approve output as a promise

You can also chain downstream tasks from the approved value:
@workflow
def approval_as_promise_wf(data: typing.List[float] = [1.0, 2.0, 3.0]) -> str:
    report = create_report(data=data)
    approved_report = approve(report, "approve-report", timeout=timedelta(hours=2))
    titled = finalize_report(report=approved_report, title="Approved Report")
    return publish_report(report=titled)

Combining gate nodes with conditionals

Gate nodes become especially powerful alongside conditional. Here the workflow produces a different output depending on whether the report is approved:
from flytekit import conditional


@task
def invalid_report(reason: str) -> str:
    return f"Report rejected: {reason}"


@workflow
def gate_node_with_conditional_wf(data: typing.List[float] = [1.0, 2.0, 3.0]) -> str:
    report = create_report(data=data)
    title_input = wait_for_input(
        "title-input", timeout=timedelta(hours=1), expected_type=str
    )
    titled_report = finalize_report(report=report, title=title_input)

    # Ask for approval and a disapproval reason in parallel
    approved = wait_for_input(
        "review-passes", timeout=timedelta(hours=2), expected_type=bool
    )
    disapprove_reason = wait_for_input(
        "disapprove-reason", timeout=timedelta(hours=2), expected_type=str
    )

    return (
        conditional("approval-gate")
        .if_(approved.is_true())
        .then(publish_report(report=titled_report))
        .else_()
        .then(invalid_report(reason=disapprove_reason))
    )

Sending inputs through the Flyte UI

1

Launch the workflow

Start the workflow from the Flyte UI. The execution graph shows gate nodes as pending.
2

Find the waiting node

In the Graph view, locate the wait_for_input or approve node. It displays a play icon when ready to receive input.
3

Provide the input

Click the play icon or the Resume button in the sidebar. A modal form appears where you enter the required value.
4

Continue

After submitting, the workflow resumes from the gate node with the provided value.

Sending inputs programmatically with FlyteRemote

For automated or machine-driven approvals, use FlyteRemote.set_signal:
import typing
from flytekit.remote.remote import FlyteRemote
from flytekit.configuration import Config

remote = FlyteRemote(
    Config.for_sandbox(),
    default_project="flytesnacks",
    default_domain="development",
)

# Fetch and execute the workflow
flyte_workflow = remote.fetch_workflow(
    name="core.control_flow.waiting_for_external_inputs.gate_node_with_conditional_wf"
)
execution = remote.execute(flyte_workflow, inputs={"data": [1.0, 2.0, 3.0, 4.0, 5.0]})

# List available signals for this execution
signals = remote.list_signals(execution.id.name)

# Provide the title string to the "title-input" gate node
remote.set_signal("title-input", execution.id.name, "my report")

# Approve the report by setting "review-passes" to True
remote.set_signal("review-passes", execution.id.name, True)
Use remote.list_signals(execution.id.name) to discover which gate nodes are currently waiting for input before calling set_signal.

Timeout behavior

All gate nodes accept a timeout parameter. If the signal is not received within the timeout period, the execution fails with a timeout error. Set timeouts appropriate to your human-review SLA to prevent workflows from waiting indefinitely.

Build docs developers (and LLMs) love