from metaflow import Runner

# Run a flow
with Runner('myflow.py').run() as running:
    print(f"Run started: {running.run.pathspec}")
    running.wait()
    print(f"Run finished: {running.status}")
from metaflow import Deployer

# Deploy to Step Functions
deployment = Deployer('myflow.py').step_functions(
    name='my-production-flow',
    schedule='cron(0 8 * * ? *)',
)

# List all deployments
for flow in deployment.list_deployed_flows():
    print(f"{flow.name}: {flow.flow_name}")
from metaflow import NBRunner

# Run a flow from a notebook
with NBRunner('MyFlow').run() as running:
    running.wait()
    print(f"Status: {running.status}")
    print(f"Run ID: {running.run.id}")
from metaflow import Runner

try:
    with Runner('myflow.py').run() as running:
        running.wait()
        if running.status != 'completed':
            print(f"Flow failed with status: {running.status}")
            print(f"Return code: {running.returncode}")
            print(f"Error output:\n{running.stderr}")
except Exception as e:
    print(f"Runner error: {e}")
Always use `with` statements when running flows to ensure proper cleanup:
# Good
with Runner('flow.py').run() as running:
    running.wait()

# Avoid
running = Runner('flow.py').run()
running.wait()
running.cleanup()  # Easy to forget
Handle errors gracefully
Always check the run status and handle failures:
with Runner('flow.py').run() as running:
    running.wait()
    if running.status == 'completed':
        print("Success!")
    else:
        print(f"Failed: {running.stderr}")
        # Take corrective action
Stream logs for long-running flows
For long-running flows, stream logs to monitor progress:
with Runner('flow.py').run() as running:
    for line in running.stream_log():
        print(line)
Use async for concurrent execution
Use `async_run()` when running multiple flows concurrently:
import asyncio


async def run_all():
    # Launch all flows without blocking, then await them together.
    runners = [Runner(f'flow{i}.py').async_run() for i in range(10)]
    results = await asyncio.gather(*runners)
    return [r.status for r in results]