This tutorial walks you through Metaflow’s core features by building progressively more sophisticated workflows. By the end, you’ll understand how to build production-ready ML and data science pipelines.

Tutorial Overview

We’ll build a movie recommendation system through several episodes, each introducing new Metaflow concepts:
1. Hello World: Validate your installation with a simple flow
2. Build a Playlist: Learn parameters, data loading, and parallel execution
3. Compute Statistics: Use foreach to process data in parallel
4. Connect Flows: Access data from previous runs using the Metaflow Client
5. Scale to Cloud: Run steps on cloud compute with decorators

Episode 0: Hello World

Start by validating your Metaflow installation with a simple linear workflow.

The Code

helloworld.py
from metaflow import FlowSpec, step

class HelloFlow(FlowSpec):
    """
    A flow where Metaflow prints 'Hi'.

    Run this flow to validate that Metaflow is installed correctly.
    """

    @step
    def start(self):
        """
        This is the 'start' step. All flows must have a step named 'start' that
        is the first step in the flow.
        """
        print("HelloFlow is starting.")
        self.next(self.hello)

    @step
    def hello(self):
        """
        A step for metaflow to introduce itself.
        """
        print("Metaflow says: Hi!")
        self.next(self.end)

    @step
    def end(self):
        """
        This is the 'end' step. All flows must have an 'end' step, which is the
        last step in the flow.
        """
        print("HelloFlow is all done.")

if __name__ == "__main__":
    HelloFlow()

Run It

python helloworld.py run

To print the flow's structure without executing it:

python helloworld.py show

What You Learned

Every Metaflow flow:
  • Inherits from FlowSpec
  • Has a start step (entry point)
  • Has an end step (exit point)
  • Uses self.next() to define the execution graph
Steps are methods decorated with @step that represent units of work. Metaflow automatically:
  • Versions each step execution
  • Captures logs and outputs
  • Handles data persistence

Episode 1: Building a Playlist

Create a movie playlist generator that loads data, accepts parameters, and executes branches in parallel.

The Code

playlist.py
from metaflow import FlowSpec, step, IncludeFile, Parameter

class PlayListFlow(FlowSpec):
    """
    A flow to help you build your favorite movie playlist.

    The flow performs the following steps:
    1) Ingests a CSV file containing metadata about movies.
    2) Loads two of the columns from the CSV into Python lists.
    3) In parallel branches:
       - A) Filters movies by the genre parameter.
       - B) Chooses a random movie from a different genre.
    4) Displays the top entries from the playlist.
    """

    movie_data = IncludeFile(
        "movie_data",
        help="The path to a movie metadata file.",
        default="movies.csv",
    )

    genre = Parameter(
        "genre", 
        help="Filter movies for a particular genre.", 
        default="Sci-Fi"
    )

    recommendations = Parameter(
        "recommendations",
        help="The number of movies to recommend in the playlist.",
        default=5,
    )

    @step
    def start(self):
        """
        Parse the CSV file and load the values into a dictionary of lists.
        """
        import csv

        # For this example, we only need the movie title and the genres.
        columns = ["movie_title", "genres"]
        self.dataframe = {col: [] for col in columns}

        for row in csv.DictReader(self.movie_data.splitlines()):
            for col in columns:
                self.dataframe[col].append(row[col])

        # Compute genre-specific movies and a bonus movie in parallel.
        self.next(self.bonus_movie, self.genre_movies)

    @step
    def bonus_movie(self):
        """
        This step chooses a random movie from a different genre.
        """
        from random import choice

        # Find all the movies that are not in the provided genre.
        movies = [
            (movie, genres)
            for movie, genres in zip(
                self.dataframe["movie_title"], self.dataframe["genres"]
            )
            if self.genre.lower() not in genres.lower()
        ]

        # Choose one randomly.
        self.bonus = choice(movies)
        self.next(self.join)

    @step
    def genre_movies(self):
        """
        Filter the movies by genre.
        """
        from random import shuffle

        # Find all the movie titles in the specified genre.
        self.movies = [
            movie
            for movie, genres in zip(
                self.dataframe["movie_title"], self.dataframe["genres"]
            )
            if self.genre.lower() in genres.lower()
        ]

        # Randomize the title names.
        shuffle(self.movies)
        self.next(self.join)

    @step
    def join(self, inputs):
        """
        Join our parallel branches and merge results.
        """
        # Reassign relevant variables from our branches.
        self.playlist = inputs.genre_movies.movies
        self.bonus = inputs.bonus_movie.bonus
        self.next(self.end)

    @step
    def end(self):
        """
        Print out the playlist and bonus movie.
        """
        print("Playlist for movies in genre '%s'" % self.genre)
        for pick, movie in enumerate(self.playlist, start=1):
            print("Pick %d: '%s'" % (pick, movie))
            if pick >= self.recommendations:
                break

        print("Bonus Pick: '%s' from '%s'" % (self.bonus[0], self.bonus[1]))

if __name__ == "__main__":
    PlayListFlow()

Run It

python playlist.py run

What You Learned

Use Parameter to make flows configurable:
genre = Parameter(
    "genre", 
    help="Filter movies for a particular genre.", 
    default="Sci-Fi"
)
Parameters are accessed as self.genre in steps and can be overridden via CLI.
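Parameter names map directly to CLI options. Assuming a movies.csv sits next to the script, the defaults above could be overridden like this:

```shell
# Override the genre and the playlist length at run time.
python playlist.py run --genre Comedy --recommendations 3
```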
IncludeFile embeds external files in your flow:
movie_data = IncludeFile(
    "movie_data",
    help="The path to a movie metadata file.",
    default="movies.csv",
)
Access the content with self.movie_data as a string.
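Because IncludeFile hands the step the raw file contents as a string, it can be parsed with the standard csv module, just as the start step does. A standalone sketch of the same parsing, using made-up sample data:

```python
import csv

# IncludeFile exposes the file's raw text; DictReader can consume it
# line by line via splitlines(), as in PlayListFlow.start.
csv_text = "movie_title,genres\nAlien,Horror|Sci-Fi\nUp,Animation"
rows = list(csv.DictReader(csv_text.splitlines()))
print(rows[0]["movie_title"])  # Alien
print(rows[1]["genres"])       # Animation
```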
Branch execution by passing multiple steps to self.next():
self.next(self.bonus_movie, self.genre_movies)
Both steps run in parallel. Metaflow handles concurrency automatically.
Merge parallel branches with a join step:
@step
def join(self, inputs):
    # Access data from each branch
    self.playlist = inputs.genre_movies.movies
    self.bonus = inputs.bonus_movie.bonus
The inputs parameter provides access to artifacts from all incoming branches.

Episode 2: Computing Statistics

Use Metaflow’s foreach construct to process multiple items in parallel.

The Code

stats.py
from metaflow import FlowSpec, step, IncludeFile

class MovieStatsFlow(FlowSpec):
    """
    A flow to generate some statistics about the movie genres.

    The flow performs the following steps:
    1) Ingests a CSV into a dataframe.
    2) Fans out over genres using Metaflow's foreach.
    3) Computes quartiles for each genre.
    4) Saves a dictionary of genre-specific statistics.
    """

    movie_data = IncludeFile(
        "movie_data",
        help="The path to a movie metadata file.",
        default="movies.csv",
    )

    @step
    def start(self):
        """
        The start step:
        1) Loads the movie metadata into a dataframe.
        2) Finds all the unique genres.
        3) Launches parallel statistics computation for each genre.
        """
        import csv

        # Load the dataset into a dataframe structure.
        columns = ["movie_title", "title_year", "genres", "gross"]
        self.dataframe = {col: [] for col in columns}

        for row in csv.DictReader(self.movie_data.splitlines()):
            for col in columns:
                val = int(row[col]) if col in ("title_year", "gross") else row[col]
                self.dataframe[col].append(val)

        # The column 'genres' has a list of genres for each movie. Let's get
        # all the unique genres.
        self.genres = {
            genre for genres in self.dataframe["genres"] for genre in genres.split("|")
        }
        self.genres = list(self.genres)

        # We want to compute some statistics for each genre. The 'foreach'
        # keyword argument allows us to compute the statistics for each genre in
        # parallel (i.e. a fan-out).
        self.next(self.compute_statistics, foreach="genres")

    @step
    def compute_statistics(self):
        """
        Compute statistics for a single genre.
        """
        # The genre currently being processed is available as self.input.
        self.genre = self.input
        print("Computing statistics for %s" % self.genre)

        # Find all the movies that have this genre and build a dataframe with
        # just those movies and just the columns of interest.
        # Genres are pipe-separated; split them so that, e.g., 'Music'
        # does not match 'Musical' as a substring.
        selector = [self.genre in row.split("|") for row in self.dataframe["genres"]]

        for col in self.dataframe.keys():
            self.dataframe[col] = [
                val for val, is_genre in zip(self.dataframe[col], selector) if is_genre
            ]

        # Sort by gross box office and drop unused column.
        argsort_indices = sorted(
            range(len(self.dataframe["gross"])), 
            key=self.dataframe["gross"].__getitem__
        )
        for col in self.dataframe.keys():
            self.dataframe[col] = [self.dataframe[col][idx] for idx in argsort_indices]
        del self.dataframe["title_year"]

        # Get some statistics on the gross box office for these titles.
        n_points = len(self.dataframe["movie_title"])
        self.quartiles = []
        for cut in [0.25, 0.5, 0.75]:
            # Clamp the index so genres with few titles stay within bounds.
            idx = min(round(n_points * cut), n_points - 1)
            self.quartiles.append(self.dataframe["gross"][idx])

        # Join the results from other genres.
        self.next(self.join)

    @step
    def join(self, inputs):
        """
        Join our parallel branches and merge results into a dictionary.
        """
        # Merge results from the genre-specific computations.
        self.genre_stats = {
            inp.genre.lower(): {"quartiles": inp.quartiles, "dataframe": inp.dataframe}
            for inp in inputs
        }
        self.next(self.end)

    @step
    def end(self):
        """
        End the flow.
        """
        print("Computed statistics for %d genres" % len(self.genre_stats))

if __name__ == "__main__":
    MovieStatsFlow()

Run It

python stats.py run

What You Learned

Process multiple items in parallel using foreach:
self.genres = ["Action", "Comedy", "Drama", ...]
self.next(self.compute_statistics, foreach="genres")
Metaflow launches one parallel task per item in self.genres.
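The list you fan out over is ordinary Python data. In stats.py, the items come from splitting the pipe-separated genres column, which can be sketched standalone (sample rows are made up):

```python
# Build the unique fan-out items from a pipe-separated genres column,
# mirroring the set comprehension in stats.py's start step.
rows = ["Action|Sci-Fi", "Comedy", "Action|Comedy"]
genres = sorted({genre for row in rows for genre in row.split("|")})
print(genres)  # ['Action', 'Comedy', 'Sci-Fi']
```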
In a foreach step, access the current item via self.input:
@step
def compute_statistics(self):
    self.genre = self.input  # Current genre being processed
After foreach, a join step receives all results:
@step
def join(self, inputs):
    # inputs is an iterable of all parallel task results
    self.genre_stats = {
        inp.genre.lower(): {"quartiles": inp.quartiles}
        for inp in inputs
    }
Foreach is powerful for embarrassingly parallel workloads like:
  • Processing multiple data partitions
  • Training multiple model variants
  • Running hyperparameter sweeps
  • Processing multiple files or API endpoints

Episode 3: Connecting Flows

Use the Metaflow Client to access data from previous flow runs.

The Code

playlist_redux.py
from metaflow import FlowSpec, step, Parameter

class PlayListFlow(FlowSpec):
    """
    The next version of our playlist generator that uses the statistics
    generated from 'Episode 02' to improve the title recommendations.

    The flow performs the following steps:
    1) Load the genre-specific statistics from the MovieStatsFlow.
    2) In parallel branches:
       - A) Build a playlist from the top grossing films in the requested genre.
       - B) Choose a random movie.
    3) Join the two to create a movie playlist and display it.
    """

    genre = Parameter(
        "genre", 
        help="Filter movies for a particular genre.", 
        default="Sci-Fi"
    )

    recommendations = Parameter(
        "recommendations",
        help="The number of movies recommended for the playlist.",
        default=5,
    )

    @step
    def start(self):
        """
        Use the Metaflow client to retrieve the latest successful run of
        MovieStatsFlow and assign its artifacts as data artifacts in this flow.
        """
        from metaflow import Flow, get_metadata

        # Print metadata provider
        print("Using metadata provider: %s" % get_metadata())

        # Load the analysis from the MovieStatsFlow.
        run = Flow("MovieStatsFlow").latest_successful_run
        print("Using analysis from '%s'" % str(run))

        self.genre_stats = run.data.genre_stats

        # Compute our two recommendation types in parallel.
        self.next(self.bonus_movie, self.genre_movies)

    @step
    def bonus_movie(self):
        """
        This step chooses a random title from a different movie genre.
        """
        import random

        # Concatenate all the genre-specific data frames.
        df = {"movie_title": [], "genres": []}
        for genre, data in self.genre_stats.items():
            if genre != self.genre.lower():
                for row_idx in range(len(data["dataframe"]["movie_title"])):
                    if (
                        self.genre.lower()
                        not in data["dataframe"]["genres"][row_idx].lower()
                    ):
                        df["movie_title"].append(
                            data["dataframe"]["movie_title"][row_idx]
                        )
                        df["genres"].append(data["dataframe"]["genres"][row_idx])

        # Choose a random movie.
        random_index = random.randint(0, len(df["genres"]) - 1)
        self.bonus = (df["movie_title"][random_index], df["genres"][random_index])
        self.next(self.join)

    @step
    def genre_movies(self):
        """
        Select the top performing movies from the user specified genre.
        """
        from random import shuffle

        # For the genre of interest, generate a potential playlist using only
        # highest gross box office titles (i.e. those in the last quartile).
        genre = self.genre.lower()
        if genre not in self.genre_stats:
            self.movies = []
        else:
            df = self.genre_stats[genre]["dataframe"]
            quartiles = self.genre_stats[genre]["quartiles"]
            self.movies = [
                df["movie_title"][i]
                for i, g in enumerate(df["gross"])
                if g >= quartiles[-1]
            ]

        # Shuffle the playlist.
        shuffle(self.movies)
        self.next(self.join)

    @step
    def join(self, inputs):
        """
        Join our parallel branches and merge results.
        """
        self.playlist = inputs.genre_movies.movies
        self.bonus = inputs.bonus_movie.bonus
        self.next(self.end)

    @step
    def end(self):
        """
        Print out the playlist and bonus movie.
        """
        print("Playlist for movies in genre '%s'" % self.genre)
        for pick, movie in enumerate(self.playlist, start=1):
            print("Pick %d: '%s'" % (pick, movie))
            if pick >= self.recommendations:
                break

        print("Bonus Pick: '%s' from '%s'" % (self.bonus[0], self.bonus[1]))

if __name__ == "__main__":
    PlayListFlow()

Run It

First, run the MovieStatsFlow:

python stats.py run

Then run the PlayListFlow:

python playlist_redux.py run

What You Learned

Access previous runs programmatically:
from metaflow import Flow

# Get the latest successful run of a flow
run = Flow("MovieStatsFlow").latest_successful_run

# Access artifacts from that run
genre_stats = run.data.genre_stats
Flows can depend on data from other flows, enabling:
  • Model training → Model deployment pipelines
  • Feature engineering → Model training workflows
  • ETL → Analytics pipelines
Every run is versioned and immutable. You can:
  • Access any historical run
  • Reproduce results exactly
  • Compare runs over time
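Beyond latest_successful_run, the Client can browse history. A sketch (assumes Metaflow is installed and at least one MovieStatsFlow run exists in your metadata store):

```python
from metaflow import Flow

# Iterating a Flow yields its runs, newest first; filter for the
# successful ones and compare an artifact across them.
for run in Flow("MovieStatsFlow"):
    if run.successful:
        print(run.id, run.finished_at, len(run.data.genre_stats))
```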

Episode 4: Scaling to the Cloud

Run compute-intensive steps on cloud resources using decorators.

The Code

hello_cloud.py
from metaflow import FlowSpec, step, kubernetes, retry

class HelloCloudFlow(FlowSpec):
    """
    A flow where Metaflow prints 'Metaflow says Hi from the cloud!'

    Run this flow to validate your Kubernetes configuration.
    """

    @step
    def start(self):
        """
        The 'start' step is a regular step, so it runs locally on the machine from
        which the flow is executed.
        """
        from metaflow import get_metadata

        print("HelloCloud is starting.")
        print("")
        print("Using metadata provider: %s" % get_metadata())
        print("")
        print("The start step is running locally. Next, the ")
        print("'hello' step will run remotely on Kubernetes. ")

        self.next(self.hello)

    @kubernetes(cpu=1, memory=500)
    @retry
    @step
    def hello(self):
        """
        This step runs remotely on Kubernetes using 1 virtual CPU and 500MB of
        memory. Since we are now using a remote metadata service and data
        store, the flow information and artifacts are available from
        anywhere. The step also uses the retry decorator, so that if something
        goes wrong, the step will be automatically retried.
        """
        self.message = "Hi from the cloud!"
        print("Metaflow says: %s" % self.message)
        self.next(self.end)

    @step
    def end(self):
        """
        The 'end' step is a regular step, so it runs locally on the machine from
        which the flow is executed.
        """
        print("HelloCloud is finished.")
        print("The message from the cloud was: %s" % self.message)

if __name__ == "__main__":
    HelloCloudFlow()

Run It

python hello_cloud.py run
Prerequisites: This requires configuring Metaflow infrastructure for cloud execution. For AWS Batch, use the @batch decorator; for Kubernetes, use @kubernetes.

What You Learned

Specify compute resources for steps:
@kubernetes(cpu=1, memory=500)
@step
def hello(self):
    # This runs on Kubernetes with 1 CPU and 500MB RAM
    pass
Available decorators: @kubernetes, @batch (AWS), and more.
Automatically retry steps on failure:
@retry
@step
def unstable_step(self):
    # This will retry automatically if it fails
    pass
Customize with @retry(times=3) for specific retry counts.
Mix local and remote steps in the same flow:
  • Lightweight coordination steps run locally
  • Compute-intensive steps run on cloud
  • Data flows seamlessly between environments
GPU Support: Use @kubernetes(gpu=1) or @batch(gpu=1) for GPU-accelerated steps, perfect for training deep learning models.

Next Steps

Congratulations! You’ve completed the core Metaflow tutorial. You now understand:
  • Creating flows with steps
  • Working with parameters and data
  • Parallel execution and foreach patterns
  • Connecting flows with the Client API
  • Scaling to cloud compute

Production Deployment

Deploy flows to production orchestrators

Decorators Reference

Explore all available decorators

Client API

Deep dive into programmatic access

Best Practices

Learn patterns for production ML systems

Additional Resources

1. Interactive Sandbox: Try the Metaflow Sandbox for hands-on learning without setup
2. Join the Community: Get help on Slack
3. Explore Examples: Check the tutorials/ directory in the Metaflow repository for more examples
