Metaflow supports multiple production orchestrators, allowing you to choose the platform that best fits your infrastructure and requirements.

AWS Step Functions

Deploy Metaflow workflows to AWS Step Functions for serverless orchestration on AWS.

Setup Requirements

  • AWS account with appropriate permissions
  • S3 bucket for datastore
  • AWS Batch configured (for compute)
  • Metaflow metadata service (optional but recommended)
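These prerequisites are wired up through Metaflow's configuration. A minimal sketch using standard Metaflow environment variables; the bucket, queue, role, and service URL below are placeholders, not real resources:

```shell
# Run the interactive setup wizard, or export the equivalent variables directly
metaflow configure aws

# Illustrative manual configuration (placeholder resource names)
export METAFLOW_DEFAULT_DATASTORE=s3
export METAFLOW_DATASTORE_SYSROOT_S3=s3://my-metaflow-bucket/metaflow
export METAFLOW_BATCH_JOB_QUEUE=my-batch-queue
export METAFLOW_ECS_S3_ACCESS_IAM_ROLE=arn:aws:iam::123456789012:role/metaflow-batch-role
export METAFLOW_SERVICE_URL=https://metadata.example.com  # optional metadata service
```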

Deploying to Step Functions

# Deploy flow
python myflow.py step-functions create

# Deploy with custom name
python myflow.py step-functions --name my-workflow create

# Deploy with tags
python myflow.py step-functions create --tag version:1.0 --tag env:prod

Configuration Options

  • --name (str): Custom name for the state machine (max 80 characters)
  • --max-workers (int, default: 100): Maximum number of parallel tasks
  • --workflow-timeout (int): Workflow timeout in seconds
  • --log-execution-history (flag): Log execution history to CloudWatch Logs
  • --use-distributed-map (flag): Use Distributed Map for foreach tasks (supports larger fan-outs)

Managing Step Functions Deployments

# Trigger a run
python myflow.py step-functions trigger

# Trigger with parameters (the flow must define matching Parameters)
python myflow.py step-functions trigger --alpha 1 --beta 2

# List runs
python myflow.py step-functions list-runs

# List only running executions
python myflow.py step-functions list-runs --running

# Terminate a run
python myflow.py step-functions terminate <run-id>

# Delete deployment
python myflow.py step-functions delete

Step Functions Example

from metaflow import FlowSpec, step, schedule, batch, resources

@schedule(daily=True)
class StepFunctionsFlow(FlowSpec):
    """Daily data processing pipeline on AWS"""
    
    @step
    def start(self):
        self.data = list(range(10))
        self.next(self.process, foreach='data')
    
    @batch(memory=4000, cpu=2)  # run this step on AWS Batch with 4 GB memory and 2 vCPUs
    @step
    def process(self):
        self.result = self.input * 2
        self.next(self.join)
    
    @step
    def join(self, inputs):
        self.results = [inp.result for inp in inputs]
        self.next(self.end)
    
    @step
    def end(self):
        print(f"Processed {len(self.results)} items")

if __name__ == '__main__':
    StepFunctionsFlow()
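Orchestration aside, the foreach/join in this flow is a simple map-and-gather; its data path can be sketched in plain Python (no Metaflow required):

```python
# Plain-Python equivalent of the flow's data path: start fans out over
# `data`, each process task doubles its input, and join gathers the results.
data = list(range(10))                 # start
results = [item * 2 for item in data]  # one process task per item
print(f"Processed {len(results)} items")
```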

Step Functions Limitations

  • State machine names limited to 80 characters
  • Maximum execution history of 25,000 events
  • Limited to 8KB state machine definition (use --compress-state-machine for large flows)
  • Schedules run in UTC only (no timezone support)
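Some of these limits can be checked before deploying. A small illustrative helper (not part of Metaflow) for the 80-character name limit:

```python
import re

SFN_NAME_LIMIT = 80  # state machine names are capped at 80 characters

def valid_sfn_name(name: str) -> bool:
    # Conservative check: non-empty, within the limit, and restricted to
    # characters Step Functions accepts without escaping.
    return (0 < len(name) <= SFN_NAME_LIMIT
            and re.fullmatch(r"[A-Za-z0-9_-]+", name) is not None)

print(valid_sfn_name("daily-etl-pipeline"))  # True
print(valid_sfn_name("x" * 81))              # False
```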

Argo Workflows

Deploy to Kubernetes-native Argo Workflows for container orchestration.

Setup Requirements

  • Kubernetes cluster with Argo Workflows installed
  • Cloud storage (S3, Azure Blob, or GCS) for datastore
  • Kubernetes service account with appropriate permissions
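As on AWS, these pieces are pointed at via Metaflow configuration. An illustrative fragment; the bucket, namespace, and service account names are placeholders:

```shell
export METAFLOW_DEFAULT_DATASTORE=s3
export METAFLOW_DATASTORE_SYSROOT_S3=s3://my-metaflow-bucket/metaflow
export METAFLOW_KUBERNETES_NAMESPACE=metaflow
export METAFLOW_KUBERNETES_SERVICE_ACCOUNT=metaflow-sa
```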

Deploying to Argo Workflows

# Deploy flow
python myflow.py argo-workflows create

# Deploy with custom name
python myflow.py argo-workflows --name my-workflow create

# Deploy with priority
python myflow.py argo-workflows create --workflow-priority 100

# Deploy with notifications
python myflow.py argo-workflows create \
    --notify-on-error \
    --notify-slack-webhook-url https://hooks.slack.com/...

Configuration Options

  • --name (str): Custom workflow name (max 45 characters, lowercase alphanumeric)
  • --max-workers (int, default: 100): Maximum number of parallel pods
  • --workflow-timeout (int): Workflow timeout in seconds
  • --workflow-priority (int): Workflow priority (higher values processed first)
  • --notify-on-error (flag): Send notifications on workflow failure
  • --notify-on-success (flag): Send notifications on workflow success
  • --notify-slack-webhook-url (str): Slack webhook URL for notifications
  • --auto-emit-argo-events (flag, default: true): Automatically emit Argo Events on completion

Managing Argo Workflows Deployments

# Trigger a run
python myflow.py argo-workflows trigger

# Check status
python myflow.py argo-workflows status <run-id>

# Suspend a running workflow
python myflow.py argo-workflows suspend <run-id>

# Resume a suspended workflow
python myflow.py argo-workflows unsuspend <run-id>

# Terminate a run
python myflow.py argo-workflows terminate <run-id>

# Delete deployment
python myflow.py argo-workflows delete

# List workflow templates
python myflow.py argo-workflows list-workflow-templates

Argo Workflows Example

from metaflow import FlowSpec, step, schedule, kubernetes, resources

@schedule(cron='0 */6 * * *', timezone='America/New_York')
class ArgoFlow(FlowSpec):
    """Run every 6 hours in Eastern Time"""
    
    @step
    def start(self):
        self.next(self.train)
    
    @kubernetes(cpu=4, memory=16000, image='my-ml-image:latest')
    @step
    def train(self):
        # Training logic runs in the custom container;
        # train_model() is a placeholder for your training routine
        self.model = self.train_model()
        self.next(self.end)
    
    @step
    def end(self):
        print("Training complete")

if __name__ == '__main__':
    ArgoFlow()
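The cron expression '0 */6 * * *' above fires at minute 0 of every sixth hour. The hour field can be expanded in plain Python to see exactly when:

```python
# "*/6" in the hour field matches hours divisible by 6
fire_hours = [h for h in range(24) if h % 6 == 0]
print(fire_hours)  # [0, 6, 12, 18], i.e. midnight, 6am, noon, 6pm Eastern
```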

Argo Workflows Features

  • Timezone support for schedules
  • Custom container images via @kubernetes decorator
  • Argo Events integration for event-driven workflows
  • Native Kubernetes features (resource quotas, priority, etc.)
  • Workflow notifications to Slack, PagerDuty, or Incident.io

Apache Airflow

Generate Airflow DAGs from Metaflow flows.

Setup Requirements

  • Apache Airflow installation
  • Cloud storage (S3, Azure Blob, or GCS) for datastore
  • Kubernetes cluster (for task execution)

Creating Airflow DAGs

# Generate DAG file
python myflow.py airflow create dag.py

# Generate with custom name
python myflow.py airflow --name my-dag create dag.py

# Generate paused DAG
python myflow.py airflow create dag.py --is-paused-upon-creation

Configuration Options

  • --name (str): Custom DAG name
  • --max-workers (int, default: 100): Maximum parallel tasks in the DAG
  • --workflow-timeout (int): DAG timeout in seconds (applies to scheduled DAGs only)
  • --worker-pool (str): Airflow worker pool for task execution
  • --is-paused-upon-creation (flag): Create the DAG in a paused state

Airflow DAG Example

from metaflow import FlowSpec, step, schedule, Parameter, kubernetes

@schedule(cron='0 2 * * *')  # 2 AM daily
class AirflowETLFlow(FlowSpec):
    """ETL pipeline as Airflow DAG"""
    
    date = Parameter('date',
                    help='Date to process',
                    default='2024-01-01')
    
    @step
    def start(self):
        print(f"Processing data for {self.date}")
        self.next(self.extract)
    
    @kubernetes(cpu=2, memory=8000)
    @step
    def extract(self):
        # fetch_data, process, and save_to_warehouse are placeholder helpers
        self.raw_data = self.fetch_data(self.date)
        self.next(self.transform)
    
    @kubernetes(cpu=4, memory=16000)
    @step
    def transform(self):
        self.clean_data = self.process(self.raw_data)
        self.next(self.load)
    
    @step
    def load(self):
        self.save_to_warehouse(self.clean_data)
        self.next(self.end)
    
    @step
    def end(self):
        pass

if __name__ == '__main__':
    AirflowETLFlow()
Generate and deploy:
# Generate DAG
python etl_flow.py airflow create dags/etl_dag.py

# Copy to Airflow DAGs folder
cp dags/etl_dag.py $AIRFLOW_HOME/dags/

# Airflow will automatically detect and load the DAG

Airflow Limitations

  • All parameters must have default values
  • Nested foreach loops not supported
  • No @parallel decorator support
  • Requires cloud storage datastore (S3, Azure, or GCS)
  • Timezone must be configured in Airflow, not via decorator
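The first limitation (every parameter needs a default) can be caught before generating the DAG. A rough, hypothetical lint sketch using only the standard library; the regex and the flow snippet it scans are illustrative, not part of Metaflow:

```python
import re

# Illustrative flow source; `rate` is missing a default and would be rejected
flow_source = """
date = Parameter('date', default='2024-01-01')
rate = Parameter('rate', help='learning rate')
"""

# Flag Parameter(...) declarations that have no default= keyword
missing = [m.group(1)
           for m in re.finditer(r"Parameter\('([^']+)'[^)]*\)", flow_source)
           if 'default=' not in m.group(0)]
print(missing)  # ['rate']
```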

Choosing an Orchestrator

AWS Step Functions

Best for:
  • AWS-native stacks
  • Serverless workflows
  • Simple scheduling
  • Minimal operations
Pros:
  • No infrastructure to manage
  • Tight AWS integration
  • Built-in monitoring
Cons:
  • AWS-only
  • Limited to UTC schedules
  • Quotas on execution history

Argo Workflows

Best for:
  • Kubernetes environments
  • Custom container images
  • Complex event-driven workflows
  • Multi-cloud deployments
Pros:
  • Kubernetes-native
  • Timezone support
  • Event-driven capabilities
  • Cloud-agnostic
Cons:
  • Requires K8s cluster
  • More complex setup
  • Need to manage infrastructure

Apache Airflow

Best for:
  • Existing Airflow infrastructure
  • Complex DAG dependencies
  • Rich ecosystem of operators
  • Traditional batch processing
Pros:
  • Mature ecosystem
  • Rich UI and monitoring
  • Many integrations
Cons:
  • More setup complexity
  • Some Metaflow features limited
  • Requires DAG file management

Multi-Orchestrator Flows

You can deploy the same flow to different orchestrators:
from metaflow import FlowSpec, step, schedule

@schedule(daily=True)
class MultiPlatformFlow(FlowSpec):
    
    @step
    def start(self):
        self.next(self.process)
    
    @step
    def process(self):
        self.next(self.end)
    
    @step
    def end(self):
        pass
Deploy to multiple platforms:
# Deploy to Step Functions (AWS)
python myflow.py step-functions create

# Deploy to Argo (Kubernetes)
python myflow.py argo-workflows create

# Generate Airflow DAG
python myflow.py airflow create dag.py

Next Steps

  • Scheduling: Learn about scheduling flows
  • Event Triggering: Set up event-driven workflows
  • Monitoring: Monitor production deployments
  • Configuration: Configure orchestrator settings
