Monitoring production workflows is essential for ensuring reliability and quickly identifying issues. Metaflow provides multiple tools and integrations for tracking workflow execution.

Viewing Production Runs

List Runs via CLI

Each orchestrator provides commands to list runs.

AWS Step Functions:
# List all runs
python myflow.py step-functions list-runs

# List only running executions
python myflow.py step-functions list-runs --running

# List succeeded runs
python myflow.py step-functions list-runs --succeeded

# List failed runs
python myflow.py step-functions list-runs --failed
Argo Workflows:
# Check status of a specific run
python myflow.py argo-workflows status argo-xyz123

# List workflow templates
python myflow.py argo-workflows list-workflow-templates

Access Runs Programmatically

Query production runs from Python:
from metaflow import Flow, namespace

# Access production runs
namespace('production:<token>')

flow = Flow('MyFlow')

# Get latest run
latest = flow.latest_run
print(f"Latest run: {latest.pathspec}")
print(f"Status: {latest.successful}")

# Iterate through recent runs
for run in flow.runs():
    print(f"{run.id}: {run.finished_at}")
    if run.successful:
        print(f"  Result: {run.data.result}")

# Filter runs by tag (tags are passed positionally)
for run in flow.runs('version:2.0'):
    print(run.pathspec)

Query Specific Runs

from metaflow import Flow, Run

# Access specific run by ID
run = Run('MyFlow/123')

if run.successful:
    print("Run succeeded")
    # Access artifacts
    print(f"Model accuracy: {run.data.accuracy}")
else:
    print("Run did not succeed; inspect its tasks for details")

# Iterate through steps
for step in run.steps():
    print(f"Step {step.id}: {step.finished_at}")
    for task in step.tasks():
        print(f"  Task {task.id}: {task.successful}")

UI and Dashboards

Metaflow UI

If you have the Metaflow UI service deployed:
# Runs are visible at:
http://<UI_URL>/<flow_name>/<run_id>
The UI provides:
  • DAG visualization
  • Step execution timeline
  • Artifact inspection
  • Log viewing
  • Cards rendering
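If the UI is deployed, links like the one above can be constructed programmatically. A minimal sketch; the helper name and the `METAFLOW_UI_URL` environment variable are illustrative assumptions, not part of Metaflow itself:

```python
import os

def ui_link(pathspec, base_url=None):
    """Build a Metaflow UI URL for a run pathspec like 'MyFlow/123'."""
    # Assumption: the deployment's UI base URL is available via env var
    base_url = base_url or os.environ.get("METAFLOW_UI_URL", "http://localhost:3000")
    return f"{base_url.rstrip('/')}/{pathspec}"

# Usage with the Metaflow client:
# from metaflow import Flow
# print(ui_link(Flow('MyFlow').latest_run.pathspec))
```
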

Orchestrator UIs

AWS Step Functions Console:
  • Navigate to AWS Step Functions in the AWS Console
  • View state machine execution history
  • Inspect input/output for each state
  • View CloudWatch Logs (if enabled)
Argo Workflows UI:
  • Access the Argo UI at your configured endpoint
  • View workflow templates and executions
  • Real-time execution progress
  • Pod logs and events
Apache Airflow UI:
  • Access Airflow webserver
  • View DAG runs and task instances
  • Task logs and XCom data
  • Gantt charts and execution timeline

Logs and Debugging

Accessing Logs

Step Functions with CloudWatch: Enable execution history logging:
python myflow.py step-functions create --log-execution-history
Logs are sent to CloudWatch Logs under /aws/vendedlogs/states/<name>.

Argo Workflows: View logs directly from kubectl:
# Get workflow pods
kubectl get pods -n <namespace> | grep <workflow-name>

# View pod logs
kubectl logs <pod-name> -n <namespace>

# Stream logs in real-time
kubectl logs -f <pod-name> -n <namespace>
Metaflow Logs CLI:
# View logs for a specific run
python myflow.py logs <run-id>

# View logs for a specific step
python myflow.py logs <run-id>/step-name

Programmatic Log Access

from metaflow import Flow

flow = Flow('MyFlow')
run = flow.latest_run

# Access logs for each task
for step in run.steps():
    for task in step.tasks():
        print(f"\n=== {task.pathspec} ===")
        print(task.stderr)  # Standard error
        print(task.stdout)  # Standard output

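The same pattern can be narrowed to failing tasks only. A sketch (helper name is hypothetical) that collects stderr from every unsuccessful task, using the client attributes shown above:

```python
def failed_task_logs(run):
    """Map task pathspec -> stderr for every task that did not succeed."""
    logs = {}
    for step in run.steps():
        for task in step.tasks():
            if not task.successful:
                logs[task.pathspec] = task.stderr
    return logs

# Usage with the Metaflow client:
# from metaflow import Flow
# for pathspec, stderr in failed_task_logs(Flow('MyFlow').latest_run).items():
#     print(f"=== {pathspec} ===\n{stderr}")
```
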
Alerting and Notifications

Argo Workflows Notifications

Configure notifications during deployment:
# Slack notifications
python myflow.py argo-workflows create \
    --notify-on-error \
    --notify-on-success \
    --notify-slack-webhook-url https://hooks.slack.com/services/YOUR/WEBHOOK

# PagerDuty notifications
python myflow.py argo-workflows create \
    --notify-on-error \
    --notify-pager-duty-integration-key YOUR_INTEGRATION_KEY

# Incident.io notifications
python myflow.py argo-workflows create \
    --notify-on-error \
    --notify-incident-io-api-key YOUR_API_KEY \
    --incident-io-alert-source-config-id YOUR_CONFIG_ID

Custom Alerting

Implement custom alerting in your flows:
from metaflow import FlowSpec, step, catch
import requests

class MonitoredFlow(FlowSpec):
    
    def send_alert(self, message, level='error'):
        """Send alert to monitoring system"""
        webhook_url = 'https://your-alerting-system.com/webhook'
        payload = {
            'flow': self.__class__.__name__,
            'message': message,
            'level': level
        }
        requests.post(webhook_url, json=payload)
    
    @catch(var='error')
    @step
    def start(self):
        try:
            self.data = self.process_data()
        except Exception as e:
            self.send_alert(f"Data processing failed: {str(e)}")
            raise
        self.next(self.end)
    
    @step
    def end(self):
        if hasattr(self, 'error'):
            # Handle caught exception
            self.send_alert(f"Flow completed with errors: {self.error}")
        else:
            # Success notification
            self.send_alert("Flow completed successfully", level='info')

AWS CloudWatch Alarms

Set up CloudWatch alarms for Step Functions:
import boto3

cloudwatch = boto3.client('cloudwatch')

# Create alarm for failed executions
cloudwatch.put_metric_alarm(
    AlarmName='MyFlow-FailureAlarm',
    ComparisonOperator='GreaterThanThreshold',
    EvaluationPeriods=1,
    MetricName='ExecutionsFailed',
    Namespace='AWS/States',
    Period=300,
    Statistic='Sum',
    Threshold=0,
    ActionsEnabled=True,
    AlarmActions=['arn:aws:sns:region:account:topic'],
    Dimensions=[
        {'Name': 'StateMachineArn', 'Value': 'arn:aws:states:...'}
    ]
)
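The Dimensions entry above needs the deployed state machine's ARN. One way to find it is to filter the results of boto3's ListStateMachines call; the helper below is a sketch (name hypothetical), with the boto3 usage shown commented out since it requires AWS credentials:

```python
def match_state_machines(state_machines, name_fragment):
    """Filter ListStateMachines results down to ARNs whose name matches."""
    return [sm["stateMachineArn"] for sm in state_machines
            if name_fragment in sm["name"]]

# With boto3 (requires AWS credentials):
# import boto3
# sfn = boto3.client("stepfunctions")
# pages = sfn.get_paginator("list_state_machines").paginate()
# arns = [arn for page in pages
#         for arn in match_state_machines(page["stateMachines"], "MyFlow")]
```
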

Metrics and Observability

Built-in Metrics

Metaflow automatically tracks:
  • Execution duration for each step
  • Resource usage (CPU, memory)
  • Retry attempts
  • Success/failure rates
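These built-in timestamps can be turned into simple duration reports via the client API. A sketch (helper name is hypothetical) that derives per-step wall-clock time from each task's created_at and finished_at:

```python
def step_durations(run):
    """Wall-clock seconds per step, from task created/finished timestamps."""
    durations = {}
    for step in run.steps():
        tasks = list(step.tasks())
        started = [t.created_at for t in tasks if t.created_at]
        finished = [t.finished_at for t in tasks if t.finished_at]
        if started and finished:
            # Span from the first task starting to the last task finishing
            durations[step.id] = (max(finished) - min(started)).total_seconds()
    return durations

# Usage with the Metaflow client:
# from metaflow import Flow
# print(step_durations(Flow('MyFlow').latest_run))
```
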

Custom Metrics

Log custom metrics from your flows:
from metaflow import FlowSpec, step, current
import time

class MetricsFlow(FlowSpec):
    
    @step
    def start(self):
        start_time = time.time()
        
        # Your processing logic
        result = self.process_data()
        
        # Calculate metrics
        duration = time.time() - start_time
        
        # Log metrics as artifacts
        self.metrics = {
            'duration_seconds': duration,
            'records_processed': len(result),
            'throughput': len(result) / duration
        }
        
        # Or send to external metrics system
        self.send_to_datadog(self.metrics)
        
        self.next(self.end)
    
    @step
    def end(self):
        pass
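Because the metrics above are stored as ordinary artifacts, they can be read back across runs to build a history. A hypothetical helper sketching this with the client API:

```python
def metrics_history(runs, artifact='metrics'):
    """Collect a metrics artifact from each successful run, newest first."""
    history = []
    for run in runs:
        # Skip failed runs and runs that never saved the artifact
        if run.successful and hasattr(run.data, artifact):
            history.append((run.id, getattr(run.data, artifact)))
    return history

# Usage with the Metaflow client:
# from metaflow import Flow
# for run_id, m in metrics_history(Flow('MetricsFlow').runs()):
#     print(run_id, m['throughput'])
```
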

Integration with Monitoring Tools

Datadog:
from datadog import initialize, statsd
from metaflow import FlowSpec, step

initialize(api_key='YOUR_KEY', app_key='YOUR_APP_KEY')

class DatadogFlow(FlowSpec):
    
    @step
    def start(self):
        with statsd.timed('metaflow.flow.duration', 
                         tags=[f'flow:{self.__class__.__name__}']):
            self.result = self.process()
        
        statsd.increment('metaflow.flow.completed',
                        tags=[f'flow:{self.__class__.__name__}'])
        
        self.next(self.end)
    
    @step
    def end(self):
        pass
Prometheus:
from prometheus_client import CollectorRegistry, Counter, Histogram, push_to_gateway
from metaflow import FlowSpec, step

# Metrics must share a registry so they can be pushed together
registry = CollectorRegistry()
flow_duration = Histogram('metaflow_flow_duration_seconds',
                          'Flow execution duration',
                          registry=registry)
flow_completions = Counter('metaflow_flow_completions_total',
                           'Total flow completions',
                           ['flow_name', 'status'],
                           registry=registry)

class PrometheusFlow(FlowSpec):

    @step
    def start(self):
        with flow_duration.time():
            success = self.process()

        status = 'success' if success else 'failure'
        flow_completions.labels(
            flow_name=self.__class__.__name__,
            status=status
        ).inc()

        # Push to a Prometheus Pushgateway
        push_to_gateway('localhost:9091',
                        job='metaflow',
                        registry=registry)
        
        self.next(self.end)
    
    @step  
    def end(self):
        pass

Debugging Failed Runs

Resume Failed Runs

Metaflow allows resuming from failed steps:
# Resume where the latest run failed
python myflow.py resume

# Resume from a specific step
python myflow.py resume process

# Resume a specific past run
python myflow.py resume --origin-run-id <run-id>

Inspect Failed Tasks

from metaflow import Flow

flow = Flow('MyFlow')
run = flow.latest_run

# Find failed tasks
for step in run.steps():
    for task in step.tasks():
        if not task.successful:
            print(f"\nFailed task: {task.pathspec}")
            print(f"Exception: {task.exception}")
            print(f"\nStderr:\n{task.stderr}")
            
            # Inspect artifacts the task persisted before failing
            print(f"Artifacts: {[artifact.id for artifact in task]}")

Debug Mode

Add debugging output to your flows:
from metaflow import FlowSpec, step, current
import sys

class DebugFlow(FlowSpec):
    
    @step
    def start(self):
        # Enable verbose logging
        print(f"[DEBUG] Run ID: {current.run_id}", file=sys.stderr)
        print(f"[DEBUG] Parameters: {current.parameter_names}", file=sys.stderr)
        print(f"[DEBUG] Origin: {current.origin_run_id}", file=sys.stderr)
        
        try:
            self.data = self.load_data()
            print(f"[DEBUG] Loaded {len(self.data)} records", file=sys.stderr)
        except Exception as e:
            print(f"[ERROR] Failed to load data: {str(e)}", file=sys.stderr)
            raise
        
        self.next(self.end)
    
    @step
    def end(self):
        pass

Performance Monitoring

Resource Usage Tracking

from metaflow import FlowSpec, step, resources
import psutil
import time

class ResourceMonitoredFlow(FlowSpec):
    
    @resources(memory=8000, cpu=4)
    @step
    def start(self):
        start_time = time.time()
        process = psutil.Process()
        
        # Record initial resource usage
        self.memory_start = process.memory_info().rss / 1024 / 1024  # MB
        self.cpu_start = process.cpu_percent()
        
        # Your processing
        self.result = self.heavy_computation()
        
        # Record final resource usage
        self.memory_end = process.memory_info().rss / 1024 / 1024
        self.cpu_end = process.cpu_percent()
        self.duration = time.time() - start_time
        
        print(f"Memory: {self.memory_start:.1f} MB -> {self.memory_end:.1f} MB")
        print(f"Duration: {self.duration:.1f}s")
        
        self.next(self.end)
    
    @step
    def end(self):
        pass

Cards for Monitoring

Use @card decorator for visual monitoring:
from metaflow import FlowSpec, step, card, current
from metaflow.cards import Markdown, Table, Image
import matplotlib.pyplot as plt

class MonitoringDashboard(FlowSpec):
    
    @card
    @step
    def start(self):
        # Process data
        self.metrics = self.compute_metrics()
        
        # Create visualization
        fig, ax = plt.subplots()
        ax.plot(self.metrics['timestamps'], self.metrics['values'])
        ax.set_title('Processing Metrics')
        plt.savefig('metrics.png')
        
        # Add to card
        current.card.append(Markdown("# Flow Execution Metrics"))
        current.card.append(Table([
            ['Total Records', str(len(self.metrics['values']))],
            ['Success Rate', f"{self.metrics['success_rate']:.1%}"]
        ]))
        current.card.append(Image.from_matplotlib(fig))
        
        self.next(self.end)
    
    @step
    def end(self):
        pass

Best Practices

Always deploy with tags for easier filtering and analysis:
python myflow.py step-functions create --tag version:2.1.0 --tag env:prod
Configure failure notifications before deploying to production to catch issues quickly.
Use different logging levels (INFO, WARNING, ERROR) and only log what’s necessary to avoid log bloat.
Track memory and CPU usage to optimize resource allocation and costs.
Build dashboards showing key metrics like success rates, execution times, and throughput.
Keep execution history for debugging and compliance. Configure retention policies appropriately.

Troubleshooting Common Issues

High Failure Rate

  1. Check recent code changes
  2. Review error logs for patterns
  3. Verify upstream data quality
  4. Check resource limits
  5. Look for infrastructure issues
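Several of these checks can be automated with the client API. A sketch (helper name is hypothetical) that quantifies the problem before you dig in, by measuring the recent failure rate:

```python
def failure_rate(runs, window=20):
    """Fraction of the most recent `window` finished runs that failed."""
    finished = [r for r in runs if r.finished][:window]
    if not finished:
        return 0.0
    failures = sum(1 for r in finished if not r.successful)
    return failures / len(finished)

# Usage with the Metaflow client:
# from metaflow import Flow, namespace
# namespace('production:<token>')
# print(f"Recent failure rate: {failure_rate(Flow('MyFlow').runs()):.0%}")
```
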

Slow Execution

  1. Profile resource usage
  2. Check for data volume increases
  3. Look for external service latency
  4. Review parallel execution settings
  5. Consider optimizing expensive steps
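To confirm a slowdown, compare wall-clock durations across recent runs. A hypothetical sketch using the client's created_at and finished_at run timestamps:

```python
def run_durations(runs, limit=10):
    """(run id, wall-clock seconds) for recent finished runs, newest first."""
    out = []
    for run in runs:
        if run.finished_at and run.created_at:
            out.append((run.id, (run.finished_at - run.created_at).total_seconds()))
        if len(out) >= limit:
            break
    return out

# Usage with the Metaflow client:
# from metaflow import Flow
# for run_id, seconds in run_durations(Flow('MyFlow').runs()):
#     print(f"{run_id}: {seconds:.0f}s")
```
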

Missing Artifacts

  1. Verify datastore configuration
  2. Check storage permissions
  3. Look for cleanup policies
  4. Ensure artifacts are being saved
  5. Check for datastore connectivity issues
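Step 4 can be checked programmatically: a hypothetical helper that reports which expected artifact names a run did not persist:

```python
def missing_artifacts(run, expected):
    """Names from `expected` that the run did not persist as artifacts."""
    return [name for name in expected if not hasattr(run.data, name)]

# Usage with the Metaflow client:
# from metaflow import Run
# print(missing_artifacts(Run('MyFlow/123'), ['model', 'accuracy']))
```
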

Next Steps

Debugging Flows

Learn debugging techniques

Cards

Create visual monitoring reports

Configuration

Configure monitoring integrations
