Kubernetes is a container orchestration platform that Metaflow can use to execute workflow steps. The @kubernetes decorator runs steps on Kubernetes clusters with full control over resources, scheduling, and infrastructure.
Overview
The @kubernetes decorator executes a step on a Kubernetes cluster:
```python
from metaflow import FlowSpec, step, kubernetes, resources

class K8sFlow(FlowSpec):

    @kubernetes
    @resources(cpu=4, memory=16000)
    @step
    def train(self):
        # This step runs on Kubernetes
        model = train_model()
        self.model_path = save_model(model)
        self.next(self.end)

    @step
    def end(self):
        print(f"Model saved to {self.model_path}")
```
Setup
Install Kubernetes client
Install the Python Kubernetes library:

```shell
pip install kubernetes
```
Configure kubectl access
Ensure kubectl is configured to access your cluster:

```shell
kubectl cluster-info
kubectl get nodes
```

Set up kubeconfig if needed:

```shell
export KUBECONFIG=~/.kube/config
```
Configure Metaflow for Kubernetes
Set required environment variables:

```shell
export METAFLOW_KUBERNETES_NAMESPACE=metaflow
export METAFLOW_KUBERNETES_SERVICE_ACCOUNT=metaflow-user
export METAFLOW_DATASTORE_SYSROOT_S3=s3://your-bucket/metaflow
```

Or use Azure/GCS:

```shell
export METAFLOW_DATASTORE_SYSROOT_AZURE=wasbs://container@account.blob.core.windows.net/metaflow
export METAFLOW_DATASTORE_SYSROOT_GS=gs://your-bucket/metaflow
```
Test your setup
Run a simple flow. Save this as testflow.py:

```python
from metaflow import FlowSpec, step, kubernetes

class TestFlow(FlowSpec):

    @kubernetes
    @step
    def start(self):
        print('Running on Kubernetes!')
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == '__main__':
    TestFlow()
```

Then run it:

```shell
python testflow.py run --with kubernetes --datastore=s3
```
Basic Usage
Simple Kubernetes Step
```python
from metaflow import FlowSpec, step, kubernetes

class SimpleFlow(FlowSpec):

    @kubernetes
    @step
    def start(self):
        print("Running on Kubernetes")
        self.next(self.end)

    @step
    def end(self):
        print("Back to local execution")
```
Specify Resources
```python
@kubernetes(cpu=8, memory=32000, gpu=1)
@step
def train(self):
    # Use 8 CPUs, 32 GB RAM, and 1 GPU
    model = train_gpu_model()
```
Use Custom Docker Image
```python
@kubernetes(image='myregistry.io/ml-image:v1.0')
@step
def process(self):
    # Runs in your custom Docker container
    import custom_library
    result = custom_library.process()
```
Decorator Parameters
The @kubernetes decorator provides extensive configuration options:
Resource Allocation
```python
@kubernetes(
    cpu=4,         # Number of CPUs (default: 1)
    memory=16000,  # Memory in MB (default: 4096)
    disk=50000,    # Disk size in MB (default: 10240)
    gpu=1,         # Number of GPUs (default: None)
)
@step
def compute(self):
    pass
```
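Since memory and disk are given in megabytes, it is easy to be off by a factor of 1000. A tiny helper (hypothetical, not part of Metaflow) makes the intent explicit:

```python
def gb(n: float) -> int:
    """Convert gigabytes to the megabyte values the decorator expects (hypothetical helper)."""
    return int(n * 1000)

# e.g. @kubernetes(memory=gb(16), disk=gb(50)) instead of memory=16000, disk=50000
assert gb(16) == 16000
assert gb(0.5) == 500
```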
Container Configuration
```python
@kubernetes(
    image='python:3.9',                    # Docker image
    namespace='production',                # Kubernetes namespace
    service_account='ml-service-account',  # Service account
    image_pull_policy='Always',            # Pull policy
    image_pull_secrets=['regcred'],        # Image pull secrets
)
@step
def train(self):
    pass
```
Scheduling and Placement
```python
@kubernetes(
    node_selector={'node.type': 'gpu', 'zone': 'us-west-2a'},  # Node selection
    tolerations=[{                                             # Tolerations
        'key': 'nvidia.com/gpu',
        'operator': 'Exists',
        'effect': 'NoSchedule',
    }],
)
@step
def gpu_step(self):
    pass
```
Storage Configuration
```python
@kubernetes(
    persistent_volume_claims={'data-pvc': '/mnt/data'},  # Mount PVCs
    use_tmpfs=True,      # Enable tmpfs
    tmpfs_size=8192,     # tmpfs size in MiB
    tmpfs_path='/tmp',   # tmpfs mount path
    shared_memory=1024,  # Shared memory in MiB
)
@step
def process(self):
    pass
```
Advanced Options
```python
@kubernetes(
    labels={'team': 'ml', 'project': 'recommendations'},  # Pod labels
    annotations={'prometheus.io/scrape': 'true'},         # Pod annotations
    port=8080,          # Container port
    qos='Guaranteed',   # QoS class
    security_context={  # Security settings
        'run_as_user': 1000,
        'run_as_non_root': True,
    },
)
@step
def secure_step(self):
    pass
```
Full Reference
See kubernetes_decorator.py:54-139 for complete parameter documentation.
Resource Management
Combining with @resources
Use @resources for portability across platforms:
```python
from metaflow import FlowSpec, step, kubernetes, resources

class PortableFlow(FlowSpec):

    @kubernetes
    @resources(cpu=4, memory=16000, gpu=1)
    @step
    def train(self):
        # Resources specified by @resources
        pass
```
Switch platforms at runtime:
```shell
# Run on Kubernetes
python myflow.py run --with kubernetes

# Run on AWS Batch (same resource spec)
python myflow.py run --with batch
```
GPU Support
```python
@kubernetes(
    gpu=2,
    gpu_vendor='nvidia',  # or 'amd'
    memory=64000,
    node_selector={'accelerator': 'nvidia-tesla-v100'},
)
@step
def train_gpu(self):
    import torch
    device = torch.device('cuda')
    model = MyModel().to(device)
    train(model)
```
Ensure your Kubernetes cluster has GPU nodes with appropriate drivers and device plugins installed.
Disk Management
Kubernetes allows explicit disk size control:
```python
@kubernetes(
    disk=100000,       # 100 GB disk space
    use_tmpfs=True,
    tmpfs_size=10240,  # 10 GB tmpfs for fast I/O
)
@step
def process_data(self):
    # Store intermediate data on disk
    df = load_huge_dataset()
    df.to_parquet('/tmp/intermediate.parquet')
```
Multi-Node Execution
Kubernetes supports distributed multi-node jobs:
```python
from metaflow import FlowSpec, step, kubernetes, parallel, current

class DistributedFlow(FlowSpec):

    @step
    def start(self):
        self.next(self.train, num_parallel=4)

    @parallel
    @kubernetes(cpu=8, memory=32000, gpu=1)
    @step
    def train(self):
        print(f"Node {current.parallel.node_index} of {current.parallel.num_nodes}")
        print(f"Main IP: {current.parallel.main_ip}")
        print(f"Control task ID: {current.parallel.control_task_id}")

        # Distributed training with PyTorch
        import torch.distributed as dist
        dist.init_process_group(
            backend='nccl',
            init_method=f'tcp://{current.parallel.main_ip}:29500',
            rank=current.parallel.node_index,
            world_size=current.parallel.num_nodes,
        )
        train_distributed()
        self.next(self.join)

    @step
    def join(self, inputs):
        print(f"Completed {len(inputs)} parallel tasks")
        self.next(self.end)

    @step
    def end(self):
        pass
```
See Distributed Computing for details.
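Beyond framework-level collectives, a common pattern is to shard work by rank so each node processes a disjoint slice of the data. A minimal sketch (the `shard` helper is hypothetical, not a Metaflow API):

```python
def shard(items, rank, world_size):
    # Each worker takes every world_size-th item, starting at its own rank,
    # so the slices are disjoint and together cover all items
    return items[rank::world_size]

# With num_parallel=4, the node at rank 0 would see items 0, 4, 8, ...
node_0 = shard(list(range(10)), 0, 4)
assert node_0 == [0, 4, 8]
```

Inside the `train` step, `rank` and `world_size` would come from `current.parallel.node_index` and `current.parallel.num_nodes`.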
Node Selection and Scheduling
Node Selectors
Target specific nodes:
```python
@kubernetes(
    node_selector={
        'node.kubernetes.io/instance-type': 'n1-standard-8',
        'topology.kubernetes.io/zone': 'us-central1-a',
    }
)
@step
def compute(self):
    pass
```
Or use string format:
```python
@kubernetes(node_selector='instance-type=n1-standard-8,zone=us-central1-a')
@step
def compute(self):
    pass
```
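The string form is shorthand for the dict form. A sketch of how such a string maps to key/value pairs (illustrative only, not Metaflow's actual parser):

```python
def parse_node_selector(spec: str) -> dict:
    """Split 'k=v,k=v' into the equivalent dict form (illustrative helper)."""
    return dict(pair.split('=', 1) for pair in spec.split(','))

assert parse_node_selector('instance-type=n1-standard-8,zone=us-central1-a') == {
    'instance-type': 'n1-standard-8',
    'zone': 'us-central1-a',
}
```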
Tolerations
Schedule on tainted nodes:
```python
@kubernetes(
    tolerations=[
        {
            'key': 'nvidia.com/gpu',
            'operator': 'Exists',
            'effect': 'NoSchedule',
        },
        {
            'key': 'workload',
            'operator': 'Equal',
            'value': 'ml',
            'effect': 'NoSchedule',
        },
    ]
)
@step
def gpu_workload(self):
    pass
```
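As a reminder of the semantics: `Exists` tolerates any taint with the given key, while `Equal` also requires the value to match. A simplified sketch of the Kubernetes matching rule (the real scheduler handles more cases, such as empty keys):

```python
def tolerates(toleration, taint):
    """Simplified version of Kubernetes taint/toleration matching."""
    if toleration['key'] != taint['key']:
        return False
    if toleration.get('effect') and toleration['effect'] != taint['effect']:
        return False
    if toleration.get('operator', 'Equal') == 'Exists':
        return True  # Exists matches regardless of the taint's value
    return toleration.get('value') == taint.get('value')

gpu_taint = {'key': 'nvidia.com/gpu', 'value': '', 'effect': 'NoSchedule'}
assert tolerates({'key': 'nvidia.com/gpu', 'operator': 'Exists', 'effect': 'NoSchedule'}, gpu_taint)
assert not tolerates({'key': 'workload', 'operator': 'Equal', 'value': 'ml', 'effect': 'NoSchedule'}, gpu_taint)
```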
Quality of Service
Control resource guarantees:
```python
@kubernetes(
    qos='Guaranteed',  # or 'Burstable'
    cpu=4,
    memory=16000,
)
@step
def critical_step(self):
    # Guaranteed: gets exactly the requested resources, highest priority
    pass

@kubernetes(
    qos='Burstable',
    cpu=2,
    memory=8000,
)
@step
def flexible_step(self):
    # Burstable: can use more if available, lower priority
    pass
```
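Kubernetes derives the QoS class from a pod's resource requests and limits: Guaranteed requires requests to equal limits, Burstable requires at least one request or limit to be set, and everything else is BestEffort. A simplified sketch of that rule (the real classification works per container and per resource):

```python
def qos_class(requests: dict, limits: dict) -> str:
    """Simplified version of how Kubernetes assigns a pod's QoS class."""
    if requests and requests == limits:
        return 'Guaranteed'   # exact reservation, evicted last
    if requests or limits:
        return 'Burstable'    # can exceed requests up to limits
    return 'BestEffort'       # no guarantees, evicted first

assert qos_class({'cpu': 4, 'memory': 16000}, {'cpu': 4, 'memory': 16000}) == 'Guaranteed'
assert qos_class({'cpu': 2, 'memory': 8000}, {'cpu': 4, 'memory': 16000}) == 'Burstable'
assert qos_class({}, {}) == 'BestEffort'
```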
Storage Options
Persistent Volume Claims
Mount existing PVCs:
```python
@kubernetes(
    persistent_volume_claims={
        'data-pvc': '/mnt/data',
        'models-pvc': '/mnt/models',
        'cache-pvc': '/mnt/cache',
    }
)
@step
def process(self):
    # Access mounted volumes
    data = load_from_disk('/mnt/data/input.csv')
    model = train(data)
    model.save('/mnt/models/model.pkl')
```
Tmpfs for Fast I/O
```python
@kubernetes(
    use_tmpfs=True,
    tmpfs_size=10240,        # 10 GB
    tmpfs_path='/fast-tmp',
    tmpfs_tempdir=True,      # Set METAFLOW_TEMPDIR to tmpfs
)
@step
def fast_io(self):
    # Use tmpfs for intermediate data
    import tempfile
    with tempfile.NamedTemporaryFile(mode='w') as f:
        # File automatically created in tmpfs
        f.write(large_data)
```
Shared Memory
```python
@kubernetes(shared_memory=8192)  # 8 GB /dev/shm
@step
def parallel_processing(self):
    # Use shared memory for inter-process communication
    from multiprocessing import shared_memory
    shm = shared_memory.SharedMemory(create=True, size=1000000)
    try:
        ...  # hand shm.name to worker processes
    finally:
        shm.close()
        shm.unlink()  # free the segment when done
```
Environment Configuration
Custom Docker Images
Build and use custom images:
```dockerfile
# Dockerfile
FROM python:3.9
RUN pip install pandas scikit-learn tensorflow
COPY requirements.txt .
RUN pip install -r requirements.txt
```

```shell
# Build and push
docker build -t myregistry.io/ml-image:v1.0 .
docker push myregistry.io/ml-image:v1.0
```
```python
@kubernetes(
    image='myregistry.io/ml-image:v1.0',
    image_pull_policy='Always',
    image_pull_secrets=['regcred'],  # For private registries
)
@step
def train(self):
    import tensorflow as tf
    # Use packages from the custom image
```
Image Pull Secrets
Access private registries:
```shell
# Create secret
kubectl create secret docker-registry regcred \
  --docker-server=myregistry.io \
  --docker-username=user \
  --docker-password=pass \
  [email protected]
```
```python
@kubernetes(
    image='myregistry.io/private-image:latest',
    image_pull_secrets=['regcred'],
)
@step
def secure_step(self):
    pass
```
Service Accounts
```python
@kubernetes(service_account='ml-service-account')
@step
def access_cluster_resources(self):
    # Pod runs with the specified service account's permissions
    from kubernetes import client, config
    config.load_incluster_config()
    v1 = client.CoreV1Api()
    pods = v1.list_pod_for_all_namespaces()
```
Monitoring and Debugging
View Logs
Logs stream automatically:
```shell
python myflow.py run --with kubernetes
# [train/1] Task is starting.
# [train/1] Running on Kubernetes pod: myflow-train-abc123
# [train/1] Training model...
# [train/1] Task finished successfully.
```
View logs later:
```shell
python myflow.py logs 123/train/456
```
Check Pod Status
Use kubectl:
```shell
# List Metaflow pods
kubectl get pods -n metaflow -l metaflow/flow_name=MyFlow

# Describe a specific pod
kubectl describe pod myflow-train-abc123 -n metaflow

# View logs
kubectl logs myflow-train-abc123 -n metaflow
```
Inspect pod metadata from the Metaflow client:

```python
from metaflow import Flow

run = Flow('MyFlow').latest_run
for task in run['train']:
    print(f"Pod: {task.metadata_dict.get('kubernetes-pod-name')}")
    print(f"Node: {task.metadata_dict.get('kubernetes-node-ip')}")
    print(f"Namespace: {task.metadata_dict.get('kubernetes-pod-namespace')}")
```
Error Handling
Automatic Retries
```python
from metaflow import retry

@retry(times=3)
@kubernetes
@step
def flaky_step(self):
    # Retried up to 3 times on failure
    result = potentially_failing_operation()
```
Timeout Protection
```python
from metaflow import timeout

@timeout(hours=2)
@kubernetes
@step
def long_running(self):
    # Automatically killed after 2 hours
    expensive_computation()
```
Graceful Shutdown
Handle SIGTERM for graceful termination:
```python
import signal
import sys

@kubernetes
@step
def graceful_step(self):
    def signal_handler(sig, frame):
        print('Received SIGTERM, saving state...')
        save_checkpoint()
        sys.exit(0)

    signal.signal(signal.SIGTERM, signal_handler)

    # Long-running work
    train_with_checkpoints()
```
Best Practices
Use @resources for portability: specify requirements with @resources to switch easily between Kubernetes and AWS Batch.
Label your pods: add labels for monitoring, cost allocation, and resource management.
Set resource limits: always specify CPU and memory to ensure predictable scheduling.
Use PVCs for large data: mount persistent volumes for datasets too large for container storage.
Troubleshooting
Pod stuck in Pending state
Cause: The scheduler cannot find a suitable node.
Solutions:
Check node resources: kubectl describe nodes
Verify node selectors match available nodes
Check tolerations for tainted nodes
Review resource requests (CPU/memory/GPU)
Inspect pod events: kubectl describe pod <pod-name>
Pod stuck in ImagePullBackOff
Cause: Cannot pull the Docker image.
Solutions:
Verify image exists: docker pull <image>
Check image pull secrets are configured
Ensure service account has registry access
Validate image name and tag
Check registry is accessible from cluster
Pod killed (OOMKilled)
Cause: The pod exceeded its memory limit.
Solutions:
Increase memory parameter
Process data in smaller chunks
Use memory-efficient algorithms
Write temporary data to disk instead of holding it in memory (note that tmpfs counts against the pod's memory limit)
Monitor actual usage: kubectl top pod
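For the chunking approach, a minimal stdlib sketch that bounds peak memory by the chunk size (the `chunked` helper is hypothetical):

```python
import itertools

def chunked(iterable, size):
    """Yield successive lists of up to `size` items (hypothetical helper)."""
    it = iter(iterable)
    while batch := list(itertools.islice(it, size)):
        yield batch

# Aggregate a large stream without materializing it all at once;
# only one chunk of 100,000 items is in memory at a time
total = sum(sum(batch) for batch in chunked(range(1_000_000), 100_000))
assert total == sum(range(1_000_000))
```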
Volume mount failures
Cause: PVC or storage configuration issues.
Solutions:
Verify PVC exists: kubectl get pvc
Check PVC is bound: kubectl describe pvc <name>
Ensure storage class is configured
Verify mount paths are correct
Check service account permissions
Kubernetes vs AWS Batch
| Feature | Kubernetes | AWS Batch |
| --- | --- | --- |
| Setup | Requires cluster | Managed service |
| Flexibility | High control | Limited control |
| Disk management | Explicit sizes | Limited |
| Multi-cloud | Yes | AWS only |
| Complexity | Higher | Lower |
| Cost | Pay for nodes | Pay per job |
| Scaling | Manual/HPA | Automatic |
| Storage | PVCs, multiple types | EBS, EFS |
Next Steps
Distributed Computing: scale to multi-node workloads on Kubernetes
Resources Management: master the @resources decorator
AWS Batch: compare with AWS Batch execution
Remote Execution: understand remote execution concepts