Skip to main content

FlowSpec

Main class from which all Flows should inherit.

Usage

from metaflow import FlowSpec, step

class MyFlow(FlowSpec):
    @step
    def start(self):
        self.next(self.end)
    
    @step
    def end(self):
        pass

Constructor

use_cli
bool
default:"True"
Set to True if the flow is invoked from main or the command line

Properties

script_name
str
Returns the name of the script containing the flow.Note: Legacy function - do not use. Use current instead.
index
int | None
The index of this foreach branch.In a foreach step, multiple instances of this step (tasks) will be executed, one for each element in the foreach. This property returns the zero based index of the current task. If this is not a foreach step, this returns None.If you need to know the indices of the parent tasks in a nested foreach, use FlowSpec.foreach_stack.
input
Any | None
The value of the foreach artifact in this foreach branch.In a foreach step, multiple instances of this step (tasks) will be executed, one for each element in the foreach. This property returns the element passed to the current task. If this is not a foreach step, this returns None.If you need to know the values of the parent tasks in a nested foreach, use FlowSpec.foreach_stack.

Methods

foreach_stack

def foreach_stack() -> Optional[List[Tuple[int, int, Any]]]
Returns the current stack of foreach indexes and values for the current step. Use this information to understand what data is being processed in the current foreach branch. For example, considering the following code:
@step
def root(self):
    self.split_1 = ['a', 'b', 'c']
    self.next(self.nest_1, foreach='split_1')

@step
def nest_1(self):
    self.split_2 = ['d', 'e', 'f', 'g']
    self.next(self.nest_2, foreach='split_2')

@step
def nest_2(self):
    foo = self.foreach_stack()
foo will take the following values in the various tasks for nest_2:
[(0, 3, 'a'), (0, 4, 'd')]
[(0, 3, 'a'), (1, 4, 'e')]
...
[(0, 3, 'a'), (3, 4, 'g')]
[(1, 3, 'b'), (0, 4, 'd')]
...
where each tuple corresponds to:
  • The index of the task for that level of the loop.
  • The number of splits for that level of the loop.
  • The value for that level of the loop.
Note that the last tuple returned in a task corresponds to:
  • 1st element: value returned by self.index.
  • 3rd element: value returned by self.input.
Returns:
  • List[Tuple[int, int, Any]] - An array describing the current stack of foreach steps.

merge_artifacts

def merge_artifacts(
    inputs: Inputs,
    exclude: Optional[List[str]] = None,
    include: Optional[List[str]] = None,
) -> None
Helper function for merging artifacts in a join step. This function takes all the artifacts coming from the branches of a join point and assigns them to self in the calling step. Only artifacts not set in the current step are considered. If, for a given artifact, different values are present on the incoming edges, an error will be thrown and the artifacts that conflict will be reported. As a few examples, in the simple graph: A splitting into B and C and joining in D:
A:
  self.x = 5
  self.y = 6
B:
  self.b_var = 1
  self.x = from_b
C:
  self.x = from_c

D:
  merge_artifacts(inputs)
In D, the following artifacts are set:
  • y (value: 6), b_var (value: 1)
  • if from_b and from_c are the same, x will be accessible and have value from_b
  • if from_b and from_c are different, an error will be thrown. To prevent this error, you need to manually set self.x in D to a merged value (for example the max) prior to calling merge_artifacts.
inputs
Inputs
required
Incoming steps to the join point.
exclude
List[str]
default:"None"
If specified, do not consider merging artifacts with a name in exclude. Cannot specify if include is also specified.
include
List[str]
default:"None"
If specified, only merge artifacts specified. Cannot specify if exclude is also specified.
Raises:
  • MetaflowException - If not called in a join step.
  • UnhandledInMergeArtifactsException - In case of unresolved conflicts.
  • MissingInMergeArtifactsException - If an artifact specified in include cannot be found.

next

def next(*dsts: Callable[..., None], **kwargs) -> None
Indicates the next step to execute after this step has completed. This statement should appear as the last statement of each step, except the end step. There are several valid formats to specify the next step: Straight-line connection:
self.next(self.next_step)
where next_step is a method in the current class decorated with the @step decorator. Static fan-out connection:
self.next(self.step1, self.step2, ...)
where stepX are methods in the current class decorated with the @step decorator. Foreach branch:
self.next(self.foreach_step, foreach='foreach_iterator')
In this situation, foreach_step is a method in the current class decorated with the @step decorator and foreach_iterator is a variable name in the current class that evaluates to an iterator. A task will be launched for each value in the iterator and each task will execute the code specified by the step foreach_step. Switch statement:
self.next({"case1": self.step_a, "case2": self.step_b}, condition='condition_variable')
In this situation, step_a and step_b are methods in the current class decorated with the @step decorator and condition_variable is a variable name in the current class. The value of the condition variable determines which step to execute. If the value doesn’t match any of the dictionary keys, a RuntimeError is raised.
dsts
Callable[..., None]
required
One or more methods annotated with @step.
Raises:
  • InvalidNextException - Raised if the format of the arguments does not match one of the valid formats.

Build docs developers (and LLMs) love