Complete Example
Here’s the complete `ForeachFlow` from the test suite:
How It Works
Start step defines the collection

The `start` step creates `self.items = ["apple", "banana", "cherry"]` and uses `self.next(self.process_item, foreach="items")` to spawn parallel tasks.

Fan-out: process_item executes for each element
Metaflow creates 3 parallel instances of `process_item`, one for each item in the list. Each instance:

- Receives its item via `self.input` (the current item being processed)
- Processes it independently (converts to uppercase)
- Stores the result in `self.processed`
Fan-in: Join collects all results

The `join` step waits for all parallel tasks to complete, then receives an `inputs` iterable containing the state from each `process_item` execution.

Dynamic Parallelism: Unlike static branching, the number of parallel tasks is determined at runtime based on the size of the collection.
The Foreach Pattern
The foreach syntax has three key components:

1. Splitting with foreach

- `foreach="items"` tells Metaflow to iterate over `self.items`
- Creates N parallel instances of the next step (N = length of `items`)

2. Accessing the current item

- Each parallel task receives its item via `self.input`
- The item is NOT in `self.items`; that still contains the entire collection

3. Collecting results in join

- `inputs` is an iterable, not a dictionary (unlike static branch joins)
- Iterate over `inputs` to collect results from each parallel task
Creating the Dagster Asset
Convert this flow to a Dagster asset:

Generated Dagster Graph

When materialized in Dagster, the foreach flow creates a dynamic fan-out/fan-in structure.

Scalability: Foreach is designed for massive parallelism. You can process thousands of items, and Metaflow will handle task distribution, retries, and result aggregation automatically.
Nested Foreach
Metaflow supports nested foreach loops for multi-level parallelism. If your use case involves processing a collection where each item itself contains a sub-collection, you can chain foreach operations.

Use Cases
- Batch data processing: Process records/files in parallel
- Model inference: Run predictions on multiple inputs concurrently
- Hyperparameter tuning: Evaluate multiple parameter combinations
- Data validation: Check multiple data sources independently
- Distributed feature engineering: Transform features across dataset splits
Key Takeaways
- Use `foreach="attribute_name"` to iterate over a collection
- Each parallel task receives its item via `self.input`
- The join step receives an `inputs` iterable (not a dictionary)
- Parallelism is determined at runtime based on collection size
- Foreach scales automatically to handle large collections
- Can be nested for multi-level parallelism