## Overview
Metaflow provides native integration with Apache Airflow, allowing you to deploy your flows as Airflow DAGs that execute on Kubernetes. This integration enables you to leverage Airflow’s scheduling capabilities while maintaining Metaflow’s developer-friendly API.
Airflow integration requires Airflow 2.2.0 or later and uses the KubernetesPodOperator for task execution.
## Prerequisites

Before deploying to Airflow, ensure you have:

- Apache Airflow 2.2.0+ installed
- A Kubernetes cluster configured and reachable from Airflow
- The Apache Airflow Kubernetes provider installed: `apache-airflow-providers-cncf-kubernetes`
- Metaflow configured with a supported datastore (S3, Azure Blob Storage, or GCS)
- The `@kubernetes` decorator available for all steps
## Quick Start

### 1. Prepare Your Flow

Create a Metaflow flow with the `@kubernetes` decorator on steps that need cloud execution:
```python
from metaflow import FlowSpec, step, kubernetes, schedule

@schedule(daily=True)
class MyAirflowFlow(FlowSpec):

    @step
    def start(self):
        print("Starting workflow")
        self.next(self.process)

    @kubernetes(cpu=2, memory=4096)
    @step
    def process(self):
        # Heavy computation
        self.result = sum(range(1000000))
        self.next(self.end)

    @step
    def end(self):
        print(f"Result: {self.result}")

if __name__ == '__main__':
    MyAirflowFlow()
```
### 2. Deploy to Airflow

Compile your flow to an Airflow DAG file:

```bash
python my_flow.py airflow create my_airflow_dag.py
```
This generates a Python file containing the Airflow DAG definition.
### 3. Deploy the DAG

Copy the generated DAG file to your Airflow DAGs directory:

```bash
cp my_airflow_dag.py $AIRFLOW_HOME/dags/
```
Airflow will automatically detect and load the new DAG.
## Configuration Options

### Command Line Options

The `airflow create` command supports several options:
```bash
python my_flow.py airflow create my_dag.py \
    --name my_custom_dag_name \
    --max-workers 100 \
    --workflow-timeout 3600 \
    --worker-pool my_pool \
    --tag production \
    --tag v1.0 \
    --is-paused-upon-creation
```
| Option | Description | Default |
|---|---|---|
| `--name` | Custom DAG name (uses flow name if not specified) | Flow name |
| `--max-workers` | Maximum number of parallel tasks | 100 |
| `--workflow-timeout` | Workflow timeout in seconds (enforced only for scheduled DAGs) | None |
| `--worker-pool` | Airflow worker pool for task execution | None |
| `--tag` | Tag to annotate all objects (can be specified multiple times) | None |
| `--is-paused-upon-creation` | Create the DAG in a paused state | False |
| `--namespace` | Custom namespace for runs | Default |
### Production Tokens

Metaflow uses production tokens to manage deployment authorization:

```bash
# First deployment generates a token
python my_flow.py airflow create my_dag.py

# Redeploy with the existing token (loaded automatically)
python my_flow.py airflow create my_dag.py

# Generate a new token (creates a new namespace)
python my_flow.py airflow create my_dag.py --generate-new-token

# Use a specific token
python my_flow.py airflow create my_dag.py --new-token my_token

# Authorize with an existing token
python my_flow.py airflow create my_dag.py --authorize my_token
```
## Flow Decorators

### @schedule Decorator

Schedule your flow using Airflow's scheduler:

```python
from metaflow import FlowSpec, step, schedule

@schedule(cron='0 10 * * *')  # Daily at 10:00 AM
class ScheduledFlow(FlowSpec):

    @step
    def start(self):
        self.next(self.end)

    @step
    def end(self):
        pass
```
Supported schedule types:

- `cron='0 10 * * *'` - Cron expression
- `daily=True` - Daily execution
- `weekly=True` - Weekly execution
- `hourly=True` - Hourly execution

The `timezone` parameter is not supported in Airflow deployments.
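The convenience flags are shorthand for standard cron presets. The exact expressions below are an assumption for illustration (they mirror the conventional expansions of `@daily`, `@weekly`, and `@hourly`); this sketch shows how the keyword arguments resolve to a single cron string:

```python
def to_cron(cron=None, daily=False, weekly=False, hourly=False):
    """Resolve @schedule-style keyword arguments to one cron expression.

    An explicit cron string always wins; the boolean flags map to the
    conventional presets (assumed here for illustration).
    """
    if cron:
        return cron
    if daily:
        return "0 0 * * *"    # midnight every day
    if weekly:
        return "0 0 * * 0"    # midnight every Sunday
    if hourly:
        return "0 * * * *"    # top of every hour
    raise ValueError("no schedule given")

print(to_cron(cron='0 10 * * *'))  # explicit cron expression wins
print(to_cron(daily=True))
```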
## Sensors

Metaflow supports Airflow sensors as flow decorators to wait for external conditions.

### S3 Key Sensor

Wait for S3 objects before starting the flow:
```python
from metaflow import FlowSpec, step, airflow_s3_key_sensor

@airflow_s3_key_sensor(
    bucket_key='s3://my-bucket/data/*.csv',
    wildcard_match=True,
    timeout=3600,
    poke_interval=60
)
class S3SensorFlow(FlowSpec):

    @step
    def start(self):
        # Starts after the S3 key is available
        self.next(self.end)

    @step
    def end(self):
        pass
```
Parameters:

- `bucket_key` (required): S3 key(s) to wait for (supports wildcards)
- `bucket_name`: S3 bucket name (optional if a full `s3://` URL is provided)
- `wildcard_match`: Interpret `bucket_key` as a Unix wildcard pattern (default: False)
- `timeout`: Timeout in seconds (default: 3600)
- `poke_interval`: Check interval in seconds (default: 60)
- `mode`: Sensor mode, `'poke'` or `'reschedule'` (default: `'poke'`)
- `aws_conn_id`: Airflow AWS connection ID
### External Task Sensor

Wait for another Airflow DAG to complete:

```python
from metaflow import FlowSpec, step, airflow_external_task_sensor
from datetime import timedelta

@airflow_external_task_sensor(
    external_dag_id='upstream_dag',
    external_task_ids=['task1', 'task2'],
    allowed_states=['success'],
    execution_delta=timedelta(hours=1)
)
class ExternalTaskFlow(FlowSpec):

    @step
    def start(self):
        # Starts after the external tasks complete
        self.next(self.end)

    @step
    def end(self):
        pass
```
Parameters:

- `external_dag_id` (required): DAG ID to wait for
- `external_task_ids`: List of task IDs (waits for the whole DAG if None)
- `allowed_states`: List of acceptable states (default: `['success']`)
- `failed_states`: List of states treated as failure
- `execution_delta`: Time difference with the previous execution
- `check_existence`: Verify that the external task/DAG exists (default: True)
- `timeout`: Timeout in seconds (default: 3600)
- `poke_interval`: Check interval in seconds (default: 60)
Multiple sensors can be added to a flow. The start step will execute only after all sensors complete.
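Both sensors share the `timeout`/`poke_interval` mechanics: in `'poke'` mode the sensor re-checks its condition every `poke_interval` seconds until it succeeds or `timeout` seconds elapse. A minimal sketch of that loop, independent of Airflow (the fake clock in the example just keeps the demo instant):

```python
import time

def poke(check, timeout=3600, poke_interval=60,
         clock=time.monotonic, sleep=time.sleep):
    """Re-run `check` every `poke_interval` seconds until it returns True
    or `timeout` seconds have elapsed. Returns True on success."""
    deadline = clock() + timeout
    while clock() < deadline:
        if check():
            return True
        sleep(poke_interval)
    return False

# Example: the condition succeeds on the third check. A fake clock and
# sleep advance simulated time so the demo finishes immediately.
attempts = iter([False, False, True])
t = [0.0]
ok = poke(lambda: next(attempts), timeout=300, poke_interval=60,
          clock=lambda: t[0], sleep=lambda s: t.__setitem__(0, t[0] + s))
print(ok)  # True
```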
## Step Decorators

### @kubernetes Decorator

When deployed to Airflow, every step runs on Kubernetes. Use the `@kubernetes` decorator to customize a step's resources and execution environment:
```python
from metaflow import kubernetes, step

@kubernetes(
    cpu=4,
    memory=8192,
    gpu=1,
    gpu_vendor='nvidia',
    image='my-registry/my-image:latest',
    namespace='metaflow',
    service_account='metaflow-sa',
    secrets=['my-secret-1', 'my-secret-2'],
    node_selector={'node.type': 'gpu'}
)
@step
def train_model(self):
    # Training code
    pass
```
### @retry Decorator

Configure retry behavior:

```python
from metaflow import retry, step

@retry(times=3, minutes_between_retries=5)
@step
def flaky_step(self):
    pass
```
### @timeout Decorator

Set execution timeouts:

```python
from metaflow import timeout, step

@timeout(seconds=3600)
@step
def long_running_step(self):
    pass
```
## Parameters

All parameters must have default values for Airflow deployment:

```python
from metaflow import FlowSpec, Parameter, step

class ParameterizedFlow(FlowSpec):

    learning_rate = Parameter('learning_rate',
                              default=0.01,
                              type=float,
                              help='Learning rate')
    epochs = Parameter('epochs',
                       default=10,
                       type=int,
                       help='Number of epochs')

    @step
    def start(self):
        print(f"Training with lr={self.learning_rate}, epochs={self.epochs}")
        self.next(self.end)

    @step
    def end(self):
        pass
```
Parameters can be overridden in the Airflow UI when triggering a DAG run.
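Conceptually, a triggered DAG run layers any user-supplied overrides on top of the declared defaults, which is why every parameter must have one. A plain-Python sketch of that merge (the merge semantics shown are an assumption for illustration):

```python
# Declared parameter defaults (all must be present for Airflow deployment).
defaults = {"learning_rate": 0.01, "epochs": 10}

def resolve_params(defaults, overrides=None):
    """Overrides supplied at trigger time take precedence over defaults."""
    params = dict(defaults)       # copy so the declared defaults stay intact
    params.update(overrides or {})
    return params

print(resolve_params(defaults))                  # all defaults
print(resolve_params(defaults, {"epochs": 20}))  # epochs overridden
```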
## Foreach Loops

Metaflow foreach loops are supported via Airflow's dynamic task mapping:

```python
from metaflow import FlowSpec, step, kubernetes

class ForeachFlow(FlowSpec):

    @step
    def start(self):
        self.items = list(range(10))
        self.next(self.process, foreach='items')

    @kubernetes(cpu=2)
    @step
    def process(self):
        self.result = self.input * 2
        self.next(self.join)

    @step
    def join(self, inputs):
        self.results = [inp.result for inp in inputs]
        self.next(self.end)

    @step
    def end(self):
        print(f"Results: {self.results}")
```
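Stripped of Metaflow, the foreach example above is a plain fan-out/fan-in: `start` produces the items, `process` runs once per item, and `join` gathers the per-branch results. This sketch only mirrors the data flow; in the Airflow deployment each branch is a separately mapped task:

```python
def start():
    return list(range(10))        # values fanned out over

def process(item):
    return item * 2               # runs once per branch

def join(branch_results):
    return list(branch_results)   # gather per-branch artifacts

results = join(process(i) for i in start())
print(results)  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```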
Foreach limitations:

- Nested foreach loops are not supported
- A foreach must be followed immediately by its join step (no linear steps between foreach and join)
- Parallel foreach (the `@parallel` decorator) is not supported
- Requires Airflow 2.3.0+ and Kubernetes provider 4.2.0+
## Limitations

The following Metaflow features are not supported with Airflow:

### Unsupported Decorators

- `@batch` - AWS Batch execution
- `@slurm` - Slurm cluster execution
- `@parallel` - Parallel foreach execution
- `@trigger` / `@trigger_on_finish` - Event triggering
- `@exit_hook` - Exit hooks

### Unsupported Kubernetes Options

- `use_tmpfs` - tmpfs volume mounting
- `tmpfs_size` - tmpfs size configuration
- `persistent_volume_claims` - PVC mounting
- `image_pull_policy` - Image pull policy
### Datastore Requirements

Only the following datastores are supported:

- AWS S3 (`s3://`)
- Azure Blob Storage (`azure://`)
- Google Cloud Storage (`gs://`)
## Environment Configuration

Configure Metaflow for Airflow deployment:

### Kubernetes Connection

```bash
# Use in-cluster configuration (when Airflow runs in Kubernetes)
export METAFLOW_AIRFLOW_KUBERNETES_CONN_ID=None

# Use a specific kubeconfig context
export METAFLOW_AIRFLOW_KUBERNETES_KUBECONFIG_CONTEXT=my-context

# Use a specific kubeconfig file
export METAFLOW_AIRFLOW_KUBERNETES_KUBECONFIG_FILE=/path/to/kubeconfig

# Set the startup timeout for pods
export METAFLOW_AIRFLOW_KUBERNETES_STARTUP_TIMEOUT_SECONDS=600
```
### Datastore Configuration

```bash
# S3 configuration
export METAFLOW_DATASTORE_SYSROOT_S3=s3://my-bucket/metaflow
export METAFLOW_DATATOOLS_S3ROOT=s3://my-bucket/datatools

# Azure configuration
export METAFLOW_DATASTORE_SYSROOT_AZURE=my-container/metaflow
export METAFLOW_AZURE_STORAGE_BLOB_SERVICE_ENDPOINT=https://myaccount.blob.core.windows.net

# GCS configuration
export METAFLOW_DATASTORE_SYSROOT_GS=gs://my-bucket/metaflow
```
### Kubernetes Secrets

```bash
# Global secrets available to all steps
export METAFLOW_KUBERNETES_SECRETS=secret1,secret2

# Service account for pod execution
export METAFLOW_KUBERNETES_SERVICE_ACCOUNT=metaflow-sa
```
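`METAFLOW_KUBERNETES_SECRETS` holds a comma-separated list of secret names. A small sketch of how such a value can be split into individual names (the exact parsing Metaflow applies is an assumption; this just illustrates the comma-separated format):

```python
import os

def parse_secret_list(value):
    """Split a comma-separated env value into clean, non-empty secret names."""
    return [name.strip() for name in value.split(",") if name.strip()]

os.environ["METAFLOW_KUBERNETES_SECRETS"] = "secret1, secret2"
secrets = parse_secret_list(os.environ["METAFLOW_KUBERNETES_SECRETS"])
print(secrets)  # ['secret1', 'secret2']
```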
## Monitoring and Debugging

### Viewing Logs

View task logs in the Airflow UI:

1. Navigate to your DAG
2. Click on a task instance
3. View logs in the Logs tab
### Accessing Results

Access run results using the Metaflow Client API:

```python
from metaflow import Flow, namespace

# Set the namespace to the production token
namespace('production:my-token')

# Access the flow
flow = Flow('MyAirflowFlow')

# Get the latest run
run = flow.latest_run

# Access artifacts
for step in run:
    print(f"{step.id}: {step.finished_at}")
    if 'result' in step.task.data:
        print(f"  Result: {step.task.data.result}")
```
### Common Issues

**DAG not appearing in the Airflow UI:**

- Check that the DAG file is in the correct directory
- Verify there are no Python syntax errors
- Check the Airflow scheduler logs

**Tasks failing to start:**

- Verify the Kubernetes connection configuration
- Check service account permissions
- Ensure the Docker image is accessible

**Parameter issues:**

- Ensure all parameters have default values
- Check that parameter types are supported (int, str, float, bool)
## Best Practices

- **Use Production Tokens**: Always use production tokens to manage deployments and organize results by deployment version.
- **Test Locally First**: Run your flow locally (for example, with `python my_flow.py run`) before deploying to Airflow.
- **Resource Management**: Set appropriate CPU and memory limits for each step to optimize cluster utilization.
- **Idempotency**: Ensure your steps are idempotent for reliable retry behavior.
- **Monitoring**: Use Airflow's built-in monitoring and alerting features to track flow execution.
- **Version Control**: Keep DAG files in version control and use tags to track deployments.
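On idempotency: a retried task should overwrite its own partial output rather than produce a duplicate, which you get by deriving the output location only from the task's identity. A minimal sketch (the key scheme and helper names are illustrative, not Metaflow APIs):

```python
import json
import tempfile
from pathlib import Path

def write_result(out_dir: Path, run_id: str, step: str, payload: dict) -> Path:
    """Derive the path from (run_id, step) only: a retry overwrites the
    same file instead of appending a second copy."""
    path = out_dir / f"{run_id}_{step}.json"
    path.write_text(json.dumps(payload))
    return path

out = Path(tempfile.mkdtemp())
write_result(out, "run-42", "process", {"result": 1})      # first attempt
p = write_result(out, "run-42", "process", {"result": 2})  # retry: same path
print(json.loads(p.read_text()))  # {'result': 2}
print(len(list(out.iterdir())))   # 1 -- one file, not two
```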
## Example: Complete Production Flow

```python
from metaflow import FlowSpec, step, Parameter, schedule
from metaflow import kubernetes, retry, timeout, airflow_s3_key_sensor
from datetime import datetime

@airflow_s3_key_sensor(
    bucket_key='s3://data-bucket/input/*.parquet',
    wildcard_match=True,
    timeout=7200
)
@schedule(cron='0 2 * * *')  # Daily at 2 AM
class ProductionMLFlow(FlowSpec):

    model_type = Parameter('model_type',
                           default='xgboost',
                           help='Model type to train')

    @step
    def start(self):
        """Initialize the workflow"""
        self.run_date = datetime.now().strftime('%Y-%m-%d')
        self.next(self.load_data)

    @kubernetes(cpu=4, memory=16384)
    @retry(times=3, minutes_between_retries=5)
    @timeout(seconds=3600)
    @step
    def load_data(self):
        """Load and preprocess data"""
        # Load data from S3 (load_data here is a user-defined helper)
        self.data = load_data(self.run_date)
        self.splits = list(range(10))
        self.next(self.train_model, foreach='splits')

    @kubernetes(cpu=8, memory=32768, gpu=1, gpu_vendor='nvidia')
    @retry(times=2)
    @timeout(seconds=7200)
    @step
    def train_model(self):
        """Train a model on one data split"""
        split_id = self.input
        # train and evaluate are user-defined helpers
        self.model = train(self.data, split_id, self.model_type)
        self.metrics = evaluate(self.model)
        self.next(self.join_models)

    @step
    def join_models(self, inputs):
        """Select the best model"""
        self.all_metrics = [inp.metrics for inp in inputs]
        best_idx = max(range(len(inputs)),
                       key=lambda i: inputs[i].metrics['accuracy'])
        self.best_model = inputs[best_idx].model
        self.next(self.deploy)

    @kubernetes(cpu=2, memory=8192)
    @step
    def deploy(self):
        """Deploy the best model"""
        # deploy_model is a user-defined helper
        deploy_model(self.best_model, self.run_date)
        self.next(self.end)

    @step
    def end(self):
        """Workflow complete"""
        print(f"Deployed model for {self.run_date}")

if __name__ == '__main__':
    ProductionMLFlow()
```
Deploy this flow:

```bash
python production_ml_flow.py airflow create production_ml_dag.py \
    --max-workers 50 \
    --workflow-timeout 14400 \
    --tag production \
    --tag ml-pipeline
```
## Additional Resources