Skip to main content

Overview

Metaflow provides first-class support for Jupyter notebooks, allowing you to:
  • Develop and test flows interactively
  • Run individual steps in notebook cells
  • Deploy notebook-defined flows to production
  • Access results from previous runs
  • Visualize data with Cards

Defining Flows in Notebooks

You can define complete flows in notebook cells:
from metaflow import FlowSpec, step, Parameter

class NotebookFlow(FlowSpec):
    """A minimal linear flow defined in a notebook cell.

    Seeds a list of 100 integers, scales their sum by the ``alpha``
    parameter, and prints the result in the final step.
    """

    # Tunable scaling factor, overridable at run time (e.g. run(alpha=0.05)).
    alpha = Parameter('alpha', default=0.01)

    @step
    def start(self):
        print(f"Starting with alpha={self.alpha}")
        self.data = [*range(100)]
        self.next(self.process)

    @step
    def process(self):
        # sum(0..99) == 4950, scaled by alpha.
        self.result = self.alpha * sum(self.data)
        self.next(self.end)

    @step
    def end(self):
        print(f"Result: {self.result}")

# Instantiating the flow hands control to Metaflow's CLI machinery,
# so `python thisfile.py run` executes the flow end to end.
if __name__ == '__main__':
    NotebookFlow()

Running Flows from Notebooks

Using NBRunner

The recommended way to run notebook flows is with NBRunner:
from metaflow import NBRunner

# Run the flow
# NOTE(review): in some Metaflow versions a blocking run has already
# completed when the context-manager body executes, making wait() a no-op —
# confirm against the NBRunner/Runner API docs for the pinned version.
with NBRunner(NotebookFlow).run(alpha=0.05) as running:
    running.wait()
    print(f"Status: {running.status}")
    print(f"Run ID: {running.run.id}")

Accessing Results

from metaflow import NBRunner

# Run and access results
with NBRunner(NotebookFlow).run() as running:
    running.wait()

    # Access the run
    # running.run is the client-API Run object for the finished execution.
    run = running.run

    # Get artifacts from the end step
    # run['end'] selects the step, .task its task, .data its artifacts.
    result = run['end'].task.data.result
    print(f"Computed result: {result}")

Interactive Development

Testing Individual Steps

You can test steps interactively:
# Create a flow instance
# NOTE(review): instantiating a FlowSpec subclass outside a run can trigger
# Metaflow's CLI argument parsing in some versions, and calling self.next()
# outside a real run may not be supported — confirm this pattern works in
# the target Metaflow version before recommending it.
flow = NotebookFlow()

# Simulate starting the flow
# Parameters are assigned manually because no real run supplies them.
flow.alpha = 0.01
flow.start()

# Check the data
print(f"Generated {len(flow.data)} data points")

# Test the processing step
flow.process()
print(f"Result: {flow.result}")

Prototyping with Small Data

from metaflow import FlowSpec, step
import pandas as pd

class DataPipeline(FlowSpec):
    """Three-step pipeline: load a CSV sample, aggregate it, print it."""

    @step
    def start(self):
        """Read only the first 1000 rows so iteration stays fast."""
        self.df = pd.read_csv('data.csv', nrows=1000)
        print(f"Loaded {len(self.df)} rows")
        self.next(self.process)

    @step
    def process(self):
        """Aggregate the sample, one group per category value."""
        grouped = self.df.groupby('category')
        self.result = grouped.sum()
        self.next(self.end)

    @step
    def end(self):
        print(self.result)

# Test with sample data
# Fix: NBRunner is used below but was never imported in this snippet,
# so bring it into scope first.
from metaflow import NBRunner

with NBRunner(DataPipeline).run() as running:
    running.wait()

Deploying Notebook Flows

Deploy to Production with NBDeployer

from metaflow import NBDeployer

# Deploy to Argo Workflows
# NOTE(review): the Deployer API is usually shown chaining .create() to
# finish the deployment (e.g. NBDeployer(...).argo_workflows().create()) —
# confirm whether this shorthand returns a deployed flow in the pinned
# Metaflow version.
deployment = NBDeployer(NotebookFlow).argo_workflows(
    name='notebook-flow-prod',
    schedule='@daily'
)

print(f"Deployed: {deployment.name}")

# Trigger manually
# trigger() starts a production run; keyword args override flow parameters.
triggered = deployment.trigger(alpha=0.02)
print(f"Triggered: {triggered.pathspec}")

Deploy to AWS Step Functions

from metaflow import NBDeployer

# Deploy to Step Functions
# The schedule uses AWS cron syntax: fires at 09:00 UTC daily.
# NOTE(review): as with the Argo example, confirm whether a trailing
# .create() call is required in the pinned Metaflow version.
deployment = NBDeployer(NotebookFlow).step_functions(
    name='notebook-flow-prod',
    schedule='cron(0 9 * * ? *)'
)

print(f"Deployed to Step Functions: {deployment.name}")

Accessing Previous Runs

Using the Client API

from metaflow import Flow

# Access the latest run
run = Flow('NotebookFlow').latest_run

print(f"Run ID: {run.id}")
# Fix: the original label said "Created" while printing finished_at;
# the label now matches the attribute actually displayed.
print(f"Finished: {run.finished_at}")

# Get artifacts
# NOTE(review): confirm that step.task.data is iterable over artifact
# names in the pinned Metaflow version.
for step in run:
    print(f"\nStep: {step.id}")
    for artifact_name in step.task.data:
        # Skip Metaflow-internal artifacts, which are underscore-prefixed.
        if not artifact_name.startswith('_'):
            artifact = getattr(step.task.data, artifact_name)
            print(f"  {artifact_name}: {artifact}")

Comparing Runs

from metaflow import Flow
import pandas as pd

# Compare results from multiple runs
flow = Flow('NotebookFlow')

# One record per historical run: its id, the alpha it ran with, and the
# final computed result.
results = [
    {
        'run_id': run.id,
        'alpha': run['start'].task.data.alpha,
        'result': run['end'].task.data.result,
    }
    for run in flow.runs()
]

df = pd.DataFrame(results)
print(df)

Visualization with Cards

Adding Cards to Notebook Flows

from metaflow import FlowSpec, step, card, current
from metaflow.cards import Markdown, Table, Image
import matplotlib.pyplot as plt

class VisualFlow(FlowSpec):
    """Flow demonstrating @card: attaches a Markdown heading and a
    matplotlib plot to the start step's card."""

    @card
    @step
    def start(self):
        # Create a plot
        plt.figure(figsize=(10, 6))
        plt.plot([1, 2, 3, 4], [1, 4, 9, 16])
        plt.title('Sample Plot')
        plt.savefig('plot.png')

        # Add to card
        # NOTE(review): Image.from_matplotlib is usually documented with a
        # figure object; passing the pyplot module may rely on
        # version-specific behavior — confirm against the Metaflow cards docs.
        current.card.append(Markdown("# Analysis Results"))
        current.card.append(Image.from_matplotlib(plt))

        self.next(self.end)

    @step
    def end(self):
        pass

# Run and view card
# Fix: NBRunner is used below but was never imported in this snippet,
# so bring it into scope first.
from metaflow import NBRunner

with NBRunner(VisualFlow).run() as running:
    running.wait()
    # Card is automatically saved

Viewing Cards

from metaflow import Flow

# Get the latest run
run = Flow('VisualFlow').latest_run

# View cards in notebook
# `display` is an IPython builtin available in notebook kernels.
from IPython.display import HTML
for step in run:
    # NOTE(review): confirm Task exposes a `cards` attribute in the pinned
    # version; the documented accessor is metaflow.cards.get_cards(task).
    if hasattr(step.task, 'cards'):
        for card in step.task.cards:
            display(HTML(card.html))

Notebook Best Practices

Always use NBRunner instead of command-line execution in notebooks:
# Good - programmatic control
with NBRunner(MyFlow).run() as running:
    running.wait()

# Avoid - shell magic
# The `!` prefix is IPython shell syntax, not Python — it spawns a
# subprocess the notebook cannot inspect or control programmatically.
!python myflow.py run
Start with small datasets to iterate quickly:
@step
def start(self):
    # Branch on deployment context so notebook iterations stay fast.
    # NOTE(review): current.is_production is provided by the @project
    # decorator — confirm the enclosing flow uses it.
    if current.is_production:
        self.data = load_full_dataset()
    else:
        # Use sample for testing
        self.data = load_sample(nrows=1000)
Define the entire flow in a single cell for easier maintenance:
# Single cell with complete flow
# Keeping the whole definition in one cell avoids stale-class issues when
# cells are re-executed out of order.
from metaflow import FlowSpec, step

class MyFlow(FlowSpec):
    # All steps here
    pass
Add @card decorator to steps that create visualizations:
# @card sits above @step so the card attaches to this step's task.
@card
@step
def analyze(self):
    # Create plots
    # They'll be automatically saved to cards
    pass
Once your flow is stable, export it to a .py file:
# In notebook: develop and test
# Then export to myflow.py for production

# Deploy the .py file
# NOTE(review): Deployer('myflow.py').argo_workflows() is usually followed
# by .create() to actually deploy — confirm in the pinned version.
from metaflow import Deployer
Deployer('myflow.py').argo_workflows()

Common Patterns

Experiment Tracking

from metaflow import FlowSpec, step, Parameter
import mlflow

class MLExperiment(FlowSpec):
    """Parameterized training flow used for notebook experiment sweeps."""

    # Hyperparameters, overridable per run (e.g. run(lr=0.001)).
    learning_rate = Parameter('lr', default=0.01)
    epochs = Parameter('epochs', default=10)

    @step
    def start(self):
        # Snapshot the hyperparameters as a single artifact.
        self.params = dict(lr=self.learning_rate, epochs=self.epochs)
        self.next(self.train)

    @step
    def train(self):
        # Training code
        self.accuracy = 0.95  # Example
        self.loss = 0.05
        self.next(self.end)

    @step
    def end(self):
        print(f"Accuracy: {self.accuracy}")

# Run multiple experiments
# Fix: NBRunner is used below but was never imported in this snippet,
# so bring it into scope first.
from metaflow import NBRunner

for lr in [0.01, 0.001, 0.0001]:
    with NBRunner(MLExperiment).run(lr=lr) as running:
        running.wait()
        print(f"lr={lr}, status={running.status}")

Data Exploration Pipeline

from metaflow import FlowSpec, step
import pandas as pd
import seaborn as sns

class DataExploration(FlowSpec):
    """Fan-out exploration: compute a summary and a correlation matrix in
    parallel branches, then join and report."""

    @step
    def start(self):
        self.df = pd.read_csv('data.csv')
        # Branch into two parallel analysis steps.
        self.next(self.profile, self.correlations)

    @step
    def profile(self):
        """Descriptive statistics for each numeric column."""
        self.summary = self.df.describe()
        self.next(self.join)

    @step
    def correlations(self):
        """Pairwise column correlations."""
        self.corr = self.df.corr()
        self.next(self.join)

    @step
    def join(self, inputs):
        # Merge each branch's artifact back into this task explicitly,
        # since diverged artifacts are not carried forward automatically.
        self.summary = inputs.profile.summary
        self.corr = inputs.correlations.corr
        self.next(self.end)

    @step
    def end(self):
        print("Analysis complete")

# Run exploration
# Fix: NBRunner is used below but was never imported in this snippet,
# so bring it into scope first.
from metaflow import NBRunner

with NBRunner(DataExploration).run() as running:
    running.wait()

Runner API

Learn about the Runner and NBRunner APIs

Cards & Visualization

Create rich visual reports

Client API

Access flow results programmatically

Quickstart

Get started with Metaflow

Build docs developers (and LLMs) love