## Overview
Every Metaflow workflow inherits from FlowSpec, which provides the core framework for defining and executing data science workflows. The FlowSpec class manages the flow lifecycle, handles data artifacts, and orchestrates step execution.
## Basic Flow Structure

```python
from metaflow import FlowSpec, step


class MyFlow(FlowSpec):
    """A simple flow demonstrating basic structure."""

    @step
    def start(self):
        """The entry point of the flow."""
        print("Flow started")
        self.next(self.end)

    @step
    def end(self):
        """The exit point of the flow."""
        print("Flow completed")


if __name__ == "__main__":
    MyFlow()
```

Run it with `python myflow.py run`.
## Core Components

The flow name is derived from the class name:

```python
class MyFlow(FlowSpec):
    def __init__(self, use_cli=True):
        self.name = self.__class__.__name__  # "MyFlow"
        super().__init__(use_cli)
```

The flow docstring becomes the flow description visible in the CLI.
### The @step Decorator

Every method decorated with `@step` becomes an executable step in your workflow:

```python
@step
def process_data(self):
    """Process the input data."""
    self.result = self.raw_data * 2
    self.next(self.analyze)
```
### Step Transitions with `self.next()`

Every step (except `end`) must call `self.next()` to define the step or steps that run next:

```python
# Linear transition
self.next(self.next_step)

# Multiple parallel branches
self.next(self.branch_a, self.branch_b, self.branch_c)

# Foreach transition: fan out over the items in the 'items' artifact
self.next(self.process_item, foreach="items")
```
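Conceptually, the runtime walks the graph by following these declared transitions. The toy dispatcher below is a plain-Python sketch of that control flow, not Metaflow's actual scheduler (the class and function names here are made up for illustration):

```python
# Toy illustration of how a runtime could follow self.next() transitions.
# This is NOT Metaflow's scheduler -- just a sketch of the control flow.

class ToyFlow:
    def __init__(self):
        self._next = None

    def next(self, *steps):
        # Record the transition target(s) declared by the current step.
        self._next = [s.__name__ for s in steps]

    def start(self):
        self.data = [1, 2, 3]
        self.next(self.process)

    def process(self):
        self.doubled = [x * 2 for x in self.data]
        self.next(self.end)

    def end(self):
        # Terminal step: no transition.
        self._next = []


def run(flow):
    """Execute steps linearly by following the recorded transitions."""
    step_name = "start"
    visited = []
    while step_name is not None:
        visited.append(step_name)
        getattr(flow, step_name)()
        targets = flow._next
        step_name = targets[0] if targets else None
    return visited


flow = ToyFlow()
print(run(flow))      # ['start', 'process', 'end']
print(flow.doubled)   # [2, 4, 6]
```

The real runtime does much more (branch fan-out, process isolation, checkpointing), but the shape is the same: each step declares where control goes next, and the scheduler follows.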
## Data Artifacts

Any attribute set on `self` becomes a data artifact that persists across steps:

```python
from datetime import datetime

@step
def start(self):
    self.data = [1, 2, 3, 4, 5]      # artifact created
    self.timestamp = datetime.now()  # another artifact
    self.next(self.process)

@step
def process(self):
    # Artifacts are automatically available in later steps
    print(self.data)  # [1, 2, 3, 4, 5]
    self.processed = [x * 2 for x in self.data]
    self.next(self.end)
```
### Ephemeral Attributes

Some attributes are never saved as artifacts:

```python
# From flowspec.py:276
_EPHEMERAL = {
    "_EPHEMERAL",
    "_NON_PARAMETERS",
    "_datastore",
    "_cached_input",
    "_graph",
    "_flow_state",
    "_steps",
    "index",
    "input",
}
```

These are internal or transient values that don't need persistence.
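To make the distinction concrete, here is a simplified sketch of persisting only the non-ephemeral attributes of a flow object. This is not Metaflow's actual datastore code, and the names (`FakeFlow`, `persist_artifacts`) and the trimmed-down ephemeral set are invented for the example:

```python
import pickle

# A subset of the real ephemeral set, for illustration only.
EPHEMERAL = {"_datastore", "_cached_input", "_graph", "_flow_state", "_steps"}

class FakeFlow:
    def __init__(self):
        self.data = [1, 2, 3]       # real artifact
        self.threshold = 0.5        # real artifact
        self._datastore = object()  # ephemeral: never persisted

def persist_artifacts(flow):
    """Pickle every attribute except the ephemeral ones."""
    return {
        name: pickle.dumps(value)
        for name, value in vars(flow).items()
        if name not in EPHEMERAL
    }

stored = persist_artifacts(FakeFlow())
print(sorted(stored))  # ['data', 'threshold'] -- '_datastore' was skipped
```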
## Flow Properties

```python
from metaflow import current

@step
def start(self):
    # Flow name
    print(f"Running flow: {self.name}")
    # Current step name (available while the step is executing)
    print(f"Current step: {current.step_name}")
    self.next(self.end)
```
### Script Path

```python
import inspect
import os

@property
def script_name(self) -> str:
    """Returns the name of the script containing the flow."""
    fname = inspect.getfile(self.__class__)
    if fname.endswith(".pyc"):
        fname = fname[:-1]
    return os.path.basename(fname)
```
## Joining Branches

When multiple branches converge, the join step takes an extra `inputs` parameter:

```python
@step
def start(self):
    self.next(self.branch_a, self.branch_b)

@step
def branch_a(self):
    self.result_a = "A"
    self.next(self.join)

@step
def branch_b(self):
    self.result_b = "B"
    self.next(self.join)

@step
def join(self, inputs):
    """Join step receives inputs from all branches."""
    self.combined = [
        inputs.branch_a.result_a,
        inputs.branch_b.result_b,
    ]
    self.next(self.end)
```
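Outside Metaflow, the join semantics can be sketched as combining per-branch copies of the flow state. Here a plain list of `SimpleNamespace` objects stands in for Metaflow's `inputs` object, and the function names are invented for the sketch:

```python
# Toy sketch of join semantics: each branch produces its own copy of the
# flow state, and the join step picks the specific artifacts it needs.
from types import SimpleNamespace

def branch_a(state):
    state.result_a = "A"
    return state

def branch_b(state):
    state.result_b = "B"
    return state

def join(inputs):
    # Extract specific artifacts from each branch, as a Metaflow join does.
    return [inputs[0].result_a, inputs[1].result_b]

inputs = [branch_a(SimpleNamespace()), branch_b(SimpleNamespace())]
print(join(inputs))  # ['A', 'B']
```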
## Flow Graph

The flow graph is automatically constructed from the step definitions:

```python
# From flowspec.py:262
class FlowSpecMeta(type):
    def _init_graph(cls):
        cls._graph = FlowGraph(cls)
        cls._steps = [getattr(cls, node.name) for node in cls._graph]
```

You can visualize the graph structure with `python myflow.py show`.
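As a rough illustration of what graph construction yields, the transitions map naturally onto an adjacency dictionary. This is a toy version, not Metaflow's `FlowGraph` (which inspects the step definitions themselves), and the `lint` helper is invented for the example:

```python
# Toy adjacency representation of a flow graph with a split and a join.
graph = {
    "start": ["branch_a", "branch_b"],  # a split into two branches
    "branch_a": ["join"],
    "branch_b": ["join"],
    "join": ["end"],
    "end": [],                          # terminal step
}

def lint(graph):
    """Every non-terminal step must declare at least one transition."""
    return [name for name, nxt in graph.items() if name != "end" and not nxt]

print(lint(graph))  # [] -- no dangling steps
```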
## Constants and Class Variables

Class-level variables become flow constants:

```python
class MyFlow(FlowSpec):
    # These become constants available in all steps
    BATCH_SIZE = 100
    MAX_ITERATIONS = 1000

    @step
    def start(self):
        print(f"Batch size: {self.BATCH_SIZE}")
        self.next(self.end)
```

Constants are set once at flow initialization and should not be modified: because each step runs in a separate process, a modification made in one step does not propagate to the others.
## Error Handling

Flow objects themselves cannot be serialized:

```python
# From flowspec.py:1170
def __getstate__(self):
    raise MetaflowException(
        "Flows can't be serialized. Maybe you tried "
        "to assign *self* or one of the *inputs* "
        "to an attribute? Instead of serializing the "
        "whole flow, you should choose specific "
        "attributes, e.g. *input.some_var*, to be "
        "stored."
    )
```

Don't assign `self` or entire `inputs` objects to artifacts; extract the specific attributes you need instead.
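The failure mode is easy to reproduce in miniature. Below is a sketch using a stand-in exception class (`FlowSerializationError` replaces `MetaflowException`, and `UnpicklableFlow` is invented for the example):

```python
import pickle

class FlowSerializationError(Exception):
    """Stand-in for MetaflowException in this sketch."""

class UnpicklableFlow:
    def __init__(self):
        self.some_var = 42

    def __getstate__(self):
        # Mirrors FlowSpec: refuse to pickle the whole flow object.
        raise FlowSerializationError("Flows can't be serialized.")

flow = UnpicklableFlow()

try:
    pickle.dumps(flow)  # wrong: serializing the whole flow
except FlowSerializationError as exc:
    print(exc)

blob = pickle.dumps(flow.some_var)  # right: pickle a specific attribute
print(pickle.loads(blob))  # 42
```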
## Best Practices

- **Clear step names**: use descriptive method names that explain what the step does.
- **Document steps**: add docstrings to steps for better flow documentation.
- **Minimize artifacts**: only save data you need in subsequent steps.
- **Type hints**: use type hints for better code clarity.
- **Validate early**: check data validity in early steps, before expensive processing.
## Advanced: Flow Decorators

Flow-level decorators apply to the entire flow:

```python
from metaflow import FlowSpec, step, schedule, project

@schedule(weekly=True)
@project(name="my-project")
class MyFlow(FlowSpec):
    @step
    def start(self):
        self.next(self.end)

    @step
    def end(self):
        pass
```
## Next Steps

- **Branching**: learn how to create parallel branches in your flows
- **Foreach**: process data in parallel with foreach loops
- **Parameters**: make flows configurable with Parameters
- **Data Management**: best practices for managing data artifacts