Learn Metaflow through a comprehensive hands-on tutorial
This tutorial walks you through Metaflow’s core features by building progressively more sophisticated workflows. By the end, you’ll understand how to build production-ready ML and data science pipelines.
from metaflow import FlowSpec, step


class HelloFlow(FlowSpec):
    """
    Minimal linear flow in which Metaflow prints 'Hi'.

    A successful run confirms that Metaflow is installed correctly.

    """

    @step
    def start(self):
        """
        Entry point of the flow.

        Every flow must define a step named 'start'; it is always the first
        step to execute.

        """
        print("HelloFlow is starting.")
        self.next(self.hello)

    @step
    def hello(self):
        """
        Let Metaflow introduce itself.

        """
        print("Metaflow says: Hi!")
        self.next(self.end)

    @step
    def end(self):
        """
        Final step of the flow.

        Every flow must define a step named 'end'; it is always the last
        step to execute.

        """
        print("HelloFlow is all done.")


if __name__ == "__main__":
    HelloFlow()
from metaflow import FlowSpec, step, IncludeFile, Parameter


class PlayListFlow(FlowSpec):
    """
    A flow to help you build your favorite movie playlist.

    The flow performs the following steps:
    1) Ingests a CSV file containing metadata about movies.
    2) Loads two of the columns from the CSV into python lists.
    3) In parallel branches:
       - A) Filters movies by the genre parameter.
       - B) Choose a random movie from a different genre.
    4) Displays the top entries from the playlist.

    """

    movie_data = IncludeFile(
        "movie_data",
        help="The path to a movie metadata file.",
        default="movies.csv",
    )

    genre = Parameter(
        "genre", help="Filter movies for a particular genre.", default="Sci-Fi"
    )

    recommendations = Parameter(
        "recommendations",
        help="The number of movies to recommend in the playlist.",
        default=5,
    )

    @step
    def start(self):
        """
        Parse the CSV file and load the values into a dictionary of lists.

        Stores ``self.dataframe`` as ``{"movie_title": [...], "genres": [...]}``
        and then fans out to the two recommendation branches.

        """
        import csv

        # For this example, we only need the movie title and the genres.
        columns = ["movie_title", "genres"]

        self.dataframe = {col: [] for col in columns}
        for row in csv.DictReader(self.movie_data.splitlines()):
            for col in columns:
                self.dataframe[col].append(row[col])

        # Compute genre-specific movies and a bonus movie in parallel.
        self.next(self.bonus_movie, self.genre_movies)

    @step
    def bonus_movie(self):
        """
        Choose a random movie from a different genre.

        Stores ``self.bonus`` as a ``(movie_title, genres)`` tuple, or ``None``
        when no movie lies outside the requested genre.

        """
        from random import choice

        wanted = self.genre.lower()

        # Find all the movies that are not in the provided genre.
        candidates = [
            (title, genres)
            for title, genres in zip(
                self.dataframe["movie_title"], self.dataframe["genres"]
            )
            if wanted not in genres.lower()
        ]

        # choice() raises IndexError on an empty sequence, which happens when
        # every movie matches the genre (or the CSV has no rows). Record the
        # absence of a bonus instead of failing the whole run.
        self.bonus = choice(candidates) if candidates else None

        self.next(self.join)

    @step
    def genre_movies(self):
        """
        Filter the movies by genre.

        Stores ``self.movies``: the titles whose genre string contains the
        requested genre (case-insensitive), in random order.

        """
        from random import shuffle

        wanted = self.genre.lower()

        # Find all the movie titles in the specified genre.
        self.movies = [
            title
            for title, genres in zip(
                self.dataframe["movie_title"], self.dataframe["genres"]
            )
            if wanted in genres.lower()
        ]

        # Randomize the title names.
        shuffle(self.movies)

        self.next(self.join)

    @step
    def join(self, inputs):
        """
        Join our parallel branches and merge results.

        ``inputs`` exposes each branch by step name; the playlist comes from
        'genre_movies' and the bonus pick from 'bonus_movie'.

        """
        # Reassign relevant variables from our branches.
        self.playlist = inputs.genre_movies.movies
        self.bonus = inputs.bonus_movie.bonus

        self.next(self.end)

    @step
    def end(self):
        """
        Print out the playlist and bonus movie.

        At most ``recommendations`` titles are printed; a value of zero (or
        less) prints no picks at all.

        """
        from itertools import islice

        print("Playlist for movies in genre '%s'" % self.genre)

        # Cap the iteration up front so that recommendations=0 prints nothing;
        # islice() rejects negative bounds, hence the clamp.
        limit = max(self.recommendations, 0)
        for pick, movie in enumerate(islice(self.playlist, limit), start=1):
            print("Pick %d: '%s'" % (pick, movie))

        if self.bonus is None:
            print("Bonus Pick: none available outside genre '%s'" % self.genre)
        else:
            print("Bonus Pick: '%s' from '%s'" % (self.bonus[0], self.bonus[1]))


if __name__ == "__main__":
    PlayListFlow()
from metaflow import FlowSpec, step, IncludeFile


class MovieStatsFlow(FlowSpec):
    """
    A flow to generate some statistics about the movie genres.

    The flow performs the following steps:
    1) Ingests a CSV into a dataframe.
    2) Fan-out over genre using Metaflow foreach.
    3) Compute quartiles for each genre.
    4) Save a dictionary of genre-specific statistics.

    """

    movie_data = IncludeFile(
        "movie_data",
        help="The path to a movie metadata file.",
        default="movies.csv",
    )

    @step
    def start(self):
        """
        The start step:
        1) Loads the movie metadata into dataframe.
        2) Finds all the unique genres.
        3) Launches parallel statistics computation for each genre.

        """
        import csv

        # Load the dataset into a dataframe structure (a dict of column
        # lists). The year and gross columns are parsed as integers.
        numeric_columns = ("title_year", "gross")
        columns = ["movie_title", "title_year", "genres", "gross"]
        self.dataframe = {col: [] for col in columns}

        for row in csv.DictReader(self.movie_data.splitlines()):
            for col in columns:
                val = int(row[col]) if col in numeric_columns else row[col]
                self.dataframe[col].append(val)

        # The column 'genres' has a '|'-separated list of genres for each
        # movie. Collect the unique genres; sorting makes the order of the
        # foreach tasks reproducible between runs.
        self.genres = sorted(
            {
                genre
                for genres in self.dataframe["genres"]
                for genre in genres.split("|")
            }
        )

        # We want to compute some statistics for each genre. The 'foreach'
        # keyword argument allows us to compute the statistics for each genre in
        # parallel (i.e. a fan-out).
        self.next(self.compute_statistics, foreach="genres")

    @step
    def compute_statistics(self):
        """
        Compute statistics for a single genre.

        Narrows ``self.dataframe`` to the movies of this genre, sorted by
        ascending gross, and stores the gross box office quartiles in
        ``self.quartiles`` as ``[q25, q50, q75]``.

        """
        # The genre currently being processed is a class property called
        # 'input'.
        self.genre = self.input
        print("Computing statistics for %s" % self.genre)

        # Find all the movies that have this genre. Compare against the
        # individual '|'-separated entries rather than the raw string, so that
        # e.g. 'Music' does not also select movies tagged only 'Musical'.
        selector = [
            self.genre in genres.split("|") for genres in self.dataframe["genres"]
        ]
        self.dataframe = {
            col: [value for value, is_genre in zip(values, selector) if is_genre]
            for col, values in self.dataframe.items()
        }

        # Sort by gross box office and drop unused column.
        gross = self.dataframe["gross"]
        argsort_indices = sorted(range(len(gross)), key=gross.__getitem__)
        self.dataframe = {
            col: [values[idx] for idx in argsort_indices]
            for col, values in self.dataframe.items()
        }
        del self.dataframe["title_year"]

        # Get some statistics on the gross box office for these titles.
        n_points = len(self.dataframe["movie_title"])
        last_idx = n_points - 1
        self.quartiles = []
        for cut in [0.25, 0.5, 0.75]:
            # round() rounds halves to the nearest even integer, so with two
            # points round(2 * 0.75) == 2, one past the end. Clamp to the
            # last valid index.
            idx = 0 if n_points < 2 else min(round(n_points * cut), last_idx)
            self.quartiles.append(self.dataframe["gross"][idx])

        # Join the results from other genres.
        self.next(self.join)

    @step
    def join(self, inputs):
        """
        Join our parallel branches and merge results into a dictionary.

        ``self.genre_stats`` maps each lower-cased genre name to a dict with
        that genre's "quartiles" and "dataframe".

        """
        # Merge results from the genre-specific computations.
        self.genre_stats = {
            inp.genre.lower(): {"quartiles": inp.quartiles, "dataframe": inp.dataframe}
            for inp in inputs
        }

        self.next(self.end)

    @step
    def end(self):
        """
        End the flow.

        """
        print("Computed statistics for %d genres" % len(self.genre_stats))


if __name__ == "__main__":
    MovieStatsFlow()
Metaflow launches one parallel task per item in self.genres.
Accessing Current Item
In a foreach step, access the current item via self.input:
@step
def compute_statistics(self):
    """Handle one branch of the foreach fan-out."""
    # `self.input` is the item assigned to this task: the current genre.
    self.genre = self.input
Join After Foreach
After foreach, a join step receives all results:
@step
def join(self, inputs):
    """Gather the per-genre results produced by the foreach branches."""
    # Iterating over `inputs` yields the result of each parallel task.
    stats = {}
    for inp in inputs:
        stats[inp.genre.lower()] = {"quartiles": inp.quartiles}
    self.genre_stats = stats
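Foreach is powerful for embarrassingly parallel workloads such as hyperparameter sweeps, training one model per data segment, or processing many independent files or partitions.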
from metaflow import FlowSpec, step, Parameter


class PlayListFlow(FlowSpec):
    """
    The next version of our playlist generator that uses the statistics
    generated from 'Episode 02' to improve the title recommendations.

    The flow performs the following steps:
    1) Load the genre-specific statistics from the MovieStatsFlow.
    2) In parallel branches:
       - A) Build a playlist from the top grossing films in the requested genre.
       - B) Choose a random movie.
    3) Join the two to create a movie playlist and display it.

    """

    genre = Parameter(
        "genre", help="Filter movies for a particular genre.", default="Sci-Fi"
    )

    recommendations = Parameter(
        "recommendations",
        help="The number of movies recommended for the playlist.",
        default=5,
    )

    @step
    def start(self):
        """
        Use the Metaflow client to retrieve the latest successful run from our
        MovieStatsFlow and assign them as data artifacts in this flow.

        Raises:
            RuntimeError: if MovieStatsFlow has no successful run yet.

        """
        from metaflow import Flow, get_metadata

        # Print metadata provider
        print("Using metadata provider: %s" % get_metadata())

        # Load the analysis from the MovieStatsFlow. The client yields None
        # when the flow has never completed successfully; fail with an
        # actionable message rather than an AttributeError on `run.data`.
        run = Flow("MovieStatsFlow").latest_successful_run
        if run is None:
            raise RuntimeError(
                "No successful run of 'MovieStatsFlow' was found. "
                "Run MovieStatsFlow before running PlayListFlow."
            )
        print("Using analysis from '%s'" % str(run))

        self.genre_stats = run.data.genre_stats

        # Compute our two recommendation types in parallel.
        self.next(self.bonus_movie, self.genre_movies)

    @step
    def bonus_movie(self):
        """
        Choose a random title for a different movie genre.

        Stores ``self.bonus`` as a ``(movie_title, genres)`` tuple, or ``None``
        when no movie lies outside the requested genre.

        """
        import random

        wanted = self.genre.lower()

        # Gather the rows from every other genre's dataframe whose genre
        # string does not mention the requested genre.
        candidates = []
        for genre, data in self.genre_stats.items():
            if genre == wanted:
                continue
            frame = data["dataframe"]
            for title, genres in zip(frame["movie_title"], frame["genres"]):
                if wanted not in genres.lower():
                    candidates.append((title, genres))

        # Choose a random movie. An empty candidate list would otherwise make
        # the random selection raise, so record the absence of a bonus.
        self.bonus = random.choice(candidates) if candidates else None

        self.next(self.join)

    @step
    def genre_movies(self):
        """
        Select the top performing movies from the user specified genre.

        Stores ``self.movies``: the titles whose gross is at or above the
        genre's last quartile, in random order. The list is empty when the
        genre has no statistics.

        """
        from random import shuffle

        # For the genre of interest, generate a potential playlist using only
        # highest gross box office titles (i.e. those in the last quartile).
        genre = self.genre.lower()
        if genre not in self.genre_stats:
            self.movies = []
        else:
            df = self.genre_stats[genre]["dataframe"]
            threshold = self.genre_stats[genre]["quartiles"][-1]
            self.movies = [
                title
                for title, gross in zip(df["movie_title"], df["gross"])
                if gross >= threshold
            ]

        # Shuffle the playlist.
        shuffle(self.movies)

        self.next(self.join)

    @step
    def join(self, inputs):
        """
        Join our parallel branches and merge results.

        ``inputs`` exposes each branch by step name; the playlist comes from
        'genre_movies' and the bonus pick from 'bonus_movie'.

        """
        self.playlist = inputs.genre_movies.movies
        self.bonus = inputs.bonus_movie.bonus

        self.next(self.end)

    @step
    def end(self):
        """
        Print out the playlist and bonus movie.

        At most ``recommendations`` titles are printed; a value of zero (or
        less) prints no picks at all.

        """
        from itertools import islice

        print("Playlist for movies in genre '%s'" % self.genre)

        # Cap the iteration up front so that recommendations=0 prints nothing;
        # islice() rejects negative bounds, hence the clamp.
        limit = max(self.recommendations, 0)
        for pick, movie in enumerate(islice(self.playlist, limit), start=1):
            print("Pick %d: '%s'" % (pick, movie))

        if self.bonus is None:
            print("Bonus Pick: none available outside genre '%s'" % self.genre)
        else:
            print("Bonus Pick: '%s' from '%s'" % (self.bonus[0], self.bonus[1]))


if __name__ == "__main__":
    PlayListFlow()
from metaflow import Flow

# Look up the flow by name, then take its most recent successful run.
stats_flow = Flow("MovieStatsFlow")
run = stats_flow.latest_successful_run

# Artifacts stored by that run are exposed as attributes of `run.data`.
genre_stats = run.data.genre_stats
Flow Dependencies
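Flows can depend on data from other flows, enabling reuse of expensive upstream results, separation of analysis from the applications that consume it, and independent development and scheduling of each flow.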
from metaflow import FlowSpec, step, kubernetes, retry


class HelloCloudFlow(FlowSpec):
    """
    A flow in which Metaflow prints 'Metaflow says Hi from the cloud!'

    Run it to validate your Kubernetes configuration.

    """

    @step
    def start(self):
        """
        Announce the run and the metadata provider in use.

        'start' is a regular step, so it runs locally on the machine from
        which the flow is executed.

        """
        from metaflow import get_metadata

        print("HelloCloud is starting.")
        print("")
        provider = get_metadata()
        print("Using metadata provider: %s" % provider)
        print("")
        print("The start step is running locally. Next, the ")
        print("'hello' step will run remotely on Kubernetes. ")

        self.next(self.hello)

    @kubernetes(cpu=1, memory=500)
    @retry
    @step
    def hello(self):
        """
        Produce the greeting remotely.

        This step runs on Kubernetes using 1 virtual CPU and 500 MB of
        memory. Because a remote metadata service and data store are in use,
        the flow information and artifacts are available from anywhere. The
        retry decorator re-runs the step automatically if something goes
        wrong.

        """
        self.message = "Hi from the cloud!"
        print("Metaflow says: %s" % self.message)
        self.next(self.end)

    @step
    def end(self):
        """
        Report the message produced by the remote step.

        'end' is a regular step, so it runs locally on the machine from which
        the flow is executed.

        """
        print("HelloCloud is finished.")
        print("The message from the cloud was: %s" % self.message)


if __name__ == "__main__":
    HelloCloudFlow()
Prerequisites: This requires configuring Metaflow infrastructure for cloud execution. For AWS Batch, use the @batch decorator; for Kubernetes, use the @kubernetes decorator.