
Overview

Argo Workflows is a container-native workflow engine for Kubernetes that enables you to orchestrate parallel jobs as directed acyclic graphs (DAGs). The Metaflow integration allows you to deploy your flows as Argo WorkflowTemplates for production scheduling and event-driven execution.
Argo Workflows is the recommended production orchestrator for Metaflow on Kubernetes, providing enterprise-grade workflow orchestration with built-in scheduling, event triggering, and monitoring capabilities.

Key Capabilities

Production Scheduling

Schedule workflows with cron expressions using CronWorkflows

Event Triggering

Trigger workflows based on events using Argo Events integration

DAG Visualization

Visualize workflow execution in the Argo Workflows UI

Workflow Management

Deploy, trigger, suspend, and terminate workflows programmatically

Architecture

When you deploy a Metaflow flow to Argo Workflows:
  1. Flow Definition: Your Metaflow flow is compiled into an Argo WorkflowTemplate
  2. Step Mapping: Each Metaflow step becomes a DAG task in Argo
  3. Resource Specification: @kubernetes decorator settings define pod specs
  4. Execution: Workflows are triggered by schedule, events, or manual invocation
  5. Monitoring: Track execution through Argo UI or Metaflow Client API
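The step-to-task mapping in step 2 is easiest to see on a toy graph. The sketch below is pure Python with illustrative names (the real compiler lives inside Metaflow); it orders steps the same way Argo schedules the resulting DAG tasks:

```python
from graphlib import TopologicalSorter

# Toy flow graph: step -> set of upstream steps it depends on,
# mirroring how each Metaflow step becomes one Argo DAG task.
flow = {
    "start":   set(),
    "prepare": {"start"},
    "train":   {"prepare"},
    "end":     {"train"},
}

# Argo runs a task once all of its dependencies have succeeded;
# static_order() yields a valid execution order for the DAG.
order = list(TopologicalSorter(flow).static_order())
print(order)  # ['start', 'prepare', 'train', 'end']
```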

Prerequisites

1. Install Argo Workflows

Install Argo Workflows (v3.2+) on your Kubernetes cluster:
kubectl create namespace argo
kubectl apply -n argo -f https://github.com/argoproj/argo-workflows/releases/latest/download/install.yaml
2. Install Argo Events (Optional)

For event-driven workflows, install Argo Events:
kubectl create namespace argo-events
kubectl apply -f https://raw.githubusercontent.com/argoproj/argo-events/stable/manifests/install.yaml
3. Configure Metaflow

Set required environment variables:
export METAFLOW_KUBERNETES_NAMESPACE=argo
export METAFLOW_DATASTORE_SYSROOT_S3=s3://my-metaflow-bucket
export METAFLOW_KUBERNETES_SERVICE_ACCOUNT=metaflow-sa
4. Create Service Account

Create a Kubernetes ServiceAccount with appropriate permissions:
apiVersion: v1
kind: ServiceAccount
metadata:
  name: metaflow-sa
  namespace: argo
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: metaflow-role
  namespace: argo
rules:
- apiGroups: [""]
  resources: ["pods", "pods/log"]
  verbs: ["get", "list", "create", "delete"]
- apiGroups: ["argoproj.io"]
  resources: ["workflows", "workflowtemplates"]
  verbs: ["get", "list", "create", "update", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: metaflow-rolebinding
  namespace: argo
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: metaflow-role
subjects:
- kind: ServiceAccount
  name: metaflow-sa
  namespace: argo

Deploying Workflows

Basic Deployment

Deploy a Metaflow flow to Argo Workflows:
from metaflow import FlowSpec, kubernetes, step

class TrainingFlow(FlowSpec):
    
    @step
    def start(self):
        print("Starting training workflow")
        self.next(self.prepare)
    
    @kubernetes(cpu=2, memory=8000)
    @step
    def prepare(self):
        # Data preparation on Kubernetes
        self.data = load_data()
        self.next(self.train)
    
    @kubernetes(cpu=8, memory=32000, gpu=1)
    @step
    def train(self):
        # Model training with GPU
        self.model = train_model(self.data)
        self.next(self.end)
    
    @step
    def end(self):
        print(f"Training complete. Model accuracy: {self.model.accuracy}")

if __name__ == '__main__':
    TrainingFlow()
Deploy to Argo Workflows:
python training_flow.py argo-workflows create
This creates an Argo WorkflowTemplate that can be triggered on-demand.
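The same deployment can be driven from Python with Metaflow's Deployer API — a sketch, assuming a recent Metaflow release, training_flow.py in the working directory, and a cluster configured as above:

```python
from metaflow import Deployer

# Compile the flow into an Argo WorkflowTemplate on the cluster
deployed = Deployer("training_flow.py").argo_workflows().create()

# Trigger the template on demand; `triggered` wraps the new run
# (see the Deployer docs for the inspection helpers it exposes)
triggered = deployed.trigger()
```

This is handy for CI/CD pipelines that deploy and smoke-test a flow in one script instead of shelling out to the CLI.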

Scheduled Workflows

Schedule workflows using the @schedule decorator:
from datetime import datetime

from metaflow import FlowSpec, kubernetes, schedule, step

@schedule(cron='0 2 * * *', timezone='America/New_York')
class DailyETLFlow(FlowSpec):
    
    @step
    def start(self):
        self.date = datetime.now().strftime('%Y-%m-%d')
        self.next(self.extract)
    
    @kubernetes(cpu=4, memory=16000)
    @step
    def extract(self):
        self.raw_data = extract_data(self.date)
        self.next(self.transform)
    
    @kubernetes(cpu=8, memory=32000)
    @step
    def transform(self):
        self.clean_data = transform(self.raw_data)
        self.next(self.load)
    
    @kubernetes(cpu=2, memory=8000)
    @step
    def load(self):
        load_to_warehouse(self.clean_data)
        self.next(self.end)
    
    @step
    def end(self):
        print(f"ETL completed for {self.date}")
Deploy with scheduling:
python daily_etl_flow.py argo-workflows create
This creates both a WorkflowTemplate and a CronWorkflow that runs daily at 2 AM Eastern time, per the timezone argument.
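As a sanity check before deploying, the next firing time of a fixed daily cron like 0 2 * * * can be computed with the standard library alone. This simplified sketch handles only a fixed daily HH:MM expression, not general cron syntax, and uses naive datetimes, whereas the deployed CronWorkflow honors the America/New_York timezone:

```python
from datetime import datetime, time, timedelta

def next_daily_run(now: datetime, at: time = time(2, 0)) -> datetime:
    """Next occurrence of a daily HH:MM schedule (cron 'M H * * *')."""
    candidate = datetime.combine(now.date(), at)
    if candidate <= now:
        candidate += timedelta(days=1)  # today's slot has already passed
    return candidate

print(next_daily_run(datetime(2024, 1, 1, 14, 30)))  # 2024-01-02 02:00:00
```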

Event-Driven Workflows

Trigger workflows based on events using the @trigger decorator:
from metaflow import FlowSpec, Parameter, kubernetes, step, trigger

@trigger(events=[
    {
        'name': 'data-arrived',
        'parameters': {
            'dataset_id': 'dataset_id',
            'timestamp': 'timestamp'
        }
    }
])
class EventDrivenFlow(FlowSpec):
    
    dataset_id = Parameter('dataset_id', required=True)
    timestamp = Parameter('timestamp', required=True)
    
    @step
    def start(self):
        print(f"Processing dataset {self.dataset_id} from {self.timestamp}")
        self.next(self.process)
    
    @kubernetes(cpu=4, memory=16000)
    @step
    def process(self):
        # Process the incoming data
        self.result = process_dataset(self.dataset_id)
        self.next(self.end)
    
    @step
    def end(self):
        print("Processing complete")
Deploy with event triggering:
python event_driven_flow.py argo-workflows create
This creates a WorkflowTemplate and an Argo Events Sensor that triggers the workflow when the event arrives.
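Events can then be published from anywhere that can reach the cluster using Metaflow's ArgoEvent helper — a sketch; the event name must match the one in @trigger, the payload values here are illustrative, and the payload keys map onto the flow's parameters:

```python
from metaflow.integrations import ArgoEvent

# Publish the 'data-arrived' event; payload keys become flow parameters
ArgoEvent(name="data-arrived").publish(
    payload={"dataset_id": "abc123", "timestamp": "2024-01-01T00:00:00Z"}
)
```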

Workflow Chaining with @trigger_on_finish

Chain workflows together:
from metaflow import FlowSpec, Run, kubernetes, step, trigger_on_finish

@trigger_on_finish(flows=['DataPreparationFlow'])
class ModelTrainingFlow(FlowSpec):
    
    @step
    def start(self):
        # This workflow automatically starts when DataPreparationFlow succeeds
        from metaflow import current
        
        # current.trigger.run is the upstream Run that fired this deployment
        upstream = current.trigger.run
        print(f"Triggered by: {upstream.pathspec}")
        
        # Persist the upstream run's pathspec as an artifact so that
        # later steps (which run in separate pods) can load from it
        self.upstream_pathspec = upstream.pathspec
        self.next(self.train)
    
    @kubernetes(cpu=8, memory=32000, gpu=2)
    @step
    def train(self):
        # Load data from the upstream flow via the Client API
        upstream_run = Run(self.upstream_pathspec)
        data = upstream_run.data.prepared_data
        
        # Train model
        self.model = train_model(data)
        self.next(self.end)
    
    @step
    def end(self):
        print("Model training complete")

Managing Deployments

List Deployed Workflows

python my_flow.py argo-workflows list

Trigger a Workflow Manually

python my_flow.py argo-workflows trigger
With parameters:
python my_flow.py argo-workflows trigger --param dataset_id=abc123 --param version=2

Check Workflow Status

python my_flow.py argo-workflows status --run-id argo-training-flow-abc123

Delete a Deployment

python my_flow.py argo-workflows delete
Deleting a deployment removes the WorkflowTemplate, CronWorkflow, and associated Sensors. Running workflow instances are not affected.

Suspend and Resume Workflows

Suspend a running workflow:
python my_flow.py argo-workflows suspend --run-id argo-training-flow-abc123
Resume a suspended workflow:
python my_flow.py argo-workflows unsuspend --run-id argo-training-flow-abc123

Terminate a Running Workflow

python my_flow.py argo-workflows terminate --run-id argo-training-flow-abc123

Deployment Options

Workflow Timeout

Set a maximum runtime for the entire workflow:
python my_flow.py argo-workflows create --workflow-timeout 7200  # 2 hours

Workflow Priority

Set workflow priority for resource contention:
python my_flow.py argo-workflows create --workflow-priority 100
Higher priority workflows are scheduled first when the cluster has limited capacity.

Maximum Parallelism

Limit concurrent tasks:
python my_flow.py argo-workflows create --max-workers 10
This ensures no more than 10 tasks run simultaneously, useful for rate-limiting external API calls.

Notifications

Enable notifications for workflow completion:
python my_flow.py argo-workflows create \
  --notify-on-error \
  --notify-on-success \
  --notify-slack-webhook-url https://hooks.slack.com/services/YOUR/WEBHOOK/URL
Supported notification channels:
  • Slack (via webhook URL)
  • PagerDuty (via integration key)
  • Incident.io (via API key)

Custom Workflow Names

Use the --name flag to customize the deployment name:
python my_flow.py argo-workflows create --name prod-training-v2
Workflow names must follow Kubernetes naming conventions: lowercase alphanumeric characters, dashes, and dots, up to 253 characters.
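These are the standard Kubernetes (RFC 1123) subdomain rules, so a candidate name can be checked locally before deploying. A small sketch:

```python
import re

# RFC 1123 subdomain: dot-separated labels of lowercase alphanumerics
# and dashes, where each label starts and ends with an alphanumeric.
_LABEL = r"[a-z0-9]([a-z0-9-]*[a-z0-9])?"
_NAME_RE = re.compile(rf"^{_LABEL}(\.{_LABEL})*$")

def is_valid_workflow_name(name: str) -> bool:
    return len(name) <= 253 and bool(_NAME_RE.match(name))

print(is_valid_workflow_name("prod-training-v2"))  # True
print(is_valid_workflow_name("Prod_Training"))     # False: uppercase, underscore
```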

Argo Workflows UI

Access the Argo Workflows UI to monitor and manage your workflows:
# Port forward to access the UI
kubectl -n argo port-forward deployment/argo-server 2746:2746
Then navigate to https://localhost:2746 in your browser. The Argo server uses self-signed TLS by default, so the browser may warn about the certificate.

UI Features

  • Workflow List: View all workflow templates and running instances
  • DAG Visualization: See the execution graph of your flow
  • Live Logs: Stream logs from running tasks
  • Resource Usage: Monitor CPU, memory, and GPU utilization
  • Artifacts: View input/output artifacts (if configured)
  • Resubmit: Re-run workflows with the same or different parameters

Advanced Patterns

Parallel Execution with Foreach

Metaflow’s foreach branches automatically parallelize in Argo:
@step
def start(self):
    self.datasets = ['dataset_1', 'dataset_2', 'dataset_3']
    self.next(self.process, foreach='datasets')

@kubernetes(cpu=4, memory=16000)
@step
def process(self):
    # Each dataset processes in parallel
    self.result = process_dataset(self.input)
    self.next(self.join)

@step
def join(self, inputs):
    # Merge results from parallel branches
    self.all_results = [inp.result for inp in inputs]
    self.next(self.end)

Multi-Node Distributed Training

Combine @kubernetes with @parallel for distributed training:
from metaflow import current, kubernetes, parallel, step

@step
def start(self):
    # num_parallel on the transition requests four gang-scheduled nodes
    self.next(self.distributed_training, num_parallel=4)

@kubernetes(cpu=8, memory=32000, gpu=1)
@parallel
@step
def distributed_training(self):
    # Set up distributed training
    rank = current.parallel.node_index
    world_size = current.parallel.num_nodes
    master_addr = current.parallel.main_ip
    
    # Initialize the PyTorch process group
    import torch.distributed as dist
    dist.init_process_group(
        backend='nccl',
        init_method=f'tcp://{master_addr}:29500',
        rank=rank,
        world_size=world_size
    )
    
    # Your distributed training code
    self.model = train_distributed(rank, world_size)
    
    self.next(self.end)

Conditional Branching

Metaflow requires a static workflow graph, so a step cannot choose its successor at runtime. The idiomatic pattern is to branch to both paths statically and let each branch skip its work when its condition does not hold:
@step
def start(self):
    self.data_size = check_data_size()
    # Static branch: both steps always run; each checks the condition itself
    self.next(self.small_process, self.large_process)

@kubernetes(cpu=2, memory=8000)
@step
def small_process(self):
    if self.data_size < 1000:
        # Process small dataset
        ...
    self.next(self.join)

@kubernetes(cpu=16, memory=64000)
@step
def large_process(self):
    if self.data_size >= 1000:
        # Process large dataset with more resources
        ...
    self.next(self.join)

@step
def join(self, inputs):
    # Merge artifacts produced by the branches
    self.merge_artifacts(inputs)
    self.next(self.end)

Monitoring and Observability

Access Workflow Metadata

Retrieve metadata about deployments and executions:
from metaflow import Flow, Run

# Get the latest production run
run = Flow('TrainingFlow').latest_run

# Access Argo-specific metadata
for task in run['train']:
    argo_metadata = task.metadata_dict
    print(f"Workflow Name: {argo_metadata.get('argo-workflow-name')}")
    print(f"Template: {argo_metadata.get('argo-workflow-template')}")
    print(f"Namespace: {argo_metadata.get('argo-workflow-namespace')}")

Custom Metrics

Record metrics from your workflows using Metaflow cards:
import time

from metaflow import card, current, kubernetes, step
from metaflow.cards import Markdown

@card
@kubernetes(cpu=4, memory=16000)
@step
def train(self):
    start = time.time()
    
    # Your training code
    accuracy = train_model()
    
    # Record custom metrics on the step's card
    # (view via the card CLI or the Metaflow UI)
    current.card.append(Markdown(f"**Accuracy:** {accuracy}"))
    current.card.append(Markdown(f"**Training time:** {time.time() - start:.1f}s"))
    
    self.next(self.end)

Integration with Observability Tools

Configure OpenTelemetry for distributed tracing:
export METAFLOW_OTEL_ENDPOINT=http://otel-collector:4317

Troubleshooting

Check ServiceAccount Permissions:
kubectl get serviceaccount metaflow-sa -n argo
kubectl describe rolebinding metaflow-rolebinding -n argo
Verify WorkflowTemplate:
kubectl get workflowtemplates -n argo
kubectl describe workflowtemplate <name> -n argo
Check Pod Status:
kubectl get pods -n argo -l workflows.argoproj.io/workflow=<workflow-name>
View Pod Events:
kubectl describe pod <pod-name> -n argo
Common Issues:
  • Image pull errors: Check image name and registry credentials
  • Resource limits: Verify node capacity
  • PVC mount failures: Ensure PVC exists and is bound
Check CronWorkflow:
kubectl get cronworkflows -n argo
kubectl describe cronworkflow <name> -n argo
Verify Schedule:
# Test cron expression (requires the third-party croniter package)
from croniter import croniter
from datetime import datetime

cron = croniter('0 2 * * *', datetime.now())
print(f"Next run: {cron.get_next(datetime)}")
Check Sensor Status:
kubectl get sensors -n argo-events
kubectl logs -n argo-events <sensor-pod-name>
Verify EventSource:
kubectl get eventsources -n argo-events
kubectl describe eventsource <name> -n argo-events
Test Event Publishing:
curl -X POST http://<webhook-url> \
  -H "Content-Type: application/json" \
  -d '{"name": "test-event", "payload": {}}'

Best Practices

Resource Right-Sizing

Start with conservative resource estimates and adjust based on actual usage visible in the Argo UI.
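One simple heuristic — illustrative only, not a Metaflow feature — is to take the peak memory observed in the Argo UI, add fixed headroom, and round up to a scheduling-friendly granularity:

```python
import math

def right_size_memory(peak_observed_mb: float, headroom: float = 1.3,
                      granularity_mb: int = 1000) -> int:
    """Suggest a memory request: peak usage plus headroom,
    rounded up to a round number of MB for @kubernetes(memory=...)."""
    needed = peak_observed_mb * headroom
    return math.ceil(needed / granularity_mb) * granularity_mb

# Peak of ~5.8 GB observed -> request 8000 MB
print(right_size_memory(5800))  # 8000
```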

Workflow Organization

Use meaningful names and tags to organize workflows. Group related workflows using the @project decorator.
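Grouping with @project looks like the following — a sketch; the project name is illustrative:

```python
from metaflow import FlowSpec, project, step

@project(name="fraud_detection")
class TrainingFlow(FlowSpec):
    # Deployments are namespaced by project and branch, so variants of
    # this flow (e.g. per-branch or production deployments) can coexist
    # on the same cluster without clashing.

    @step
    def start(self):
        self.next(self.end)

    @step
    def end(self):
        pass
```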

Error Handling

Use @retry and @catch decorators to handle transient failures gracefully.
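A sketch combining @retry and @catch with a per-attempt @timeout (counts, durations, and the fetch_remote_data helper are illustrative):

```python
from metaflow import FlowSpec, catch, kubernetes, retry, step, timeout

class RobustFlow(FlowSpec):

    @step
    def start(self):
        self.next(self.fetch)

    @retry(times=3, minutes_between_retries=2)  # ride out transient failures
    @timeout(hours=1)                           # bound each individual attempt
    @catch(var="fetch_error")                   # on final failure, record and continue
    @kubernetes(cpu=2, memory=8000)
    @step
    def fetch(self):
        self.data = fetch_remote_data()  # hypothetical helper
        self.next(self.end)

    @step
    def end(self):
        # fetch_error holds the exception if all retries were exhausted
        if getattr(self, "fetch_error", None):
            print(f"fetch failed: {self.fetch_error}")
```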

Cost Optimization

Set appropriate timeouts with @timeout and use workflow-level limits with --workflow-timeout.

Production Checklist

1. Security

  • Configure RBAC with least-privilege principles
  • Use Kubernetes secrets for sensitive data
  • Enable pod security policies
  • Set up network policies for namespace isolation
2. Reliability

  • Configure retries for transient failures
  • Set appropriate timeouts
  • Implement health checks
  • Test failure scenarios
3. Monitoring

  • Enable workflow notifications
  • Set up log aggregation
  • Configure metrics collection
  • Create alerting rules
4. Resource Management

  • Set resource quotas per namespace
  • Configure cluster autoscaling
  • Implement pod disruption budgets
  • Use priority classes

Next Steps

Configuration Reference

Explore all Kubernetes and Argo configuration options

Kubernetes Overview

Learn more about Kubernetes execution in Metaflow

Scheduling

Deep dive into scheduling strategies

Event Triggering

Master event-driven workflow patterns
