Overview
Argo Workflows is a container-native workflow engine for Kubernetes that enables you to orchestrate parallel jobs as directed acyclic graphs (DAGs). The Metaflow integration allows you to deploy your flows as Argo WorkflowTemplates for production scheduling and event-driven execution.
Argo Workflows is the recommended production orchestrator for Metaflow on Kubernetes, providing enterprise-grade workflow orchestration with built-in scheduling, event triggering, and monitoring capabilities.
Key Capabilities
Production Scheduling: Schedule workflows with cron expressions using CronWorkflows
Event Triggering: Trigger workflows based on events using Argo Events integration
DAG Visualization: Visualize workflow execution in the Argo Workflows UI
Workflow Management: Deploy, trigger, suspend, and terminate workflows programmatically
Architecture
When you deploy a Metaflow flow to Argo Workflows:
Flow Definition: Your Metaflow flow is compiled into an Argo WorkflowTemplate
Step Mapping: Each Metaflow step becomes a DAG task in Argo
Resource Specification: @kubernetes decorator settings define pod specs
Execution: Workflows are triggered by schedule, events, or manual invocation
Monitoring: Track execution through the Argo UI or the Metaflow Client API
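The step-to-DAG mapping can be pictured with a toy scheduler: each task lists its upstream dependencies, and any task whose dependencies are satisfied may run. This is a standard-library sketch of the idea, not Argo's actual scheduler:

```python
from graphlib import TopologicalSorter

# Toy model of the WorkflowTemplate DAG for a simple linear flow:
# each task names its upstream dependencies.
dag = {
    "start": set(),
    "prepare": {"start"},
    "train": {"prepare"},
    "end": {"train"},
}

# A valid execution order always runs dependencies first.
order = list(TopologicalSorter(dag).static_order())
print(order)  # ['start', 'prepare', 'train', 'end']
```

Fan-out steps (foreach) simply add several tasks that share the same predecessor, which is why they can run in parallel.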
Prerequisites
Install Argo Workflows
Install Argo Workflows (v3.2+) on your Kubernetes cluster:
kubectl create namespace argo
kubectl apply -n argo -f https://github.com/argoproj/argo-workflows/releases/latest/download/install.yaml
Install Argo Events (Optional)
For event-driven workflows, install Argo Events:
kubectl create namespace argo-events
kubectl apply -f https://raw.githubusercontent.com/argoproj/argo-events/stable/manifests/install.yaml
Configure Metaflow
Set the required environment variables:
export METAFLOW_KUBERNETES_NAMESPACE=argo
export METAFLOW_DATASTORE_SYSROOT_S3=s3://my-metaflow-bucket
export METAFLOW_KUBERNETES_SERVICE_ACCOUNT=metaflow-sa
Create Service Account
Create a Kubernetes ServiceAccount with appropriate permissions:
apiVersion: v1
kind: ServiceAccount
metadata:
  name: metaflow-sa
  namespace: argo
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: metaflow-role
  namespace: argo
rules:
- apiGroups: [""]
  resources: ["pods", "pods/log"]
  verbs: ["get", "list", "create", "delete"]
- apiGroups: ["argoproj.io"]
  resources: ["workflows", "workflowtemplates"]
  verbs: ["get", "list", "create", "update", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: metaflow-rolebinding
  namespace: argo
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: metaflow-role
subjects:
- kind: ServiceAccount
  name: metaflow-sa
  namespace: argo
Deploying Workflows
Basic Deployment
Deploy a Metaflow flow to Argo Workflows:
from metaflow import FlowSpec, step, kubernetes

class TrainingFlow(FlowSpec):

    @step
    def start(self):
        print("Starting training workflow")
        self.next(self.prepare)

    @kubernetes(cpu=2, memory=8000)
    @step
    def prepare(self):
        # Data preparation on Kubernetes
        self.data = load_data()
        self.next(self.train)

    @kubernetes(cpu=8, memory=32000, gpu=1)
    @step
    def train(self):
        # Model training with GPU
        self.model = train_model(self.data)
        self.next(self.end)

    @step
    def end(self):
        print(f"Training complete. Model accuracy: {self.model.accuracy}")

if __name__ == '__main__':
    TrainingFlow()
Deploy to Argo Workflows:
python training_flow.py argo-workflows create
This creates an Argo WorkflowTemplate that can be triggered on-demand.
Scheduled Workflows
Schedule workflows using the @schedule decorator:
from datetime import datetime

from metaflow import FlowSpec, step, kubernetes, schedule

@schedule(cron='0 2 * * *', timezone='America/New_York')
class DailyETLFlow(FlowSpec):

    @step
    def start(self):
        self.date = datetime.now().strftime('%Y-%m-%d')
        self.next(self.extract)

    @kubernetes(cpu=4, memory=16000)
    @step
    def extract(self):
        self.raw_data = extract_data(self.date)
        self.next(self.transform)

    @kubernetes(cpu=8, memory=32000)
    @step
    def transform(self):
        self.clean_data = transform(self.raw_data)
        self.next(self.load)

    @kubernetes(cpu=2, memory=8000)
    @step
    def load(self):
        load_to_warehouse(self.clean_data)
        self.next(self.end)

    @step
    def end(self):
        print(f"ETL completed for {self.date}")
Deploy with scheduling:
python daily_etl_flow.py argo-workflows create
This creates both a WorkflowTemplate and a CronWorkflow that runs daily at 2 AM Eastern time.
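To double-check when a `0 2 * * *` schedule next fires in the configured timezone, the standard library suffices. This is a minimal sketch for this specific daily schedule, not a general cron parser:

```python
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo

def next_daily_run(hour=2, tz="America/New_York", now=None):
    """Next firing time of a daily `0 <hour> * * *` schedule in tz."""
    now = now or datetime.now(ZoneInfo(tz))
    candidate = now.replace(hour=hour, minute=0, second=0, microsecond=0)
    if candidate <= now:
        # Today's slot already passed; fire tomorrow.
        candidate += timedelta(days=1)
    return candidate

print(next_daily_run())
```

Because the CronWorkflow carries the timezone, daylight-saving transitions are handled by the scheduler, not by your flow code.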
Event-Driven Workflows
Trigger workflows based on events using the @trigger decorator:
from metaflow import FlowSpec, Parameter, step, kubernetes, trigger

@trigger(events=[
    {
        'name': 'data-arrived',
        'parameters': {
            'dataset_id': 'dataset_id',
            'timestamp': 'timestamp'
        }
    }
])
class EventDrivenFlow(FlowSpec):

    dataset_id = Parameter('dataset_id', required=True)
    timestamp = Parameter('timestamp', required=True)

    @step
    def start(self):
        print(f"Processing dataset {self.dataset_id} from {self.timestamp}")
        self.next(self.process)

    @kubernetes(cpu=4, memory=16000)
    @step
    def process(self):
        # Process the incoming data
        self.result = process_dataset(self.dataset_id)
        self.next(self.end)

    @step
    def end(self):
        print("Processing complete")
Deploy with event triggering:
python event_driven_flow.py argo-workflows create
This creates a WorkflowTemplate and an Argo Events Sensor that triggers the workflow when the event arrives.
Workflow Chaining with @trigger_on_finish
Chain workflows together:
from metaflow import FlowSpec, Run, current, kubernetes, step, trigger_on_finish

@trigger_on_finish(flows=['DataPreparationFlow'])
class ModelTrainingFlow(FlowSpec):

    @step
    def start(self):
        # This workflow starts automatically when DataPreparationFlow succeeds.
        # Access information about the triggering run.
        triggering_run = current.trigger.run
        print(f"Triggered by: {triggering_run.pathspec}")
        # Persist the upstream run id so later steps can use it.
        self.upstream_run_id = triggering_run.id
        self.next(self.train)

    @kubernetes(cpu=8, memory=32000, gpu=2)
    @step
    def train(self):
        # Load data from the upstream flow's run
        upstream_run = Run(f"DataPreparationFlow/{self.upstream_run_id}")
        data = upstream_run.data.prepared_data
        # Train model
        self.model = train_model(data)
        self.next(self.end)

    @step
    def end(self):
        print("Model training complete")
Managing Deployments
List Deployed Workflows
python my_flow.py argo-workflows list
Trigger a Workflow Manually
python my_flow.py argo-workflows trigger
With parameters:
python my_flow.py argo-workflows trigger --param dataset_id=abc123 --param version=2
Check Workflow Status
python my_flow.py argo-workflows status --run-id argo-training-flow-abc123
Delete a Deployment
python my_flow.py argo-workflows delete
Deleting a deployment removes the WorkflowTemplate, CronWorkflow, and associated Sensors. Running workflow instances are not affected.
Suspend and Resume Workflows
Suspend a running workflow:
python my_flow.py argo-workflows suspend --run-id argo-training-flow-abc123
Resume a suspended workflow:
python my_flow.py argo-workflows unsuspend --run-id argo-training-flow-abc123
Terminate a Running Workflow
python my_flow.py argo-workflows terminate --run-id argo-training-flow-abc123
Deployment Options
Workflow Timeout
Set a maximum runtime for the entire workflow:
python my_flow.py argo-workflows create --workflow-timeout 7200 # 2 hours
Workflow Priority
Set workflow priority for resource contention:
python my_flow.py argo-workflows create --workflow-priority 100
Higher priority workflows are scheduled first when the cluster has limited capacity.
Maximum Parallelism
Limit concurrent tasks:
python my_flow.py argo-workflows create --max-workers 10
This ensures no more than 10 tasks run simultaneously, useful for rate-limiting external API calls.
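Conceptually, --max-workers caps in-flight tasks the way a bounded worker pool does. An illustrative standard-library sketch, not Argo's implementation:

```python
from concurrent.futures import ThreadPoolExecutor

def run_with_limit(tasks, max_workers=10):
    """Run callables with at most max_workers in flight, mirroring the
    workflow-level parallelism cap."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(task) for task in tasks]
        return [f.result() for f in futures]
```

Queued tasks simply wait for a slot, so a large foreach fan-out finishes in waves rather than all at once.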
Notifications
Enable notifications for workflow completion:
python my_flow.py argo-workflows create \
--notify-on-error \
--notify-on-success \
--notify-slack-webhook-url https://hooks.slack.com/services/YOUR/WEBHOOK/URL
Supported notification channels:
Slack (via webhook URL)
PagerDuty (via integration key)
Incident.io (via API key)
Custom Workflow Names
Use the --name flag to customize the deployment name:
python my_flow.py argo-workflows create --name prod-training-v2
Workflow names must follow Kubernetes naming conventions: lowercase alphanumeric characters, dashes, and dots, up to 253 characters.
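Those naming rules (the RFC 1123 subdomain format Kubernetes uses) are easy to check before deploying; a quick standard-library sketch:

```python
import re

# One DNS label: lowercase alphanumerics, with dashes allowed inside.
_LABEL = r"[a-z0-9]([a-z0-9-]*[a-z0-9])?"
_NAME_RE = re.compile(rf"^{_LABEL}(\.{_LABEL})*$")

def is_valid_workflow_name(name: str) -> bool:
    """True if name satisfies Kubernetes object-name rules (max 253 chars)."""
    return len(name) <= 253 and bool(_NAME_RE.match(name))

print(is_valid_workflow_name("prod-training-v2"))  # True
print(is_valid_workflow_name("Prod_Training"))     # False
```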
Argo Workflows UI
Access the Argo Workflows UI to monitor and manage your workflows:
# Port forward to access the UI
kubectl -n argo port-forward deployment/argo-server 2746:2746
Then navigate to https://localhost:2746 in your browser (the Argo server serves HTTPS with a self-signed certificate by default).
UI Features
Workflow List : View all workflow templates and running instances
DAG Visualization : See the execution graph of your flow
Live Logs : Stream logs from running tasks
Resource Usage : Monitor CPU, memory, and GPU utilization
Artifacts : View input/output artifacts (if configured)
Resubmit : Re-run workflows with the same or different parameters
Advanced Patterns
Parallel Execution with Foreach
Metaflow’s foreach branches automatically parallelize in Argo:
@step
def start(self):
    self.datasets = ['dataset_1', 'dataset_2', 'dataset_3']
    self.next(self.process, foreach='datasets')

@kubernetes(cpu=4, memory=16000)
@step
def process(self):
    # Each dataset is processed in parallel
    self.result = process_dataset(self.input)
    self.next(self.join)

@step
def join(self, inputs):
    # Merge results from the parallel branches
    self.all_results = [inp.result for inp in inputs]
    self.next(self.end)
Multi-Node Distributed Training
Combine @kubernetes with @parallel for distributed training:
from metaflow import current, kubernetes, parallel, step

@kubernetes(cpu=8, memory=32000, gpu=1)
@parallel(num_nodes=4)
@step
def distributed_training(self):
    import torch.distributed as dist

    # Per-node coordinates provided by Metaflow
    rank = current.parallel.node_index
    world_size = current.parallel.num_nodes
    master_addr = current.parallel.main_ip

    # Initialize the process group
    dist.init_process_group(
        backend='nccl',
        init_method=f'tcp://{master_addr}:29500',
        rank=rank,
        world_size=world_size,
    )

    # Your distributed training code
    model = train_distributed(rank, world_size)
    self.next(self.end)
Conditional Branching
Metaflow graphs are static, so a step cannot choose its successor at runtime. To vary resources by input size, branch to both steps statically and let each branch decide whether it has work to do (note that both pods are scheduled; the idle branch exits quickly):
@step
def start(self):
    self.data_size = check_data_size()
    # Both branches are always scheduled; each checks the condition itself
    self.next(self.small_process, self.large_process)

@kubernetes(cpu=2, memory=8000)
@step
def small_process(self):
    # Process only small datasets
    self.result = process_small() if self.data_size < 1000 else None
    self.next(self.join)

@kubernetes(cpu=16, memory=64000)
@step
def large_process(self):
    # Process only large datasets, with more resources
    self.result = process_large() if self.data_size >= 1000 else None
    self.next(self.join)

@step
def join(self, inputs):
    # Keep the result from the branch that ran
    self.result = next(inp.result for inp in inputs if inp.result is not None)
    self.next(self.end)
Monitoring and Observability
Retrieve metadata about deployments and executions:
from metaflow import Flow

# Get the latest run of the flow
run = Flow('TrainingFlow').latest_run

# Access Argo-specific metadata on each task of the train step
for task in run['train']:
    argo_metadata = task.metadata_dict
    print(f"Workflow Name: {argo_metadata.get('argo-workflow-name')}")
    print(f"Template: {argo_metadata.get('argo-workflow-template')}")
    print(f"Namespace: {argo_metadata.get('argo-workflow-namespace')}")
Custom Metrics
Record custom metrics from your steps with Metaflow cards, viewable with the card CLI (python my_flow.py card view train) or in the Metaflow UI:
import time

from metaflow import card, current, kubernetes, step
from metaflow.cards import Markdown

@card
@kubernetes(cpu=4, memory=16000)
@step
def train(self):
    start = time.time()
    # Your training code
    accuracy = train_model()
    # Record custom metrics on the step's card
    current.card.append(Markdown(f"**accuracy:** {accuracy}"))
    current.card.append(Markdown(f"**training time:** {time.time() - start:.1f}s"))
    self.next(self.end)
Configure OpenTelemetry for distributed tracing:
export METAFLOW_OTEL_ENDPOINT=http://otel-collector:4317
Troubleshooting
Check ServiceAccount Permissions:
kubectl get serviceaccount metaflow-sa -n argo
kubectl describe rolebinding metaflow-rolebinding -n argo
Verify WorkflowTemplate:
kubectl get workflowtemplates -n argo
kubectl describe workflowtemplate <name> -n argo
Check Pod Status:
kubectl get pods -n argo -l workflows.argoproj.io/workflow=<workflow-name>
View Pod Events:
kubectl describe pod <pod-name> -n argo
Common Issues :
Image pull errors: Check image name and registry credentials
Resource limits: Verify node capacity
PVC mount failures: Ensure PVC exists and is bound
Scheduled Workflow Not Triggering
Check CronWorkflow:
kubectl get cronworkflows -n argo
kubectl describe cronworkflow <name> -n argo
Verify Schedule:
# Test the cron expression (requires the croniter package)
from croniter import croniter
from datetime import datetime

cron = croniter('0 2 * * *', datetime.now())
print(f"Next run: {cron.get_next(datetime)}")
Event-Triggered Workflow Not Working
Check Sensor Status:
kubectl get sensors -n argo-events
kubectl logs -n argo-events <sensor-pod-name>
Verify EventSource:
kubectl get eventsources -n argo-events
kubectl describe eventsource <name> -n argo-events
Test Event Publishing:
curl -X POST http://<webhook-url> \
  -H "Content-Type: application/json" \
  -d '{"name": "test-event", "payload": {}}'
Best Practices
Resource Right-Sizing: Start with conservative resource estimates and adjust based on actual usage visible in the Argo UI.
Workflow Organization: Use meaningful names and tags to organize workflows. Group related workflows using the @project decorator.
Error Handling: Use the @retry and @catch decorators to handle transient failures gracefully.
Cost Optimization: Set appropriate step timeouts with @timeout and workflow-level limits with --workflow-timeout.
Production Checklist
Next Steps
Configuration Reference: Explore all Kubernetes and Argo configuration options
Kubernetes Overview: Learn more about Kubernetes execution in Metaflow
Scheduling: Deep dive into scheduling strategies
Event Triggering: Master event-driven workflow patterns
Additional Resources