Overview

The Deployer class provides a programmatic interface to configure and deploy Metaflow flows to production orchestrators such as AWS Step Functions, Argo Workflows, and others.

Usage

A Deployer instance exposes one method per production orchestrator supported by Metaflow. Call the method for your target orchestrator, create the deployment, and then trigger runs on it:
from metaflow import Deployer

# Deploy to Argo Workflows
deployer = Deployer('myflow.py')
argo = deployer.argo_workflows()
deployed_flow = argo.create()

# Trigger a run
triggered_run = deployed_flow.trigger(alpha=5)
# Prints None until the start task has started executing
print(triggered_run.run)

Constructor

Deployer(flow_file, show_output=True, profile=None, env=None, cwd=None, file_read_timeout=3600, **kwargs)

Create a new Deployer instance for deploying a flow.
Parameters:
  • flow_file (str, required) - Path to the flow file to deploy, relative to the current directory.
  • show_output (bool, default True) - Whether to print stdout and stderr to the console.
  • profile (str, default None) - Metaflow profile to use for the deployment. If not specified, the default profile is used.
  • env (Dict[str, str], default None) - Additional environment variables to set for the deployment. These take precedence over the environment of the current process.
  • cwd (str, default None) - The directory to run the subprocess in. If not specified, the current directory is used.
  • file_read_timeout (int, default 3600) - Timeout, in seconds, for reading the deployer attribute file.
  • **kwargs (Any) - Additional arguments that you would pass to python myflow.py before the deployment command.
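
As a minimal sketch of how the constructor arguments fit together, the helper below builds a Deployer with an optional profile and extra environment variables. The helper name make_deployer, its quiet and extra_env parameters, and the deployer_cls injection hook are assumptions made for illustration; only the keyword arguments passed to the constructor come from the signature above.

```python
from typing import Any, Dict, Optional


def make_deployer(
    flow_file: str,
    profile: Optional[str] = None,
    extra_env: Optional[Dict[str, str]] = None,
    quiet: bool = False,
    deployer_cls: Any = None,
):
    """Build a Deployer using the constructor arguments documented above.

    `deployer_cls` is a hypothetical hook for substituting a stand-in
    class (for example in tests); by default metaflow.Deployer is used.
    """
    if deployer_cls is None:
        # Imported lazily so this module loads without Metaflow installed.
        from metaflow import Deployer as deployer_cls

    return deployer_cls(
        flow_file,
        show_output=not quiet,
        profile=profile,
        # Copy the mapping so the caller's dict is never shared; pass None
        # (the documented default) when no extra variables were given.
        env=dict(extra_env) if extra_env else None,
        file_read_timeout=3600,  # the documented default, in seconds
    )
```

A call such as make_deployer('myflow.py', profile='production', extra_env={'MY_SETTING': '1'}) would then return a Deployer ready for an orchestrator method; the profile name and variable name here are placeholders.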

Orchestrator Methods

The Deployer class dynamically provides methods for different production orchestrators. The available methods depend on the Metaflow plugins installed.

Common Orchestrators

argo_workflows(**deployer_kwargs)
Create a deployer for Argo Workflows.
Returns: An orchestrator-specific deployer object that can be used to create, delete, and manage deployments.
Example:
deployer = Deployer('myflow.py')
argo = deployer.argo_workflows(name='my-workflow')
deployed_flow = argo.create()

step_functions(**deployer_kwargs)
Create a deployer for AWS Step Functions.
Returns: An orchestrator-specific deployer object that can be used to create, delete, and manage deployments.
Example:
deployer = Deployer('myflow.py')
sf = deployer.step_functions(name='my-state-machine')
deployed_flow = sf.create()
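
Because the available orchestrator methods depend on the installed plugins, code that must run in several environments may want to check for a method before calling it. The sketch below is one way to do that, assuming an unavailable orchestrator simply has no corresponding attribute on the Deployer object. That behavior is not stated above, and the helper name pick_orchestrator is hypothetical.

```python
from typing import Any, Sequence


def pick_orchestrator(
    deployer: Any,
    preferred: Sequence[str] = ("argo_workflows", "step_functions"),
    **deployer_kwargs: Any,
):
    """Return the first available orchestrator-specific deployer.

    Assumption: a missing orchestrator plugin means the Deployer object
    has no attribute with that method name.
    """
    for method_name in preferred:
        method = getattr(deployer, method_name, None)
        if callable(method):
            # Forward keyword arguments such as name='my-workflow'.
            return method(**deployer_kwargs)
    raise RuntimeError(
        "None of the requested orchestrators are available: "
        + ", ".join(preferred)
    )
```

With a real Deployer, pick_orchestrator(deployer, name='my-workflow').create() would deploy to the first orchestrator found in the preference order.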

DeployedFlow

The DeployedFlow class represents a flow that has been deployed to a production orchestrator. It is returned by the create() method of orchestrator-specific deployers.

Class Methods

from_deployment(identifier, metadata=None, impl=None)
Retrieve a DeployedFlow object from an identifier and optional metadata.
Parameters:
  • identifier (str) - Deployer-specific identifier for the workflow to retrieve
  • metadata (str, optional) - Optional deployer-specific metadata
  • impl (str, optional) - The deployer implementation to use (e.g., argo_workflows). Defaults to METAFLOW_DEFAULT_FROM_DEPLOYMENT_IMPL
Returns: A DeployedFlow object
Example:
from metaflow import DeployedFlow

deployed = DeployedFlow.from_deployment(
    'my-workflow-name',
    impl='argo_workflows'
)

list_deployed_flows(flow_name=None, impl=None)
List all deployed flows for the specified implementation.
Parameters:
  • flow_name (str, optional) - If specified, only list deployed flows for this specific flow name
  • impl (str, optional) - The deployer implementation to use. Defaults to METAFLOW_DEFAULT_FROM_DEPLOYMENT_IMPL
Yields: DeployedFlow objects representing deployed flows
Example:
from metaflow import DeployedFlow

for flow in DeployedFlow.list_deployed_flows(impl='argo_workflows'):
    print(flow.name)

get_triggered_run(identifier, run_id, metadata=None, impl=None)
Retrieve a TriggeredRun object from an identifier, a run ID, and optional metadata.
Parameters:
  • identifier (str) - Deployer-specific identifier for the workflow
  • run_id (str) - Run ID for which to fetch the triggered run object
  • metadata (str, optional) - Optional deployer-specific metadata
  • impl (str, optional) - The deployer implementation to use. Defaults to METAFLOW_DEFAULT_FROM_DEPLOYMENT_IMPL
Returns: A TriggeredRun object
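
A short sketch of get_triggered_run in use: re-attaching to a run that was triggered earlier, for example from a different process. The helper name reattach_to_run and the deployed_flow_cls injection hook are assumptions for illustration, and the identifier and run ID must be values you recorded when the run was triggered.

```python
from typing import Any


def reattach_to_run(
    identifier: str,
    run_id: str,
    impl: str = "argo_workflows",
    deployed_flow_cls: Any = None,
):
    """Fetch a TriggeredRun for a previously triggered run.

    `deployed_flow_cls` is a hypothetical hook for passing a stand-in
    class; by default metaflow.DeployedFlow is used.
    """
    if deployed_flow_cls is None:
        # Imported lazily so this module loads without Metaflow installed.
        from metaflow import DeployedFlow as deployed_flow_cls

    # Positional order follows the documented signature:
    # get_triggered_run(identifier, run_id, metadata=None, impl=None)
    return deployed_flow_cls.get_triggered_run(identifier, run_id, impl=impl)
```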

Properties

  • name (str) - The name of the deployed workflow.
  • flow_name (str) - The name of the Metaflow flow.
  • metadata (str) - Deployer-specific metadata for the deployment.

Methods

trigger(**kwargs)
Trigger a new run of the deployed flow.
Parameters:
  • **kwargs - Parameters to pass to the flow
Returns: A TriggeredRun object
Example:
from metaflow import DeployedFlow

deployed_flow = DeployedFlow.from_deployment('my-workflow')
triggered_run = deployed_flow.trigger(alpha=5, beta=10)

delete()
Delete the deployed workflow from the orchestrator.
Example:
deployed_flow.delete()

TriggeredRun

The TriggeredRun class represents a run that has been triggered on a production orchestrator.

Properties

  • run (Optional[metaflow.Run]) - The Metaflow Run object for the triggered run. It becomes available only once the start task has started executing; until then, the property returns None.
  • name (str) - The name of the triggered run.
  • pathspec (str) - The pathspec of the triggered run (e.g., “MyFlow/123”).
  • metadata_for_flow (dict) - Metadata associated with the flow.
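
Since run can be None, code that reads it without waiting should guard against that case. The sketch below shows one way to do so; the helper name describe_triggered_run and the wording of the returned strings are illustrative assumptions, while run, name, pathspec, and Run.finished are the attributes used elsewhere on this page.

```python
from typing import Any


def describe_triggered_run(triggered_run: Any) -> str:
    """Return a one-line status without assuming the run has started."""
    run = triggered_run.run  # None until the start task has started
    if run is None:
        return f"{triggered_run.name}: waiting for the start task"
    state = "finished" if run.finished else "running"
    return f"{triggered_run.pathspec}: {state}"
```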

Methods

wait_for_run(check_interval=5, timeout=None)
Wait for the run property to become available. The run property becomes available only after the start task of the triggered flow starts running.
Parameters:
  • check_interval (int, default 5) - Frequency of checking for the run to become available, in seconds
  • timeout (int, optional) - Maximum time to wait for the run to become available, in seconds. If None, wait indefinitely
Raises:
  • TimeoutError - If the run is not available within the specified timeout
Example:
triggered_run = deployed_flow.trigger(alpha=5)
triggered_run.wait_for_run(timeout=300)
print(triggered_run.run.finished)

terminate()
Terminate the triggered run.
Example:
triggered_run = deployed_flow.trigger(alpha=5)
# ... later ...
triggered_run.terminate()
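
The two methods above can be combined so that a run which never starts is cleaned up rather than left pending. This is a sketch of one possible policy, not a recommended default: the helper name wait_or_terminate and the choice to terminate on timeout are assumptions.

```python
from typing import Any, Optional


def wait_or_terminate(
    triggered_run: Any,
    timeout: int = 300,
    check_interval: int = 5,
) -> Optional[Any]:
    """Wait for the run to start; terminate it if the wait times out.

    Returns the Run object on success, or None if the run was terminated.
    """
    try:
        triggered_run.wait_for_run(
            check_interval=check_interval, timeout=timeout
        )
    except TimeoutError:
        # The start task did not begin within `timeout` seconds.
        triggered_run.terminate()
        return None
    return triggered_run.run
```

For example, run = wait_or_terminate(deployed_flow.trigger(alpha=5), timeout=600) yields a Run object to inspect, or None if the run was terminated.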
