Kubernetes is a container orchestration platform that Metaflow can use to execute workflow steps. The @kubernetes decorator runs steps on Kubernetes clusters with full control over resources, scheduling, and infrastructure.

Overview

The @kubernetes decorator executes a step on a Kubernetes cluster:
from metaflow import FlowSpec, step, kubernetes, resources

class K8sFlow(FlowSpec):
    @kubernetes
    @resources(cpu=4, memory=16000)
    @step
    def train(self):
        # This step runs on Kubernetes
        model = train_model()
        self.model_path = save_model(model)
        self.next(self.end)
    
    @step
    def end(self):
        print(f"Model saved to {self.model_path}")

Setup

1. Install the Kubernetes client

Install the Python Kubernetes client library:
pip install kubernetes
2. Configure kubectl access

Verify that kubectl can reach your cluster:
kubectl cluster-info
kubectl get nodes
Set up kubeconfig if needed:
export KUBECONFIG=~/.kube/config
3. Configure Metaflow for Kubernetes

Set the required environment variables:
export METAFLOW_KUBERNETES_NAMESPACE=metaflow
export METAFLOW_KUBERNETES_SERVICE_ACCOUNT=metaflow-user
export METAFLOW_DATASTORE_SYSROOT_S3=s3://your-bucket/metaflow
Or, for Azure Blob Storage or Google Cloud Storage:
export METAFLOW_DATASTORE_SYSROOT_AZURE=wasbs://container@account.blob.core.windows.net/metaflow
export METAFLOW_DATASTORE_SYSROOT_GS=gs://your-bucket/metaflow
4. Test your setup

Save a minimal flow to a file:
# test_flow.py
from metaflow import FlowSpec, step, kubernetes

class TestFlow(FlowSpec):
    @kubernetes
    @step
    def start(self):
        print('Running on Kubernetes!')
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == '__main__':
    TestFlow()
Then run it (note that --datastore is a top-level option, so it goes before run):
python test_flow.py --datastore=s3 run --with kubernetes
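Before the first run, it can help to verify that the environment variables from the configuration step are actually set. A minimal, hypothetical sketch (not part of Metaflow; the variable names follow the setup above):

```python
import os

# Variables from the setup steps above.
REQUIRED = [
    'METAFLOW_KUBERNETES_NAMESPACE',
    'METAFLOW_KUBERNETES_SERVICE_ACCOUNT',
]
# At least one datastore sysroot must be configured.
DATASTORES = [
    'METAFLOW_DATASTORE_SYSROOT_S3',
    'METAFLOW_DATASTORE_SYSROOT_AZURE',
    'METAFLOW_DATASTORE_SYSROOT_GS',
]

def check_metaflow_env(env=None):
    """Return a list of human-readable problems; an empty list means OK."""
    env = os.environ if env is None else env
    problems = [f'{name} is not set' for name in REQUIRED if not env.get(name)]
    if not any(env.get(name) for name in DATASTORES):
        problems.append('no datastore sysroot configured')
    return problems

if __name__ == '__main__':
    for problem in check_metaflow_env():
        print('WARNING:', problem)
```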

Basic Usage

Simple Kubernetes Step

from metaflow import FlowSpec, step, kubernetes

class SimpleFlow(FlowSpec):
    @kubernetes
    @step
    def start(self):
        print("Running on Kubernetes")
        self.next(self.end)
    
    @step
    def end(self):
        print("Back to local execution")

Specify Resources

@kubernetes(cpu=8, memory=32000, gpu=1)
@step
def train(self):
    # Use 8 CPUs, 32GB RAM, and 1 GPU
    model = train_gpu_model()

Use Custom Docker Image

@kubernetes(image='myregistry.io/ml-image:v1.0')
@step
def process(self):
    # Runs in your custom Docker container
    import custom_library
    result = custom_library.process()

Decorator Parameters

The @kubernetes decorator provides extensive configuration options:

Resource Allocation

@kubernetes(
    cpu=4,              # Number of CPUs (default: 1)
    memory=16000,       # Memory in MB (default: 4096)
    disk=50000,         # Disk size in MB (default: 10240)
    gpu=1,              # Number of GPUs (default: None)
)
@step
def compute(self):
    pass

Container Configuration

@kubernetes(
    image='python:3.9',                     # Docker image
    namespace='production',                  # Kubernetes namespace
    service_account='ml-service-account',    # Service account
    image_pull_policy='Always',              # Pull policy
    image_pull_secrets=['regcred'],          # Image pull secrets
)
@step
def train(self):
    pass

Scheduling and Placement

@kubernetes(
    node_selector={'node.type': 'gpu', 'zone': 'us-west-2a'},  # Node selection
    tolerations=[{                           # Tolerations
        'key': 'nvidia.com/gpu',
        'operator': 'Exists',
        'effect': 'NoSchedule'
    }],
)
@step
def gpu_step(self):
    pass

Storage Configuration

@kubernetes(
    persistent_volume_claims={'data-pvc': '/mnt/data'},  # Mount PVCs
    use_tmpfs=True,                          # Enable tmpfs
    tmpfs_size=8192,                         # tmpfs size in MiB
    tmpfs_path='/tmp',                       # tmpfs mount path
    shared_memory=1024,                      # Shared memory in MiB
)
@step
def process(self):
    pass

Advanced Options

@kubernetes(
    labels={'team': 'ml', 'project': 'recommendations'},  # Pod labels
    annotations={'prometheus.io/scrape': 'true'},          # Pod annotations
    port=8080,                               # Container port
    qos='Guaranteed',                        # QoS class
    security_context={                       # Security settings
        'run_as_user': 1000,
        'run_as_non_root': True,
    },
)
@step
def secure_step(self):
    pass

Full Reference

See kubernetes_decorator.py:54-139 for complete parameter documentation.

Resource Management

Combining with @resources

Use @resources for portability across platforms:
from metaflow import FlowSpec, step, kubernetes, resources

class PortableFlow(FlowSpec):
    @kubernetes
    @resources(cpu=4, memory=16000, gpu=1)
    @step
    def train(self):
        # Resources specified by @resources
        pass
Switch platforms at runtime:
# Run on Kubernetes
python myflow.py run --with kubernetes

# Run on AWS Batch (same resource spec)
python myflow.py run --with batch

GPU Support

@kubernetes(
    gpu=2,
    gpu_vendor='nvidia',  # or 'amd'
    memory=64000,
    node_selector={'accelerator': 'nvidia-tesla-v100'}
)
@step
def train_gpu(self):
    import torch
    device = torch.device('cuda')
    model = MyModel().to(device)
    train(model)
Ensure your Kubernetes cluster has GPU nodes with appropriate drivers and device plugins installed.
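A quick way to confirm that GPU nodes advertise the extended resource is to inspect each node's status.allocatable. The helper below is an illustrative sketch (not part of Metaflow) that works on the allocatable mappings the Kubernetes API returns:

```python
def gpu_nodes(allocatable_by_node, resource='nvidia.com/gpu'):
    """Map node name -> advertised GPU count, given each node's
    status.allocatable dict (same shape as the Kubernetes API returns)."""
    return {
        node: int(alloc[resource])
        for node, alloc in allocatable_by_node.items()
        if int(alloc.get(resource, 0)) > 0
    }

if __name__ == '__main__':
    # Against a live cluster, the same data comes from the official client:
    #   from kubernetes import client, config
    #   config.load_kube_config()
    #   nodes = client.CoreV1Api().list_node().items
    #   alloc = {n.metadata.name: n.status.allocatable for n in nodes}
    alloc = {
        'gpu-node-1': {'cpu': '8', 'memory': '32Gi', 'nvidia.com/gpu': '2'},
        'cpu-node-1': {'cpu': '16', 'memory': '64Gi'},
    }
    print(gpu_nodes(alloc))
```

If a node with GPUs installed does not appear here, the device plugin is usually the missing piece.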

Disk Management

Kubernetes allows explicit disk size control:
@kubernetes(
    disk=100000,  # 100GB disk space
    use_tmpfs=True,
    tmpfs_size=10240  # 10GB tmpfs for fast I/O
)
@step
def process_data(self):
    # Store intermediate data on disk
    df = load_huge_dataset()
    df.to_parquet('/tmp/intermediate.parquet')

Multi-Node Execution

Kubernetes supports distributed multi-node jobs:
from metaflow import FlowSpec, step, kubernetes, parallel, current

class DistributedFlow(FlowSpec):
    @step
    def start(self):
        self.next(self.train, num_parallel=4)
    
    @parallel
    @kubernetes(cpu=8, memory=32000, gpu=1)
    @step
    def train(self):
        print(f"Node {current.parallel.node_index} of {current.parallel.num_nodes}")
        print(f"Main IP: {current.parallel.main_ip}")
        print(f"Control task ID: {current.parallel.control_task_id}")
        
        # Distributed training with PyTorch
        import torch.distributed as dist
        dist.init_process_group(
            backend='nccl',
            init_method=f'tcp://{current.parallel.main_ip}:29500',
            rank=current.parallel.node_index,
            world_size=current.parallel.num_nodes
        )
        
        train_distributed()
        self.next(self.join)
    
    @step
    def join(self, inputs):
        print(f"Completed {len(inputs)} parallel tasks")
        self.next(self.end)
    
    @step
    def end(self):
        pass
See Distributed Computing for details.

Node Selection and Scheduling

Node Selectors

Target specific nodes:
@kubernetes(
    node_selector={
        'node.kubernetes.io/instance-type': 'n1-standard-8',
        'topology.kubernetes.io/zone': 'us-central1-a',
    }
)
@step
def compute(self):
    pass
Or use string format:
@kubernetes(node_selector='instance-type=n1-standard-8,zone=us-central1-a')
@step
def compute(self):
    pass
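The dict and string forms express the same selector; a minimal sketch of the conversion (the helper name is illustrative, not part of Metaflow):

```python
def selector_to_dict(selector: str) -> dict:
    """Convert 'k1=v1,k2=v2' node-selector syntax into a dict."""
    return dict(pair.split('=', 1) for pair in selector.split(','))

print(selector_to_dict('instance-type=n1-standard-8,zone=us-central1-a'))
# {'instance-type': 'n1-standard-8', 'zone': 'us-central1-a'}
```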

Tolerations

Schedule on tainted nodes:
@kubernetes(
    tolerations=[
        {
            'key': 'nvidia.com/gpu',
            'operator': 'Exists',
            'effect': 'NoSchedule'
        },
        {
            'key': 'workload',
            'operator': 'Equal',
            'value': 'ml',
            'effect': 'NoSchedule'
        }
    ]
)
@step
def gpu_workload(self):
    pass

Quality of Service

Control resource guarantees:
@kubernetes(
    qos='Guaranteed',  # or 'Burstable'
    cpu=4,
    memory=16000
)
@step
def critical_step(self):
    # Guaranteed: Gets exact resources, highest priority
    pass

@kubernetes(
    qos='Burstable',
    cpu=2,
    memory=8000
)
@step
def flexible_step(self):
    # Burstable: Can use more if available, lower priority
    pass
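Under the hood, Kubernetes assigns a QoS class from a pod's requests and limits: Guaranteed requires CPU and memory limits with requests equal to them, Burstable means at least one request or limit is set, and BestEffort means none are. A simplified sketch of that rule (not the real kubelet logic):

```python
def qos_class(requests: dict, limits: dict) -> str:
    """Simplified version of how Kubernetes derives a pod's QoS class
    from its cpu/memory requests and limits."""
    if not requests and not limits:
        return 'BestEffort'
    keys = {'cpu', 'memory'}
    # Guaranteed: cpu and memory limits set, with requests equal to them.
    if keys <= limits.keys() and all(requests.get(k) == limits[k] for k in keys):
        return 'Guaranteed'
    return 'Burstable'

print(qos_class({'cpu': '4', 'memory': '16Gi'}, {'cpu': '4', 'memory': '16Gi'}))
# Guaranteed
```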

Storage Options

Persistent Volume Claims

Mount existing PVCs:
@kubernetes(
    persistent_volume_claims={
        'data-pvc': '/mnt/data',
        'models-pvc': '/mnt/models',
        'cache-pvc': '/mnt/cache'
    }
)
@step
def process(self):
    # Access mounted volumes
    data = load_from_disk('/mnt/data/input.csv')
    model = train(data)
    model.save('/mnt/models/model.pkl')

Tmpfs for Fast I/O

@kubernetes(
    use_tmpfs=True,
    tmpfs_size=10240,      # 10GB
    tmpfs_path='/fast-tmp',
    tmpfs_tempdir=True     # Set METAFLOW_TEMPDIR to tmpfs
)
@step
def fast_io(self):
    # Use tmpfs for intermediate data
    import tempfile
    with tempfile.NamedTemporaryFile(mode='w') as f:
        # File automatically created in tmpfs
        f.write(large_data)

Shared Memory

@kubernetes(shared_memory=8192)  # 8GB /dev/shm
@step
def parallel_processing(self):
    # Use shared memory for inter-process communication
    from multiprocessing import shared_memory
    shm = shared_memory.SharedMemory(create=True, size=1_000_000)
    try:
        run_workers(shm.name)  # worker processes attach by shm.name
    finally:
        shm.close()
        shm.unlink()  # release the segment when done

Environment Configuration

Custom Docker Images

Build and use custom images:
Build and use custom images:
# Dockerfile
FROM python:3.9
RUN pip install pandas scikit-learn tensorflow
COPY requirements.txt .
RUN pip install -r requirements.txt
# Build and push
docker build -t myregistry.io/ml-image:v1.0 .
docker push myregistry.io/ml-image:v1.0
@kubernetes(
    image='myregistry.io/ml-image:v1.0',
    image_pull_policy='Always',
    image_pull_secrets=['regcred']  # For private registries
)
@step
def train(self):
    import tensorflow as tf
    # Use packages from custom image

Image Pull Secrets

Access private registries:
# Create secret
kubectl create secret docker-registry regcred \
  --docker-server=myregistry.io \
  --docker-username=user \
  --docker-password=pass \
  [email protected]
@kubernetes(
    image='myregistry.io/private-image:latest',
    image_pull_secrets=['regcred']
)
@step
def secure_step(self):
    pass

Service Accounts

@kubernetes(service_account='ml-service-account')
@step
def access_cluster_resources(self):
    # Pod runs with specified service account permissions
    from kubernetes import client, config
    config.load_incluster_config()
    v1 = client.CoreV1Api()
    pods = v1.list_pod_for_all_namespaces()

Monitoring and Debugging

View Logs

Logs stream automatically:
python myflow.py run --with kubernetes
# [train/1] Task is starting.
# [train/1] Running on Kubernetes pod: myflow-train-abc123
# [train/1] Training model...
# [train/1] Task finished successfully.
View logs later:
python myflow.py logs 123/train/456

Check Pod Status

Use kubectl:
# List Metaflow pods
kubectl get pods -n metaflow -l metaflow/flow_name=MyFlow

# Describe specific pod
kubectl describe pod myflow-train-abc123 -n metaflow

# View logs
kubectl logs myflow-train-abc123 -n metaflow

Access Metadata

from metaflow import Flow

run = Flow('MyFlow').latest_run
for task in run['train']:
    print(f"Pod: {task.metadata_dict.get('kubernetes-pod-name')}")
    print(f"Node: {task.metadata_dict.get('kubernetes-node-ip')}")
    print(f"Namespace: {task.metadata_dict.get('kubernetes-pod-namespace')}")

Error Handling

Automatic Retries

from metaflow import retry

@retry(times=3)
@kubernetes
@step
def flaky_step(self):
    # Retried up to 3 times on failure
    result = potentially_failing_operation()

Timeout Protection

from metaflow import timeout

@timeout(hours=2)
@kubernetes
@step
def long_running(self):
    # Automatically killed after 2 hours
    expensive_computation()

Graceful Shutdown

Handle SIGTERM for graceful termination:
import signal
import sys

@kubernetes
@step
def graceful_step(self):
    def signal_handler(sig, frame):
        print('Received SIGTERM, saving state...')
        save_checkpoint()
        sys.exit(0)
    
    signal.signal(signal.SIGTERM, signal_handler)
    
    # Long-running work
    train_with_checkpoints()

Best Practices

Use @resources for portability

Specify requirements with @resources to easily switch between Kubernetes and AWS Batch

Label your pods

Add labels for monitoring, cost allocation, and resource management

Set resource limits

Always specify CPU and memory to ensure predictable scheduling

Use PVCs for large data

Mount persistent volumes for datasets too large for container storage
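The labeling advice above can be systematized with a small helper that produces a consistent label set for every flow. The helper and its sanitization rule are an illustrative sketch (Kubernetes label validation is stricter than shown); the result is passed to @kubernetes(labels=...):

```python
def standard_labels(team: str, project: str, env: str = 'dev') -> dict:
    """Build a consistent pod-label set for monitoring and cost allocation.
    Sanitization is simplified: Kubernetes label values allow alphanumerics,
    '-', '_', and '.', and must start and end with an alphanumeric."""
    def sanitize(value: str) -> str:
        return ''.join(c if c.isalnum() or c in '-_.' else '-' for c in value.lower())
    return {
        'team': sanitize(team),
        'project': sanitize(project),
        'environment': sanitize(env),
    }

# Usage: @kubernetes(labels=standard_labels('ML Platform', 'recommendations'))
print(standard_labels('ML Platform', 'recommendations'))
```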

Troubleshooting

Pods stuck in Pending

Cause: the scheduler cannot find a suitable node. Solutions:
  • Check node resources: kubectl describe nodes
  • Verify node selectors match available nodes
  • Check tolerations for tainted nodes
  • Review resource requests (CPU/memory/GPU)
  • Inspect pod events: kubectl describe pod <pod-name>
ImagePullBackOff / ErrImagePull

Cause: the node cannot pull the Docker image. Solutions:
  • Verify the image exists: docker pull <image>
  • Check image pull secrets are configured
  • Ensure the service account has registry access
  • Validate the image name and tag
  • Check the registry is accessible from the cluster
OOMKilled

Cause: the pod exceeded its memory limit. Solutions:
  • Increase the memory parameter
  • Process data in smaller chunks
  • Use memory-efficient algorithms
  • Spill intermediate data to disk (the disk parameter) rather than tmpfs, since tmpfs usage counts against the memory limit
  • Monitor actual usage: kubectl top pod
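"Process data in smaller chunks" can look like the following pure-Python sketch, which streams a large file so peak memory is bounded by the chunk size rather than the file size (with pandas, read_csv(..., chunksize=...) achieves the same effect):

```python
def process_in_chunks(path, chunk_lines=100_000):
    """Stream a large text file, doing per-chunk work so peak memory
    stays proportional to chunk_lines instead of the file size."""
    total = 0
    chunk = []
    with open(path) as f:
        for line in f:
            chunk.append(line)
            if len(chunk) >= chunk_lines:
                total += len(chunk)   # replace with real per-chunk work
                chunk.clear()
    total += len(chunk)               # flush the final partial chunk
    return total
```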
Volume mount failures

Cause: PVC or storage configuration issues. Solutions:
  • Verify the PVC exists: kubectl get pvc
  • Check the PVC is bound: kubectl describe pvc <name>
  • Ensure a storage class is configured
  • Verify mount paths are correct
  • Check service account permissions

Kubernetes vs AWS Batch

Feature           Kubernetes              AWS Batch
Setup             Requires cluster        Managed service
Flexibility       High control            Limited control
Disk management   Explicit sizes          Limited
Multi-cloud       Yes                     AWS only
Complexity        Higher                  Lower
Cost              Pay for nodes           Pay per job
Scaling           Manual/HPA              Automatic
Storage           PVCs, multiple types    EBS, EFS

Next Steps

Distributed Computing

Scale to multi-node workloads on Kubernetes

Resources Management

Master the @resources decorator

AWS Batch

Compare with AWS Batch execution

Remote Execution

Understand remote execution concepts
