Skip to main content
The DagsterDeployedFlow class represents a Metaflow flow that has been deployed as a Dagster definitions file. It provides methods to trigger new runs and manage the deployment.

Properties

id
str
Deployment identifier encoding all info needed for from_deployment.Returns deployer.name directly: the create command embeds the full JSON blob (name, flow_name, flow_file, definitions_file) so the identifier is self-contained and survives a process restart.
name
str
Human-readable job name extracted from the deployment identifier.This is parsed from the JSON identifier blob during initialization for display purposes.

Methods

run

Trigger a new run of this deployed flow.
run(**kwargs) -> DagsterTriggeredRun
**kwargs
Any
Flow parameters as keyword arguments (e.g. message="hello").
Returns: DagsterTriggeredRun - A triggered run object that can be used to monitor the run status.

Example

from metaflow import Runner

# Deploy the flow
with Runner("my_flow.py").deployer("dagster") as deployer:
    deployed_flow = deployer.create()

# Trigger a run with parameters
triggered_run = deployed_flow.run(message="Hello from Metaflow")

print(f"Run pathspec: {triggered_run.pathspec}")
print(f"Status: {triggered_run.status}")
print(f"Dagster UI: {triggered_run.dagster_ui}")

trigger

Alias for run(). Kept for backwards compatibility.
trigger(**kwargs) -> DagsterTriggeredRun
**kwargs
Any
Flow parameters as keyword arguments.
Returns: DagsterTriggeredRun

from_deployment

Recover a DagsterDeployedFlow from a deployment identifier.
@classmethod
from_deployment(identifier: str, metadata: str | None = None) -> DagsterDeployedFlow
identifier
str
required
The JSON string returned by deployed_flow.deployer.name or deployed_flow.id, which contains the full deployment info (flow_file, name, flow_name, definitions_file).
metadata
str | None
Optional metadata to associate with the recovered deployment.
Returns: DagsterDeployedFlow - A deployed flow instance reconstructed from the identifier. Raises: ValueError if the identifier is invalid or missing required fields.

Example

from metaflow import Runner

# Deploy and save the identifier
with Runner("my_flow.py").deployer("dagster") as deployer:
    deployed_flow = deployer.create()
    deployment_id = deployed_flow.id

# Later, recover the deployment from the identifier
recovered_flow = DagsterDeployedFlow.from_deployment(deployment_id)

# Trigger a run using the recovered deployment
triggered_run = recovered_flow.run(param1="value1")

Relationship with Deployer

The DagsterDeployedFlow is returned by the Dagster deployer’s create() method:
from metaflow import Runner

# Create a deployment
with Runner("my_flow.py").deployer("dagster") as deployer:
    deployed_flow = deployer.create()
    
    # Access deployment info
    print(f"Flow name: {deployed_flow.name}")
    print(f"Deployment ID: {deployed_flow.id}")
    
    # Trigger runs
    run1 = deployed_flow.run(param="value1")
    run2 = deployed_flow.run(param="value2")
The deployment identifier returned by deployed_flow.id contains all necessary information to reconstruct the deployment later using from_deployment(), making it easy to persist and restore deployments across process restarts.