A branching flow uses Metaflow’s static branching: execution splits into several parallel paths that later rejoin at a join step. The pattern fits workflows whose steps do not depend on one another and can therefore run at the same time.

Complete Example

Here’s the complete BranchingFlow from the test suite:
"""Branch/join flow: start → (branch_a, branch_b) → join → end"""
from metaflow import FlowSpec, step


class BranchingFlow(FlowSpec):
    """A branching flow that splits and rejoins."""

    @step
    def start(self):
        self.value = 10
        self.next(self.branch_a, self.branch_b)

    @step
    def branch_a(self):
        self.a_result = self.value * 2
        self.next(self.join)

    @step
    def branch_b(self):
        self.b_result = self.value + 5
        self.next(self.join)

    @step
    def join(self, inputs):
        self.merged_a = inputs.branch_a.a_result
        self.merged_b = inputs.branch_b.b_result
        self.next(self.end)

    @step
    def end(self):
        assert self.merged_a == 20
        assert self.merged_b == 15
        print("BranchingFlow completed: a=%d b=%d" % (self.merged_a, self.merged_b))


if __name__ == "__main__":
    BranchingFlow()

How It Works

1. Start step splits into parallel branches

The start step initializes self.value = 10 and calls self.next(self.branch_a, self.branch_b) to execute both branches in parallel.

2. Branches execute independently

  • branch_a multiplies the value by 2 (result: 20)
  • branch_b adds 5 to the value (result: 15)

Both branches run concurrently and have access to self.value from the start step.

3. Join step merges results

The join step receives an inputs parameter containing the results from all incoming branches. Access branch-specific data using inputs.branch_name.attribute.

4. End step validates merged data

The end step verifies that both branch results were successfully merged.

Parallel Execution: Branches execute in parallel. Metaflow handles scheduling and ensures all branches complete before the join step runs.
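
The fork-join behaviour described above can be sketched in plain Python. This is an analogy only: it uses a thread pool from the standard library and does not reflect how Metaflow actually schedules tasks. The function name run_fork_join is hypothetical; the arithmetic mirrors branch_a and branch_b from the flow.

```python
from concurrent.futures import ThreadPoolExecutor


def branch_a(value):
    # Mirrors branch_a in the flow: double the value.
    return value * 2


def branch_b(value):
    # Mirrors branch_b in the flow: add 5 to the value.
    return value + 5


def run_fork_join(value):
    """Run both branches concurrently, then join their results."""
    with ThreadPoolExecutor(max_workers=2) as pool:
        # Fork: submit both branches with the same input value.
        future_a = pool.submit(branch_a, value)
        future_b = pool.submit(branch_b, value)
        # Join: result() blocks until the corresponding branch has finished,
        # so nothing is merged before both branches are done.
        return {"merged_a": future_a.result(), "merged_b": future_b.result()}


merged = run_fork_join(10)
print(merged)
```

The join only proceeds once every branch has produced a result, which is the same guarantee the join step relies on.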

The Join Pattern

Join steps have a special signature with an inputs parameter:
@step
def join(self, inputs):
    # Access individual branch results
    self.merged_a = inputs.branch_a.a_result
    self.merged_b = inputs.branch_b.b_result
The inputs object provides access to the final state of each incoming branch. Each branch is addressed by the name of the step that leads into the join (here, branch_a and branch_b).
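
To make the access pattern concrete, here is a minimal stand-in for the inputs object, written without Metaflow. FakeInputs is a hypothetical class invented for this illustration and is not Metaflow's implementation. It only imitates two behaviours: attribute lookup by step name, and iteration over all incoming branches.

```python
from types import SimpleNamespace


class FakeInputs:
    """Hypothetical stand-in for the object passed to a join step."""

    def __init__(self, **branches):
        self._branches = branches

    def __getattr__(self, name):
        # Look up a branch by step name, e.g. inputs.branch_a.
        try:
            return self._branches[name]
        except KeyError:
            raise AttributeError(name) from None

    def __iter__(self):
        # Iterate over the final state of every incoming branch.
        return iter(self._branches.values())


# Final state of each branch, using the values from BranchingFlow.
inputs = FakeInputs(
    branch_a=SimpleNamespace(value=10, a_result=20),
    branch_b=SimpleNamespace(value=10, b_result=15),
)

merged_a = inputs.branch_a.a_result
merged_b = inputs.branch_b.b_result
# Both branches carry the same value they inherited from start.
shared_values = {branch.value for branch in inputs}
print(merged_a, merged_b, shared_values)
```

In Metaflow itself, artifacts set before the split (such as value) are not carried through a join automatically. Calling self.merge_artifacts(inputs) in the join step is the usual way to propagate artifacts that are unambiguous across branches.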

Creating the Dagster Asset

Convert this flow to a Dagster asset:
dagster create branching_flow.py BranchingFlow
Dagster will recognize the parallel structure and handle execution accordingly.

Generated Dagster Graph

When materialized in Dagster, the branching flow creates a fork-join structure:
BranchingFlow (asset)
  └─ Execution:
       start
       ├─ branch_a ─┐
       └─ branch_b ─┤
                    join → end
Dagster visualizes the parallel branches clearly in its UI, showing dependencies and execution status for each branch.
Static Branching: The number of branches is fixed at flow definition time. For dynamic parallelism based on runtime data, see the Foreach Flow pattern.
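
Because the branches are fixed when the flow is defined, the whole graph can be written down ahead of time. The sketch below encodes the diagram above as a dependency mapping and groups the steps into waves that could run together. It uses graphlib from the Python standard library (3.9+). The names GRAPH and execution_waves are hypothetical, and this is not how Dagster or Metaflow represent the graph internally.

```python
from graphlib import TopologicalSorter

# Each step maps to the set of steps it depends on.
GRAPH = {
    "start": set(),
    "branch_a": {"start"},
    "branch_b": {"start"},
    "join": {"branch_a", "branch_b"},
    "end": {"join"},
}


def execution_waves(graph):
    """Group steps into waves; steps within a wave do not depend on each other."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        # get_ready() returns every step whose dependencies are all done.
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)
    return waves


waves = execution_waves(GRAPH)
print(waves)
# → [['start'], ['branch_a', 'branch_b'], ['join'], ['end']]
```

The second wave contains both branches, and join appears only after both are marked done, matching the fork-join shape in the diagram.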

Use Cases

  • Parallel model training: Train multiple models simultaneously with different hyperparameters
  • Multi-source data ingestion: Fetch data from different APIs concurrently
  • A/B testing: Process control and treatment groups in parallel
  • Independent transformations: Apply different transformations to the same dataset

Key Takeaways

  • Use self.next(self.step1, self.step2, ...) to create parallel branches
  • All branches execute concurrently (where possible)
  • Join steps receive an inputs parameter to access branch results
  • Access branch data via inputs.branch_name.attribute
  • All branches must complete before the join step executes