Steps are the building blocks of Metaflow flows. Each step is a Python method decorated with @step that performs a specific task.

The @step Decorator

The @step decorator marks a method as a Metaflow step:
from metaflow import FlowSpec, step

class MyFlow(FlowSpec):
    @step
    def start(self):
        self.data = [1, 2, 3, 4, 5]
        self.next(self.process)
    
    @step
    def process(self):
        self.result = sum(self.data)
        self.next(self.end)
    
    @step
    def end(self):
        print(f"Total: {self.result}")
The @step decorator must be the innermost decorator, placed immediately above the method definition. Any other decorators are stacked above @step:
@batch
@step
def my_step(self):
    pass

Step Execution

When a step executes, the runtime:
  1. Loads artifacts from the parent step(s)
  2. Executes the step function
  3. Saves artifacts created in the step
  4. Transitions to the next step(s) as specified by self.next()
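Conceptually, the lifecycle behaves like the plain-Python sketch below. This is a simplification for intuition only, not Metaflow's actual runtime; the `artifact_store` dict stands in for the datastore, and the transition step is omitted:

```python
def run_step(step_func, artifact_store):
    """Simplified sketch of one step's lifecycle (NOT Metaflow's real runtime)."""

    class StepContext:
        pass

    ctx = StepContext()

    # 1. Load artifacts produced by the parent step(s).
    for name, value in artifact_store.items():
        setattr(ctx, name, value)

    # 2. Execute the step function against the context.
    step_func(ctx)

    # 3. Save public attributes back as artifacts
    #    (underscore-prefixed attributes are not persisted).
    for name, value in vars(ctx).items():
        if not name.startswith("_"):
            artifact_store[name] = value
    return artifact_store

# Example: a "process" step that sums data produced by a parent step.
def process(self):
    self.result = sum(self.data)

store = run_step(process, {"data": [1, 2, 3, 4, 5]})
print(store["result"])  # 15
```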

Defining Transitions

Every step (except the final end step) must call self.next() to specify what happens next:

Linear Transition

@step
def start(self):
    self.next(self.process)

Static Fan-out

@step
def start(self):
    # Execute multiple steps in parallel
    self.next(self.branch_a, self.branch_b, self.branch_c)

Foreach (Dynamic Fan-out)

@step
def start(self):
    self.params = [{'lr': 0.01}, {'lr': 0.001}, {'lr': 0.0001}]
    self.next(self.train, foreach='params')

@step
def train(self):
    # Runs once per parameter set
    config = self.input  # Current parameter set
    # Train model with config...
    self.next(self.join)

Switch/Conditional

@step
def start(self):
    self.mode = 'train'  # or 'test'
    self.next(
        {"train": self.train_model, "test": self.test_model},
        condition='mode'
    )

Join Steps

Join steps merge data from parallel branches:
@step
def join(self, inputs):
    # inputs is a list of parent tasks
    
    # Access individual inputs
    for inp in inputs:
        print(inp.result)
    
    # Merge artifacts automatically
    self.merge_artifacts(inputs)
    
    # Or manually merge
    self.all_results = [inp.result for inp in inputs]
    
    self.next(self.end)
Use self.merge_artifacts(inputs) to automatically merge artifacts from parallel branches. Artifacts with identical values in every branch are merged as-is; if an artifact's value differs across branches, merge_artifacts raises an error unless you resolve the conflict first, either by setting the attribute on self before the call or by listing it in the exclude parameter.

Step Context

Within a step, you have access to:

self.index

The index in a foreach loop:
@step
def process(self):
    print(f"Processing item {self.index}")  # 0, 1, 2, ...
    self.next(self.join)

self.input

The current foreach value:
@step
def process(self):
    value = self.input  # Current item from foreach iterator
    self.result = value * 2
    self.next(self.join)

self.foreach_stack()

Nested foreach information:
@step
def nested_step(self):
    stack = self.foreach_stack()
    # [(index1, num_splits1, value1), (index2, num_splits2, value2), ...]
    self.next(self.join)

Data Artifacts

Any attribute you set on self becomes a data artifact:
@step
def start(self):
    self.model = train_model()      # Saved as artifact
    self.accuracy = 0.95            # Saved as artifact
    self.config = {'lr': 0.01}      # Saved as artifact
    self.next(self.end)
Attributes starting with _ are private and not saved as artifacts:
self._temp = "not saved"  # Private, ephemeral
self.result = "saved"      # Public artifact

Step Decorators

Steps can be enhanced with decorators:
from metaflow import FlowSpec, step, batch, retry, timeout

class MyFlow(FlowSpec):
    
    @batch(cpu=8, memory=32000)
    @step
    def train_model(self):
        # Runs on AWS Batch with 8 CPUs and 32GB RAM
        pass
    
    @retry(times=3)
    @step  
    def flaky_step(self):
        # Automatically retries up to 3 times on failure
        pass
    
    @timeout(hours=2)
    @step
    def long_running(self):
        # Fails if step runs longer than 2 hours
        pass
See Decorators for the full list.

Parallel Execution

Use num_parallel to run multiple copies of a step:
@step
def start(self):
    # Run 10 parallel instances
    self.next(self.parallel_work, num_parallel=10)

@step
def parallel_work(self):
    # Each instance gets a unique self.index (0-9)
    self.result = expensive_computation(self.index)
    self.next(self.join)

Best Practices

Make steps idempotent: Steps should produce the same result when run multiple times with the same input. This makes debugging and retries reliable.
Keep steps focused: Each step should do one thing well. Split complex operations into multiple steps.
Use descriptive names: Step names appear in logs, the UI, and data artifacts. Choose names that clearly describe what the step does.
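For example, one common source of non-idempotent steps is unseeded randomness. The generic Python sketch below (the function name and seed value are illustrative) shows how a local, seeded RNG makes a sampling step produce identical results on every retry:

```python
import random

def sample_training_rows(rows, k, seed=42):
    """Deterministically sample k rows: same input gives same output on every retry."""
    rng = random.Random(seed)  # local, seeded RNG; avoids mutating global state
    return rng.sample(rows, k)

rows = list(range(100))
first = sample_training_rows(rows, 5)
second = sample_training_rows(rows, 5)
assert first == second  # identical across runs, so retries are reproducible
```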
