Skip to main content
Conditional flows enable dynamic branching where only one path executes based on runtime conditions. Unlike static branching where all branches run, conditional flows select a single branch to execute.

Complete Example

Here’s the complete ConditionalFlow from the test suite:
"""Conditional (split-switch) flow: start (split-switch) → high_branch/low_branch → join → end"""
from metaflow import FlowSpec, Parameter, step


class ConditionalFlow(FlowSpec):
    """A conditional flow that takes different branches based on a parameter."""

    value = Parameter("value", default=42, type=int)

    @step
    def start(self):
        self.route = "high" if self.value >= 50 else "low"
        self.next({"high": self.high_branch, "low": self.low_branch}, condition="route")

    @step
    def high_branch(self):
        self.result = "HIGH: %d" % self.value
        self.next(self.join)

    @step
    def low_branch(self):
        self.result = "LOW: %d" % self.value
        self.next(self.join)

    @step
    def join(self):
        self.final = self.result
        self.next(self.end)

    @step
    def end(self):
        print("Result:", self.final)


if __name__ == "__main__":
    ConditionalFlow()

How It Works

1

Start step evaluates the condition

The start step:
  • Receives a value parameter (default: 42)
  • Evaluates the condition: self.route = "high" if self.value >= 50 else "low"
  • Uses the condition parameter to select which branch to execute
2

Only one branch executes

Based on self.route:
  • If value >= 50: Only high_branch runs, creating result = "HIGH: {value}"
  • If value < 50: Only low_branch runs, creating result = "LOW: {value}"
The other branch is never executed.
3

Join receives the executed branch result

Unlike static branch joins, the join step receives results from only the executed branch. No inputs parameter is needed since there’s only one input.
4

End step processes the final result

The end step prints the result from whichever branch was executed.
Runtime Efficiency: Only the selected branch executes, saving compute resources compared to static branching where all branches run.

The Conditional Pattern

The conditional syntax uses a dictionary mapping and a condition parameter:
@step
def start(self):
    # 1. Compute the routing key
    self.route = "high" if self.value >= 50 else "low"
    
    # 2. Define branch mapping and condition
    self.next(
        {"high": self.high_branch, "low": self.low_branch},
        condition="route"
    )

Key Components

  1. Branch mapping dictionary: Maps string keys to step methods
  2. Condition attribute: Stores the routing key as a string
  3. condition parameter: Tells Metaflow which attribute contains the routing key

Join Differences

Conditional joins differ from static branch joins:
# Conditional join - no inputs parameter
@step
def join(self):
    self.final = self.result  # Result from the executed branch
# Static branch join - has inputs parameter
@step
def join(self, inputs):
    self.merged_a = inputs.branch_a.a_result
    self.merged_b = inputs.branch_b.b_result

Creating the Dagster Asset

Convert this flow to a Dagster asset:
dagster create conditional_flow.py ConditionalFlow
You can pass parameters when materializing the asset:
# Run with value=60 (takes high branch)
dagster asset materialize --select ConditionalFlow --config '{"value": 60}'

# Run with value=30 (takes low branch)
dagster asset materialize --select ConditionalFlow --config '{"value": 30}'

Generated Dagster Graph

When materialized in Dagster, the conditional flow shows all possible paths, but only one executes:
ConditionalFlow (asset)
  └─ Execution (value=42):
       start
       ├─ high_branch (skipped) ─┐
       └─ low_branch (executed) ─┤
                                 join → end

  └─ Execution (value=75):
       start
       ├─ high_branch (executed) ─┐
       └─ low_branch (skipped) ───┤
                                 join → end
Dagster’s UI will show which branch was taken during each execution.
Difference from Static Branching: In a static branching flow, ALL branches execute in parallel and the join receives results from all. In conditional flows, ONLY ONE branch executes and the join receives only that result.

Advanced: Multi-way Conditionals

You can extend this pattern to support more than two branches:
@step
def start(self):
    if self.value < 25:
        self.route = "very_low"
    elif self.value < 50:
        self.route = "low"
    elif self.value < 75:
        self.route = "high"
    else:
        self.route = "very_high"
    
    self.next({
        "very_low": self.very_low_branch,
        "low": self.low_branch,
        "high": self.high_branch,
        "very_high": self.very_high_branch
    }, condition="route")

Use Cases

  • Environment-based routing: Different processing for dev/staging/prod
  • Data quality checks: Different remediation based on validation results
  • Feature flags: Enable/disable features based on configuration
  • Cost optimization: Choose cheaper processing for small datasets
  • Error handling: Route to different recovery strategies based on error type
  • A/B test routing: Send users down different flow paths

Key Takeaways

  • Use a dictionary with condition parameter for dynamic branching
  • Store the routing key in a string attribute
  • Only the selected branch executes (unlike static branching)
  • Join steps don’t need an inputs parameter
  • Use Parameters to control routing from outside the flow
  • Supports any number of conditional branches
  • More efficient than static branching when only one path is needed