Overview

Every Metaflow workflow inherits from FlowSpec, which provides the core framework for defining and executing data science workflows. The FlowSpec class manages the flow lifecycle, handles data artifacts, and orchestrates step execution.

Basic Flow Structure

from metaflow import FlowSpec, step

class MyFlow(FlowSpec):
    """A simple flow demonstrating basic structure."""
    
    @step
    def start(self):
        """The entry point of the flow."""
        print("Flow started")
        self.next(self.end)
    
    @step
    def end(self):
        """The exit point of the flow."""
        print("Flow completed")

if __name__ == "__main__":
    MyFlow()

Core Components

Flow Name and Metadata

The flow name is derived from the class name:
class MyFlow(FlowSpec):
    def __init__(self, use_cli=True):
        self.name = self.__class__.__name__  # "MyFlow"
        super().__init__(use_cli)
The flow docstring becomes the flow description visible in the CLI.
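The same derivation can be sketched in plain Python, with no Metaflow installed; `describe_flow` is a hypothetical helper for illustration, not part of Metaflow's API:

```python
import inspect

def describe_flow(cls):
    """Mimic how the flow name and description are derived:
    the name comes from the class, the description from its docstring."""
    name = cls.__name__
    description = inspect.getdoc(cls) or ""
    return name, description

class MyFlow:
    """A simple flow demonstrating basic structure."""

print(describe_flow(MyFlow))
# ('MyFlow', 'A simple flow demonstrating basic structure.')
```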

The @step Decorator

Every method decorated with @step becomes an executable step in your workflow:
@step
def process_data(self):
    """Process the input data."""
    self.result = self.raw_data * 2
    self.next(self.analyze)

Step Transitions with self.next()

Every step (except end) must call self.next() to define the next step:
# Linear transition
self.next(self.next_step)

# Multiple parallel branches
self.next(self.branch_a, self.branch_b, self.branch_c)

# Foreach transition
self.next(self.process_item, foreach='items')

# Conditional transition (legacy syntax; conditional branching
# is not supported in open-source Metaflow releases)
self.next(self.step_a, self.step_b, condition='my_condition')

Data Artifacts

Any attribute set on self becomes a data artifact that persists across steps:
from datetime import datetime

@step
def start(self):
    self.data = [1, 2, 3, 4, 5]  # Artifact created
    self.timestamp = datetime.now()  # Another artifact
    self.next(self.process)

@step
def process(self):
    # Artifacts are automatically available
    print(self.data)  # [1, 2, 3, 4, 5]
    self.processed = [x * 2 for x in self.data]
    self.next(self.end)

Ephemeral Attributes

Some attributes are not saved as artifacts:
# From flowspec.py:276
_EPHEMERAL = {
    '_EPHEMERAL',
    '_NON_PARAMETERS',
    '_datastore',
    '_cached_input',
    '_graph',
    '_flow_state',
    '_steps',
    'index',
    'input',
}
These are internal or transient values that don’t need persistence.

Accessing Flow Information

Flow Properties

from metaflow import current

@step
def start(self):
    # Flow name
    print(f"Running flow: {self.name}")
    
    # Current step name, via the `current` singleton
    # (populated while a step is executing)
    print(f"Current step: {current.step_name}")
    
    self.next(self.end)

Script Path

@property
def script_name(self) -> str:
    """Returns the name of the script containing the flow."""
    fname = inspect.getfile(self.__class__)
    if fname.endswith('.pyc'):
        fname = fname[:-1]
    return os.path.basename(fname)
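The same logic works as a standalone function; here it is applied to a stdlib class to show the behavior (illustrative, not part of Metaflow's API):

```python
import inspect
import os

def script_name_of(cls):
    """Basename of the source file defining cls, mirroring
    FlowSpec.script_name: a trailing .pyc is mapped back to .py."""
    fname = inspect.getfile(cls)
    if fname.endswith('.pyc'):
        fname = fname[:-1]
    return os.path.basename(fname)

print(script_name_of(inspect.Signature))  # inspect.py
```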

Join Steps and Inputs

When multiple branches converge, use the inputs parameter:
@step
def start(self):
    self.next(self.branch_a, self.branch_b)

@step
def branch_a(self):
    self.result_a = "A"
    self.next(self.join)

@step
def branch_b(self):
    self.result_b = "B"
    self.next(self.join)

@step
def join(self, inputs):
    """Join step receives inputs from all branches."""
    self.combined = [
        inputs.branch_a.result_a,
        inputs.branch_b.result_b
    ]
    self.next(self.end)

Flow Graph

The flow graph is automatically constructed from step definitions:
# From flowspec.py:262
class FlowSpecMeta(type):
    def _init_graph(cls):
        cls._graph = FlowGraph(cls)
        cls._steps = [getattr(cls, node.name) for node in cls._graph]
You can visualize the graph structure:
python myflow.py show
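Graph construction boils down to discovering the decorated methods on the class. A minimal, Metaflow-free sketch, where the `is_step` marker is an assumption standing in for Metaflow's real decorator bookkeeping:

```python
def step(func):
    """Toy stand-in for Metaflow's @step: tag the function as a step."""
    func.is_step = True
    return func

def collect_steps(cls):
    """Find every @step-tagged method in definition order,
    roughly as a graph builder would."""
    return [name for name, attr in vars(cls).items()
            if callable(attr) and getattr(attr, 'is_step', False)]

class MyFlow:
    @step
    def start(self): pass
    @step
    def end(self): pass
    def helper(self): pass  # undecorated: not a step

print(collect_steps(MyFlow))  # ['start', 'end']
```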

Constants and Class Variables

Class-level variables become flow constants:
class MyFlow(FlowSpec):
    # This becomes a constant available in all steps
    BATCH_SIZE = 100
    MAX_ITERATIONS = 1000
    
    @step
    def start(self):
        print(f"Batch size: {self.BATCH_SIZE}")
        self.next(self.end)
Constants are read in every step, but treat them as read-only: each step runs in a fresh process, so reassigning a class-level value inside one step never propagates to the steps that follow.
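Why per-step modifications don't stick can be seen with plain classes: each step starts from a fresh object (a new process in real Metaflow), so an instance-level override never reaches the next step. Illustrative only:

```python
class MyFlow:
    BATCH_SIZE = 100  # class-level constant

step_one = MyFlow()
step_one.BATCH_SIZE = 999   # shadows the constant on this instance only

step_two = MyFlow()         # fresh object, as a new process would create
print(step_two.BATCH_SIZE)  # 100 -- the override never propagated
```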

Error Handling

Flows cannot be serialized:
# From flowspec.py:1170
def __getstate__(self):
    raise MetaflowException(
        "Flows can't be serialized. Maybe you tried "
        "to assign *self* or one of the *inputs* "
        "to an attribute? Instead of serializing the "
        "whole flow, you should choose specific "
        "attributes, e.g. *input.some_var*, to be "
        "stored."
    )
Don’t assign self or entire inputs objects to artifacts. Extract specific attributes instead.
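The effect of that guard can be reproduced with any class whose `__getstate__` raises: pickling fails immediately. This is a plain-Python sketch using `TypeError`, not Metaflow's own exception class:

```python
import pickle

class Unserializable:
    """Refuses pickling, the same way FlowSpec.__getstate__ does."""
    def __getstate__(self):
        raise TypeError("Flows can't be serialized")

try:
    pickle.dumps(Unserializable())
except TypeError as exc:
    print(exc)  # Flows can't be serialized
```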

Best Practices

  1. Clear Step Names: Use descriptive method names that explain what the step does
  2. Document Steps: Add docstrings to steps for better flow documentation
  3. Minimize Artifacts: Only save data you need in subsequent steps
  4. Type Hints: Use type hints for better code clarity
  5. Validate Early: Check data validity in early steps before expensive processing

Advanced: Flow Decorators

Flow-level decorators apply to the entire flow:
from metaflow import FlowSpec, step, schedule, project

@schedule(weekly=True)
@project(name="my-project")
class MyFlow(FlowSpec):
    @step
    def start(self):
        self.next(self.end)
    
    @step
    def end(self):
        pass

Next Steps

Branching

Learn how to create parallel branches in your flows

Foreach

Process data in parallel with foreach loops

Parameters

Make flows configurable with Parameters

Data Management

Best practices for managing data artifacts
