Foreach flows enable dynamic parallelism by mapping a step over a collection of items. This fan-out/fan-in pattern is one of Metaflow’s most powerful features for processing datasets in parallel.

Complete Example

Here’s the complete ForeachFlow from the test suite:
"""Foreach flow: start → process_item (×N) → join → end"""
from metaflow import FlowSpec, step


class ForeachFlow(FlowSpec):
    """A foreach flow that maps over a list of items."""

    @step
    def start(self):
        self.items = ["apple", "banana", "cherry"]
        self.next(self.process_item, foreach="items")

    @step
    def process_item(self):
        self.processed = self.input.upper()
        self.next(self.join)

    @step
    def join(self, inputs):
        self.results = [i.processed for i in inputs]
        self.next(self.end)

    @step
    def end(self):
        assert sorted(self.results) == ["APPLE", "BANANA", "CHERRY"]
        print("ForeachFlow completed:", self.results)


if __name__ == "__main__":
    ForeachFlow()

How It Works

1. Start step defines the collection

The start step creates self.items = ["apple", "banana", "cherry"] and uses self.next(self.process_item, foreach="items") to spawn parallel tasks.
2. Fan-out: process_item executes for each element

Metaflow creates 3 parallel instances of process_item, one for each item in the list. Each instance:
  • Receives its item via self.input (the current item being processed)
  • Processes it independently (converts to uppercase)
  • Stores the result in self.processed
3. Fan-in: join collects all results

The join step waits for all parallel tasks to complete, then receives an inputs iterable containing the state from each process_item execution.
4. End step validates the collected results

The end step verifies all items were processed correctly.

Dynamic Parallelism: Unlike static branching, the number of parallel tasks is determined at runtime based on the size of the collection.

The Foreach Pattern

The foreach syntax has two key components:

1. Splitting with foreach

@step
def start(self):
    self.items = ["apple", "banana", "cherry"]
    self.next(self.process_item, foreach="items")
  • foreach="items" tells Metaflow to iterate over self.items
  • Creates N parallel instances of the next step (N = length of items)

2. Accessing the current item

@step
def process_item(self):
    # self.input contains the current item
    self.processed = self.input.upper()
  • Each parallel task receives its item via self.input
  • The current item is NOT in self.items; that artifact still holds the entire collection

3. Collecting results in join

@step
def join(self, inputs):
    # inputs is an iterable of all parallel task states
    self.results = [i.processed for i in inputs]
  • inputs is an iterable, not a dictionary (unlike static branch joins)
  • Iterate over inputs to collect results from each parallel task
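To make the iterable-vs-dictionary distinction concrete, here is a plain-Python stand-in for the join step, with SimpleNamespace objects playing the role of per-task states (illustrative only; real inputs objects are supplied by Metaflow):

```python
from types import SimpleNamespace

def join(inputs):
    # inputs is an iterable of task states; you iterate over it rather
    # than index it by branch name as in static-branch joins.
    return [i.processed for i in inputs]

# Simulate the states left behind by three parallel process_item tasks.
states = [SimpleNamespace(processed=w) for w in ("APPLE", "BANANA", "CHERRY")]
print(join(states))  # ['APPLE', 'BANANA', 'CHERRY']
```

In real flows, Metaflow's self.merge_artifacts(inputs) helper can also be used in a join step to carry forward artifacts that are identical across all branches.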

Creating the Dagster Asset

Convert this flow to a Dagster asset:
dagster create foreach_flow.py ForeachFlow
Dagster will handle the dynamic parallelism transparently.

Generated Dagster Graph

When materialized in Dagster, the foreach flow creates a dynamic fan-out/fan-in structure:
ForeachFlow (asset)
  └─ Execution:
       start
        ├─ process_item[0] ("apple")  ─┐
        ├─ process_item[1] ("banana") ─┼─ join → end
        └─ process_item[2] ("cherry") ─┘
The number of parallel tasks adapts to the runtime size of the collection.
Scalability: Foreach is designed for massive parallelism. You can process thousands of items, and Metaflow will handle task distribution, retries, and result aggregation automatically.

Nested Foreach

Metaflow supports nested foreach loops for multi-level parallelism. If your use case involves processing a collection where each item itself contains a sub-collection, you can chain foreach operations:
@step
def start(self):
    self.batches = [[1, 2], [3, 4], [5, 6]]
    self.next(self.process_batch, foreach="batches")

@step
def process_batch(self):
    self.batch = self.input
    self.next(self.process_item, foreach="batch")

@step
def process_item(self):
    self.result = self.input * 2
    self.next(self.join_items)
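The snippet above omits the join steps; in Metaflow each foreach level needs its own join, with the innermost level joined first. Stripped of the step machinery, the data flow reduces to a nested map, sketched here in plain Python:

```python
def process_item(x):
    # Inner foreach body: one task per element of a batch.
    return x * 2

def process_batch(batch):
    # Outer foreach body fans out again; the inner join gathers
    # each batch's results back into a list.
    return [process_item(x) for x in batch]

batches = [[1, 2], [3, 4], [5, 6]]
# Outer join: gather the per-batch result lists.
results = [process_batch(b) for b in batches]
print(results)  # [[2, 4], [6, 8], [10, 12]]
```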

Use Cases

  • Batch data processing: Process records/files in parallel
  • Model inference: Run predictions on multiple inputs concurrently
  • Hyperparameter tuning: Evaluate multiple parameter combinations
  • Data validation: Check multiple data sources independently
  • Distributed feature engineering: Transform features across dataset splits

Key Takeaways

  • Use foreach="attribute_name" to iterate over a collection
  • Each parallel task receives its item via self.input
  • The join step receives an inputs iterable (not a dictionary)
  • Parallelism is determined at runtime based on collection size
  • Foreach scales automatically to handle large collections
  • Can be nested for multi-level parallelism