Complete Example
Here’s the complete BranchingFlow from the test suite:
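The listing itself is not reproduced in this copy of the page. As a rough stand-in, the sketch below models the same data flow in plain Python, without Metaflow. Each method mirrors one step and run() plays the scheduler, running the branches one after another for simplicity. The class name BranchingFlowModel, the run() driver, and the dict returned by join are hypothetical. In the real flow, steps are decorated with @step and chained with self.next(...).

```python
import copy
from types import SimpleNamespace


class BranchingFlowModel:
    """Plain-Python model of BranchingFlow's data flow (not Metaflow code)."""

    def start(self, state):
        state.value = 10
        # Stands in for self.next(self.branch_a, self.branch_b)
        return ["branch_a", "branch_b"]

    def branch_a(self, state):
        state.value = state.value * 2  # 10 * 2 -> 20

    def branch_b(self, state):
        state.value = state.value + 5  # 10 + 5 -> 15

    def join(self, inputs):
        # Hypothetical merge: collect each branch's final value by name
        return {"branch_a": inputs.branch_a.value, "branch_b": inputs.branch_b.value}

    def run(self):
        state = SimpleNamespace()
        branch_names = self.start(state)
        finished = {}
        for name in branch_names:
            # Each branch works on its own copy of the start step's state,
            # so branch_a's change never leaks into branch_b
            branch_state = copy.deepcopy(state)
            getattr(self, name)(branch_state)
            finished[name] = branch_state
        # The join sees every branch's final state, keyed by branch name
        return self.join(SimpleNamespace(**finished))


print(BranchingFlowModel().run())  # {'branch_a': 20, 'branch_b': 15}
```

The deep copy is the important design choice: it is what makes the two branches independent, since both start from value 10 rather than one seeing the other's result.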
How It Works
Start step splits into parallel branches
The start step initializes self.value = 10 and calls self.next(self.branch_a, self.branch_b) to execute both branches in parallel.
Branches execute independently
- branch_a multiplies the value by 2 (result: 20)
- branch_b adds 5 to the value (result: 15)

Each branch starts from the self.value set in the start step, so neither branch sees the other's change.
Join step merges results
The join step receives an inputs parameter containing the results from all incoming branches. Access branch-specific data using inputs.branch_name.attribute.

Parallel Execution: Branches execute in parallel. Metaflow handles scheduling and ensures that all branches complete before the join step runs.
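To make that barrier concrete, here is a toy fork-join runner built on Python's concurrent.futures. It is only an analogy: threads stand in for Metaflow's separate tasks, and run_fork_join is a hypothetical helper, not part of Metaflow or the Dagster converter. Both branches receive the same starting value and run concurrently, and the join code runs only after every branch has finished.

```python
from concurrent.futures import ThreadPoolExecutor, wait


def branch_a(value):
    return value * 2


def branch_b(value):
    return value + 5


def run_fork_join(start_value, branches):
    """Run every branch on the same input concurrently, then join."""
    with ThreadPoolExecutor(max_workers=len(branches)) as pool:
        futures = {name: pool.submit(fn, start_value) for name, fn in branches.items()}
        # Barrier: block until every branch has finished
        wait(futures.values())
    # Join: all branch results are available at this point
    return {name: fut.result() for name, fut in futures.items()}


results = run_fork_join(10, {"branch_a": branch_a, "branch_b": branch_b})
print(results)  # {'branch_a': 20, 'branch_b': 15}
```

Because result() re-raises any exception raised inside a branch, a failing branch prevents the join from running on partial data, loosely like a failed branch task failing a Metaflow run.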
The Join Pattern
Join steps have a special signature that takes an inputs parameter after self, for example def join(self, inputs).
The inputs object provides access to the final state of each incoming branch by branch name.
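A minimal model of the inputs object may help here. The Inputs class below is hypothetical, and Metaflow's real implementation differs. It supports the two access patterns seen in Metaflow join steps: attribute access by branch name, and iteration over all incoming branches. The SimpleNamespace objects stand in for each branch's final state.

```python
from types import SimpleNamespace


class Inputs:
    """Hypothetical stand-in for the inputs object a join step receives."""

    def __init__(self, **branches):
        self._branches = branches

    def __getattr__(self, name):
        # Only called for names not found normally: inputs.branch_a -> branch state
        try:
            return self._branches[name]
        except KeyError:
            raise AttributeError(name) from None

    def __iter__(self):
        # Iterating yields each branch's final state
        return iter(self._branches.values())


def join(inputs):
    a = inputs.branch_a.value  # access by branch name
    b = inputs.branch_b.value
    total = sum(inp.value for inp in inputs)  # access across all branches
    return a, b, total


inputs = Inputs(branch_a=SimpleNamespace(value=20), branch_b=SimpleNamespace(value=15))
print(join(inputs))  # (20, 15, 35)
```

Name access suits static branches like these, where every branch is known when the flow is written. Iteration is handy when the join treats all branches the same way.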
Creating the Dagster Asset
Convert this flow to a Dagster asset:
Generated Dagster Graph
When materialized in Dagster, the branching flow creates a fork-join structure:

Static Branching: The number of branches is fixed at flow definition time. For dynamic parallelism based on runtime data, see the Foreach Flow pattern.
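The fork-join shape can be modeled as a dependency map and walked in topological order with Python's standard graphlib module (Python 3.9+). This sketch assumes that the generated graph's node names match the step names and includes only the four steps described on this page. The converter's actual node names, and whether it adds extra nodes such as an end step, are not covered here. Each "wave" holds the nodes whose upstream dependencies are all complete, so the two branches share a wave and join comes last.

```python
from graphlib import TopologicalSorter

# Hypothetical dependency map: node -> set of upstream nodes.
# Names assume the generated graph reuses the Metaflow step names.
deps = {
    "start": set(),
    "branch_a": {"start"},
    "branch_b": {"start"},
    "join": {"branch_a", "branch_b"},
}

sorter = TopologicalSorter(deps)
sorter.prepare()
waves = []
while sorter.is_active():
    ready = sorted(sorter.get_ready())  # nodes whose upstreams are all done
    waves.append(ready)
    sorter.done(*ready)

print(waves)  # [['start'], ['branch_a', 'branch_b'], ['join']]
```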
Use Cases
- Parallel model training: Train multiple models simultaneously with different hyperparameters
- Multi-source data ingestion: Fetch data from different APIs concurrently
- A/B testing: Process control and treatment groups in parallel
- Independent transformations: Apply different transformations to the same dataset
Key Takeaways
- Use self.next(self.step1, self.step2, ...) to create parallel branches
- All branches execute concurrently (where possible)
- Join steps receive an inputs parameter to access branch results
- Access branch data via inputs.branch_name.attribute
- All branches must complete before the join step executes