
Overview

Metaflow provides native integration with Apache Airflow, allowing you to deploy your flows as Airflow DAGs that execute on Kubernetes. This integration enables you to leverage Airflow’s scheduling capabilities while maintaining Metaflow’s developer-friendly API.
Airflow integration requires Airflow 2.2.0 or later and uses the KubernetesPodOperator for task execution.

Prerequisites

Before deploying to Airflow, ensure you have:
  • Apache Airflow 2.2.0+ installed
  • Kubernetes cluster configured
  • Apache Airflow Kubernetes provider: apache-airflow-providers-cncf-kubernetes
  • Metaflow configured with a supported datastore (S3, Azure Blob Storage, or GCS)
  • @kubernetes decorator available for all steps

Quick Start

1. Prepare Your Flow

Create a Metaflow flow with the @kubernetes decorator on steps that need cloud execution:
from metaflow import FlowSpec, step, kubernetes, schedule

@schedule(daily=True)
class MyAirflowFlow(FlowSpec):

    @step
    def start(self):
        print("Starting workflow")
        self.next(self.process)
    @kubernetes(cpu=2, memory=4096)
    @step
    def process(self):
        # Heavy computation
        self.result = sum(range(1000000))
        self.next(self.end)
    
    @step
    def end(self):
        print(f"Result: {self.result}")

if __name__ == '__main__':
    MyAirflowFlow()

2. Deploy to Airflow

Compile your flow to an Airflow DAG file:
python my_flow.py airflow create my_airflow_dag.py
This generates a Python file containing the Airflow DAG definition.

3. Deploy the DAG

Copy the generated DAG file to your Airflow DAGs directory:
cp my_airflow_dag.py $AIRFLOW_HOME/dags/
Airflow will automatically detect and load the new DAG.

Configuration Options

Command Line Options

The airflow create command supports several options:
python my_flow.py airflow create my_dag.py \
  --name my_custom_dag_name \
  --max-workers 100 \
  --workflow-timeout 3600 \
  --worker-pool my_pool \
  --tag production \
  --tag v1.0 \
  --is-paused-upon-creation
  • --name: Custom DAG name (default: flow name)
  • --max-workers: Maximum number of parallel tasks (default: 100)
  • --workflow-timeout: Workflow timeout in seconds, enforced only for scheduled DAGs (default: None)
  • --worker-pool: Airflow worker pool for task execution (default: None)
  • --tag: Tag to annotate all objects; can be specified multiple times (default: None)
  • --is-paused-upon-creation: Create the DAG in a paused state (default: False)
  • --namespace: Custom namespace for runs (default: the default namespace)

Production Tokens

Metaflow uses production tokens to manage deployment authorization:
# First deployment generates a token
python my_flow.py airflow create my_dag.py

# Redeploy with existing token (automatically loaded)
python my_flow.py airflow create my_dag.py

# Generate new token (creates new namespace)
python my_flow.py airflow create my_dag.py --generate-new-token

# Use specific token
python my_flow.py airflow create my_dag.py --new-token my_token

# Authorize with existing token
python my_flow.py airflow create my_dag.py --authorize my_token

Flow Decorators

@schedule Decorator

Schedule your flow using Airflow's scheduler:
from metaflow import FlowSpec, step, schedule

@schedule(cron='0 10 * * *')  # Daily at 10:00 AM
class ScheduledFlow(FlowSpec):
    @step
    def start(self):
        self.next(self.end)
    
    @step
    def end(self):
        pass
Supported schedule types:
  • cron='0 10 * * *' - Cron expression
  • daily=True - Daily execution
  • weekly=True - Weekly execution
  • hourly=True - Hourly execution
The timezone parameter is not supported in Airflow deployments.

Sensors

Metaflow supports Airflow sensors as flow decorators to wait for external conditions:

S3 Key Sensor

Wait for S3 objects before starting the flow:
from metaflow import FlowSpec, step, airflow_s3_key_sensor

@airflow_s3_key_sensor(
    bucket_key='s3://my-bucket/data/*.csv',
    wildcard_match=True,
    timeout=3600,
    poke_interval=60
)
class S3SensorFlow(FlowSpec):
    @step
    def start(self):
        # Starts after S3 key is available
        self.next(self.end)
    
    @step
    def end(self):
        pass
Parameters:
  • bucket_key (required): S3 key(s) to wait for (supports wildcards)
  • bucket_name: S3 bucket name (optional if full s3:// URL provided)
  • wildcard_match: Interpret bucket_key as Unix wildcard pattern (default: False)
  • timeout: Timeout in seconds (default: 3600)
  • poke_interval: Check interval in seconds (default: 60)
  • mode: Sensor mode - 'poke' or 'reschedule' (default: 'poke')
  • aws_conn_id: Airflow AWS connection ID

External Task Sensor

Wait for another Airflow DAG to complete:
from metaflow import FlowSpec, step, airflow_external_task_sensor
from datetime import timedelta

@airflow_external_task_sensor(
    external_dag_id='upstream_dag',
    external_task_ids=['task1', 'task2'],
    allowed_states=['success'],
    execution_delta=timedelta(hours=1)
)
class ExternalTaskFlow(FlowSpec):
    @step
    def start(self):
        # Starts after external tasks complete
        self.next(self.end)
    
    @step
    def end(self):
        pass
Parameters:
  • external_dag_id (required): DAG ID to wait for
  • external_task_ids: List of task IDs (waits for DAG if None)
  • allowed_states: List of acceptable states (default: ['success'])
  • failed_states: List of failed states
  • execution_delta: Time difference with previous execution
  • check_existence: Verify external task/DAG exists (default: True)
  • timeout: Timeout in seconds (default: 3600)
  • poke_interval: Check interval in seconds (default: 60)
Multiple sensors can be added to a flow. The start step will execute only after all sensors complete.

Step Decorators

@kubernetes Decorator

When deployed to Airflow, every step runs on Kubernetes; use the @kubernetes decorator to customize resources for a step:
from metaflow import kubernetes

@kubernetes(
    cpu=4,
    memory=8192,
    gpu=1,
    gpu_vendor='nvidia',
    image='my-registry/my-image:latest',
    namespace='metaflow',
    service_account='metaflow-sa',
    secrets=['my-secret-1', 'my-secret-2'],
    node_selector={'node.type': 'gpu'}
)
@step
def train_model(self):
    # Training code
    pass

@retry Decorator

Configure retry behavior:
from metaflow import retry

@retry(times=3, minutes_between_retries=5)
@step
def flaky_step(self):
    pass

@timeout Decorator

Set execution timeouts:
from metaflow import timeout

@timeout(seconds=3600)
@step
def long_running_step(self):
    pass

Parameters

All parameters must have default values for Airflow deployment:
from metaflow import FlowSpec, Parameter, step

class ParameterizedFlow(FlowSpec):
    learning_rate = Parameter('learning_rate',
                              default=0.01,
                              type=float,
                              help='Learning rate')
    
    epochs = Parameter('epochs',
                       default=10,
                       type=int,
                       help='Number of epochs')
    
    @step
    def start(self):
        print(f"Training with lr={self.learning_rate}, epochs={self.epochs}")
        self.next(self.end)
    
    @step
    def end(self):
        pass
Parameters can be overridden in the Airflow UI when triggering a DAG run.

Foreach Loops

Metaflow foreach loops are supported via Airflow’s dynamic task mapping:
class ForeachFlow(FlowSpec):
    @step
    def start(self):
        self.items = list(range(10))
        self.next(self.process, foreach='items')
    
    @kubernetes(cpu=2)
    @step
    def process(self):
        self.result = self.input * 2
        self.next(self.join)
    
    @step
    def join(self, inputs):
        self.results = [inp.result for inp in inputs]
        self.next(self.end)
    
    @step
    def end(self):
        print(f"Results: {self.results}")
Foreach Limitations:
  • Nested foreach loops are not supported
  • Foreach must have an immediate join step (no linear steps between foreach and join)
  • Parallel foreach (@parallel decorator) is not supported
  • Requires Airflow 2.3.0+ and Kubernetes provider 4.2.0+

Limitations

The following Metaflow features are not supported with Airflow:

Unsupported Decorators

  • @batch - AWS Batch execution
  • @slurm - Slurm cluster execution
  • @parallel - Parallel foreach execution
  • @trigger / @trigger_on_finish - Event triggering
  • @exit_hook - Exit hooks

Unsupported Kubernetes Options

  • use_tmpfs - tmpfs volume mounting
  • tmpfs_size - tmpfs size configuration
  • persistent_volume_claims - PVC mounting
  • image_pull_policy - Image pull policy

Datastore Requirements

Only the following datastores are supported:
  • AWS S3 (s3://)
  • Azure Blob Storage (azure://)
  • Google Cloud Storage (gs://)
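
As a quick pre-deployment sanity check, a small helper (not part of Metaflow; just a sketch built from the list above) can verify that a datastore root uses one of the supported schemes:

```python
from urllib.parse import urlparse

# The three URL schemes listed above; anything else is unsupported on Airflow.
SUPPORTED_SCHEMES = {"s3", "azure", "gs"}

def is_supported_datastore(root: str) -> bool:
    """Return True if `root` uses a datastore scheme supported on Airflow."""
    return urlparse(root).scheme in SUPPORTED_SCHEMES

print(is_supported_datastore("s3://my-bucket/metaflow"))   # True
print(is_supported_datastore("file:///tmp/metaflow"))      # False
```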

Environment Configuration

Configure Metaflow for Airflow deployment:

Kubernetes Connection

# Use in-cluster configuration (when Airflow runs in Kubernetes)
export METAFLOW_AIRFLOW_KUBERNETES_CONN_ID=None

# Use specific kubeconfig context
export METAFLOW_AIRFLOW_KUBERNETES_KUBECONFIG_CONTEXT=my-context

# Use specific kubeconfig file
export METAFLOW_AIRFLOW_KUBERNETES_KUBECONFIG_FILE=/path/to/kubeconfig

# Set startup timeout for pods
export METAFLOW_AIRFLOW_KUBERNETES_STARTUP_TIMEOUT_SECONDS=600

Datastore Configuration

# S3 configuration
export METAFLOW_DATASTORE_SYSROOT_S3=s3://my-bucket/metaflow
export METAFLOW_DATATOOLS_S3ROOT=s3://my-bucket/datatools

# Azure configuration  
export METAFLOW_DATASTORE_SYSROOT_AZURE=my-container/metaflow
export METAFLOW_AZURE_STORAGE_BLOB_SERVICE_ENDPOINT=https://myaccount.blob.core.windows.net

# GCS configuration
export METAFLOW_DATASTORE_SYSROOT_GS=gs://my-bucket/metaflow

Kubernetes Secrets

# Global secrets available to all steps
export METAFLOW_KUBERNETES_SECRETS=secret1,secret2

# Service account for pod execution
export METAFLOW_KUBERNETES_SERVICE_ACCOUNT=metaflow-sa

Monitoring and Debugging

Viewing Logs

View task logs in the Airflow UI:
  1. Navigate to your DAG
  2. Click on a task instance
  3. View logs in the Logs tab

Accessing Results

Access run results using the Metaflow Client API:
from metaflow import Flow, namespace

# Set namespace to production token
namespace('production:my-token')

# Access the flow
flow = Flow('MyAirflowFlow')

# Get latest run
run = flow.latest_run

# Access artifacts
for step in run:
    print(f"{step.id}: {step.finished_at}")
    data = step.task.data
    if hasattr(data, 'result'):
        print(f"  Result: {data.result}")

Common Issues

DAG not appearing in Airflow UI:
  • Check that the DAG file is in the correct directory
  • Verify no Python syntax errors
  • Check Airflow scheduler logs
Tasks failing to start:
  • Verify Kubernetes connection configuration
  • Check service account permissions
  • Ensure Docker image is accessible
Parameter issues:
  • Ensure all parameters have default values
  • Check parameter types are supported (int, str, float, bool)

Best Practices

  1. Use Production Tokens: Always use production tokens to manage deployments and organize results by deployment version.
  2. Test Locally First: Run your flow locally before deploying to Airflow:
    python my_flow.py run
    
  3. Resource Management: Set appropriate CPU and memory limits for each step to optimize cluster utilization.
  4. Idempotency: Ensure your steps are idempotent for reliable retry behavior.
  5. Monitoring: Use Airflow’s built-in monitoring and alerting features to track flow execution.
  6. Version Control: Keep DAG files in version control and use tags to track deployments.
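
For the idempotency point, one common pattern is to derive output locations deterministically from a step's inputs, so a retried task overwrites the same key instead of creating duplicates. A sketch (the key scheme is illustrative, not a Metaflow API):

```python
import hashlib
import json

def output_key(flow_name: str, run_date: str, params: dict) -> str:
    """Deterministic output key: identical inputs always map to the same
    path, so a retried task rewrites the same location rather than
    accumulating duplicate outputs."""
    digest = hashlib.sha256(
        json.dumps(params, sort_keys=True).encode()
    ).hexdigest()[:12]
    return f"{flow_name}/{run_date}/{digest}/output.parquet"

# The same inputs always yield the same key, even across retries.
k1 = output_key("ProductionMLFlow", "2024-01-01", {"model_type": "xgboost"})
k2 = output_key("ProductionMLFlow", "2024-01-01", {"model_type": "xgboost"})
assert k1 == k2
```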

Example: Complete Production Flow

from metaflow import FlowSpec, step, Parameter, schedule
from metaflow import kubernetes, retry, timeout, airflow_s3_key_sensor
from datetime import datetime

@airflow_s3_key_sensor(
    bucket_key='s3://data-bucket/input/*.parquet',
    wildcard_match=True,
    timeout=7200
)
@schedule(cron='0 2 * * *')  # Daily at 2 AM
class ProductionMLFlow(FlowSpec):
    
    model_type = Parameter('model_type',
                          default='xgboost',
                          help='Model type to train')
    
    @step
    def start(self):
        """Initialize workflow"""
        self.run_date = datetime.now().strftime('%Y-%m-%d')
        self.next(self.load_data)
    
    @kubernetes(cpu=4, memory=16384)
    @retry(times=3, minutes_between_retries=5)
    @timeout(seconds=3600)
    @step
    def load_data(self):
        """Load and preprocess data"""
        # load_dataset(), train(), evaluate(), and deploy_model() are
        # placeholders for your own project-specific helpers
        self.data = load_dataset(self.run_date)
        self.splits = list(range(10))
        self.next(self.train_model, foreach='splits')
    
    @kubernetes(cpu=8, memory=32768, gpu=1, gpu_vendor='nvidia')
    @retry(times=2)
    @timeout(seconds=7200)
    @step
    def train_model(self):
        """Train model on data split"""
        split_id = self.input
        self.model = train(self.data, split_id, self.model_type)
        self.metrics = evaluate(self.model)
        self.next(self.join_models)
    
    @step
    def join_models(self, inputs):
        """Select best model"""
        self.all_metrics = [inp.metrics for inp in inputs]
        best_idx = max(range(len(inputs)), 
                      key=lambda i: inputs[i].metrics['accuracy'])
        self.best_model = inputs[best_idx].model
        self.next(self.deploy)
    
    @kubernetes(cpu=2, memory=8192)
    @step
    def deploy(self):
        """Deploy best model"""
        deploy_model(self.best_model, self.run_date)
        self.next(self.end)
    
    @step
    def end(self):
        """Workflow complete"""
        print(f"Deployed model for {self.run_date}")

if __name__ == '__main__':
    ProductionMLFlow()
Deploy this flow:
python production_ml_flow.py airflow create production_ml_dag.py \
  --max-workers 50 \
  --workflow-timeout 14400 \
  --tag production \
  --tag ml-pipeline
