
Overview

The Runner API allows you to execute and deploy Metaflow flows programmatically from Python code. This is useful for:
  • Building custom workflow orchestration systems
  • Creating web applications that trigger flows
  • Integrating Metaflow into existing Python applications
  • Automating flow execution and deployment
  • Running flows from Jupyter notebooks
The Runner API requires Python 3.7 or later.

Core Classes

  • Runner: execute flows synchronously or asynchronously
  • Deployer: deploy flows to production orchestrators
  • NBRunner: run flows defined in Jupyter notebooks
  • NBDeployer: deploy notebook flows to production

Runner Class

The Runner class executes flows from Python scripts.

Basic Usage

from metaflow import Runner

# run() blocks until the flow finishes and returns a handle to the finished run
with Runner('myflow.py').run() as running:
    print(f"Run finished: {running.run.pathspec}")
    print(f"Status: {running.status}")

Async Execution

import asyncio
from metaflow import Runner

async def run_multiple_flows():
    # Start multiple flows concurrently; each async_run() returns once its run has started
    starting = [
        Runner('flow1.py').async_run(),
        Runner('flow2.py').async_run(),
        Runner('flow3.py').async_run()
    ]
    running_flows = await asyncio.gather(*starting)

    # Wait for each run to finish, then report its final status
    for running in running_flows:
        await running.wait()
        print(f"{running.run.pathspec}: {running.status}")

asyncio.run(run_multiple_flows())
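
When many flows are started at once, it can help to cap how many run at the same time. The following is a minimal sketch of one way to do that with an asyncio.Semaphore. The names gather_limited and run_flow_to_completion are hypothetical helpers introduced here for illustration, not part of Metaflow, and the sketch assumes async_run() and wait() are awaitable as in the example above.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, List, TypeVar

T = TypeVar("T")


async def gather_limited(
    jobs: Iterable[Callable[[], Awaitable[T]]],
    max_concurrent: int = 3,
) -> List[T]:
    """Await zero-argument coroutine factories, at most max_concurrent at a time.

    Results are returned in the same order as the jobs were given.
    """
    semaphore = asyncio.Semaphore(max_concurrent)

    async def guarded(job: Callable[[], Awaitable[T]]) -> T:
        # Each job waits here until one of the max_concurrent slots is free.
        async with semaphore:
            return await job()

    return await asyncio.gather(*(guarded(job) for job in jobs))


async def run_flow_to_completion(flow_file: str) -> str:
    """Start one flow, wait for it to finish, and return its final status."""
    # Imported lazily so gather_limited stays usable without Metaflow installed.
    from metaflow import Runner

    with await Runner(flow_file).async_run() as running:
        await running.wait()
        return running.status
```

With Metaflow installed, a call such as asyncio.run(gather_limited([lambda f=f: run_flow_to_completion(f) for f in flow_files], max_concurrent=2)) would run the listed flows two at a time, where flow_files is a list of paths you supply.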

Streaming Logs

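import asyncio
from metaflow import Runner

async def stream_run():
    # async_run() returns as soon as the run starts, so logs can be read while it executes
    with await Runner('myflow.py').async_run() as running:
        # stream_log() is an async generator that yields (position, line) pairs
        async for _, line in running.stream_log('stdout'):
            print(line)

asyncio.run(stream_run())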
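
Streamed lines can also be inspected as they arrive, for example to collect error messages. The sketch below assumes the log stream is an async iterator of (position, line) pairs. Both collect_matching and fake_log_stream are hypothetical names used only for illustration, and the fake stream stands in for a real run so the example works without Metaflow.

```python
import asyncio
from typing import AsyncIterator, List, Tuple


async def collect_matching(
    log_stream: AsyncIterator[Tuple[int, str]],
    keyword: str,
) -> List[str]:
    """Print every streamed line and return the ones that contain keyword."""
    matches: List[str] = []
    async for _position, line in log_stream:
        print(line)
        if keyword in line:
            matches.append(line)
    return matches


async def fake_log_stream() -> AsyncIterator[Tuple[int, str]]:
    """Stand-in for a live log stream, used here only for illustration."""
    for position, line in enumerate(["start", "ERROR: disk full", "end"]):
        await asyncio.sleep(0)  # yield control, as a real stream would while waiting
        yield position, line


errors = asyncio.run(collect_matching(fake_log_stream(), "ERROR"))
```

In a real program, the first argument would be the stream obtained from the running flow instead of fake_log_stream().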

Deployer Class

The Deployer class deploys flows to production orchestrators.

Deploy to Argo Workflows

from metaflow import Deployer

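# Configure the Argo Workflows deployer, then call create() to deploy.
# Schedules are declared on the flow itself with the @schedule decorator.
deployment = Deployer('myflow.py').argo_workflows(
    name='my-production-flow'
).create()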

print(f"Deployed: {deployment.name}")

# Trigger a run
triggered = deployment.trigger()
print(f"Triggered run: {triggered.pathspec}")

# Wait for completion
triggered.wait_for_run()
print(f"Run status: {triggered.run.successful}")

Deploy to AWS Step Functions

from metaflow import Deployer

# Configure the Step Functions deployer, then call create() to deploy.
# Schedules are declared on the flow itself with the @schedule decorator.
deployment = Deployer('myflow.py').step_functions(
    name='my-production-flow'
).create()

# List all deployments
for flow in deployment.list_deployed_flows():
    print(f"{flow.name}: {flow.flow_name}")

NBRunner Class

The NBRunner class executes flows defined in Jupyter notebooks.

Running Notebook Flows

from metaflow import NBRunner

# Run a flow class defined earlier in the notebook (pass the class itself, not its name).
# nbrun() blocks until the run finishes and returns a Run object.
run = NBRunner(MyFlow).nbrun()
print(f"Successful: {run.successful}")
print(f"Run ID: {run.id}")

Using in Notebooks

# In a Jupyter notebook
from metaflow import FlowSpec, step, NBRunner

class MyFlow(FlowSpec):
    @step
    def start(self):
        print("Starting...")
        self.next(self.end)
    
    @step
    def end(self):
        print("Done!")

# Run the flow; nbrun() blocks until it finishes and returns a Run object
run = NBRunner(MyFlow).nbrun()
print(f"Run ID: {run.id}")

Advanced Patterns

Parameterized Execution

from metaflow import Runner

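# Pass flow parameters as keyword arguments; each name must match a
# Parameter declared in the flow. run() blocks until the flow finishes.
with Runner('myflow.py').run(
    alpha=0.01,
    batch_size=128
) as running:
    print(f"Status: {running.status}")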
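
Keyword-argument parameters make it straightforward to launch the same flow over several settings. The following sketch expands a dictionary of candidate values into every combination and runs the flow once for each. The helpers param_grid and run_sweep are hypothetical names introduced for illustration, and the sketch assumes the flow declares parameters named alpha and batch_size as in the example above.

```python
from itertools import product
from typing import Any, Dict, List, Mapping, Sequence


def param_grid(options: Mapping[str, Sequence[Any]]) -> List[Dict[str, Any]]:
    """Expand {name: [values, ...]} into one dict per combination of values."""
    names = list(options)
    value_lists = [options[name] for name in names]
    return [dict(zip(names, values)) for values in product(*value_lists)]


def run_sweep(flow_file: str, options: Mapping[str, Sequence[Any]]) -> List[str]:
    """Run flow_file once per parameter combination and return each run's status."""
    # Imported lazily so param_grid can be used without Metaflow installed.
    from metaflow import Runner

    statuses = []
    for params in param_grid(options):
        with Runner(flow_file).run(**params) as running:
            statuses.append(running.status)
    return statuses


grid = param_grid({"alpha": [0.01, 0.1], "batch_size": [64, 128]})
print(len(grid))  # 4 combinations
```

Runs are launched one after another here; for large sweeps, the async_run() approach shown earlier may be a better fit.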

Error Handling

from metaflow import Runner

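try:
    # run() blocks until the flow finishes
    with Runner('myflow.py').run() as running:
        # status is one of 'running', 'successful', 'failed', or 'timeout'
        if running.status != 'successful':
            print(f"Flow failed with status: {running.status}")
            print(f"Return code: {running.returncode}")
            print(f"Error output:\n{running.stderr}")
except Exception as e:
    # Raised, for example, when the run could not be started at all
    print(f"Runner error: {e}")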
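
Transient failures are sometimes worth retrying. Below is a minimal retry sketch built around a callable that performs one run and returns its status string. The names run_with_retries and run_flow_once are hypothetical, and the default success value 'successful' is an assumption about what the status reports on a clean finish; pass a different success_status if your Metaflow version differs.

```python
import time
from typing import Callable


def run_with_retries(
    run_once: Callable[[], str],
    attempts: int = 3,
    delay_seconds: float = 5.0,
    success_status: str = "successful",  # assumed value for a clean finish
) -> str:
    """Call run_once until it returns success_status or attempts run out.

    Returns the status from the last attempt that was made.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    status = ""
    for attempt in range(1, attempts + 1):
        status = run_once()
        if status == success_status:
            break
        if attempt < attempts:
            time.sleep(delay_seconds)  # pause before the next attempt
    return status


def run_flow_once(flow_file: str) -> str:
    """Run a flow to completion and return its final status string."""
    # Imported lazily so run_with_retries can be used without Metaflow installed.
    from metaflow import Runner

    with Runner(flow_file).run() as running:
        return running.status
```

A call such as run_with_retries(lambda: run_flow_once('myflow.py')) would then retry the flow up to three times. Retrying only makes sense for flows that are safe to re-run from the start.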

Custom Environment

from metaflow import Runner
import os

# Run with custom environment variables
env = os.environ.copy()
env['MY_API_KEY'] = 'secret-key'
env['METAFLOW_PROFILE'] = 'production'

# run() blocks until the flow finishes
with Runner('myflow.py', env=env).run() as running:
    print(f"Status: {running.status}")
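
Hard-coding secrets in source is easy to do by accident. The sketch below reads required values from the surrounding environment instead and fails early when one is missing. The helper build_env is a hypothetical name introduced here, and the sketch assumes the env argument is applied on top of the current process environment; if that does not hold in your version, start from os.environ.copy() as in the example above.

```python
import os
from typing import Dict, Iterable, Mapping, Optional


def build_env(
    required: Iterable[str],
    extra: Optional[Mapping[str, str]] = None,
    source: Optional[Mapping[str, str]] = None,
) -> Dict[str, str]:
    """Copy the required variables from source and add any extra settings.

    source defaults to os.environ. A KeyError naming the missing variables
    is raised if any required name is absent.
    """
    source = os.environ if source is None else source
    required = list(required)
    missing = [name for name in required if name not in source]
    if missing:
        raise KeyError(f"Missing environment variables: {', '.join(missing)}")
    env = {name: source[name] for name in required}
    env.update(extra or {})
    return env


# Illustration with an explicit source mapping instead of the real environment.
example_env = build_env(
    ["MY_API_KEY"],
    extra={"METAFLOW_PROFILE": "production"},
    source={"MY_API_KEY": "value-from-secret-store", "UNRELATED": "ignored"},
)
```

The resulting dictionary can be passed as Runner('myflow.py', env=example_env).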

Best Practices

Always use with statements when running flows to ensure proper cleanup:

# Good: temporary files are cleaned up when the block exits
with Runner('flow.py').run() as running:
    print(running.status)

# Avoid: cleanup has to be called by hand
runner = Runner('flow.py')
running = runner.run()
runner.cleanup()  # Easy to forget

Always check the run status and handle failures:

with Runner('flow.py').run() as running:
    if running.status == 'successful':
        print("Success!")
    else:
        print(f"Failed: {running.stderr}")
        # Take corrective action

For long-running flows, start the run with async_run() and stream logs to monitor progress (inside an async function):

with await Runner('flow.py').async_run() as running:
    async for _, line in running.stream_log('stdout'):
        print(line)

Use async_run() when running multiple flows concurrently:

async def run_all():
    starting = [Runner(f'flow{i}.py').async_run() for i in range(10)]
    running_flows = await asyncio.gather(*starting)
    # Wait for every run to finish before reading its status
    await asyncio.gather(*(r.wait() for r in running_flows))
    return [r.status for r in running_flows]

Related documentation:
  • Runner API Reference: complete Runner class documentation
  • Deployer API Reference: complete Deployer class documentation
  • Production Deployment: learn about deploying to production
  • Notebooks: using Metaflow in Jupyter notebooks
