The @kubernetes decorator specifies that a step should execute on Kubernetes.

Basic Usage

from metaflow import FlowSpec, step, kubernetes

class MyFlow(FlowSpec):

    @kubernetes(cpu=4, memory=16384)
    @step
    def start(self):
        # This step runs on Kubernetes
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == '__main__':
    MyFlow()

Description

The @kubernetes decorator runs a step as a containerized pod on a Kubernetes cluster, delegating orchestration and resource management to Kubernetes. Because the execution layer is plain Kubernetes, the same flow runs unchanged on managed clusters (EKS, GKE, AKS) or self-hosted ones.

Prerequisites

  • Kubernetes cluster configured
  • Cloud datastore configured (--datastore=s3, --datastore=azure, or --datastore=gs)
  • Kubernetes Python package installed: pip install kubernetes

Parameters

cpu
int
default:"1"
Number of CPUs required for this step. If @resources is also present, the maximum value from all decorators is used.
memory
int
default:"4096"
Memory size (in MB) required for this step. If @resources is also present, the maximum value from all decorators is used.
disk
int
default:"10240"
Disk size (in MB) required for this step. If @resources is also present, the maximum value from all decorators is used.
gpu
int
default:"None"
Number of GPUs required for this step. A value of 0 implies that the scheduled node should not have GPUs.
image
str
default:"None"
Docker image to use when launching on Kubernetes. If not specified and METAFLOW_KUBERNETES_CONTAINER_IMAGE is set, that image is used. Otherwise, defaults to a Python image matching your Python version.
namespace
str
default:"METAFLOW_KUBERNETES_NAMESPACE"
Kubernetes namespace to use when launching the pod.
service_account
str
default:"METAFLOW_KUBERNETES_SERVICE_ACCOUNT"
Kubernetes service account to use when launching the pod.
secrets
List[str]
default:"None"
Kubernetes secrets to use when launching the pod. These are in addition to secrets defined in METAFLOW_KUBERNETES_SECRETS.
node_selector
Union[Dict[str,str], str]
default:"None"
Kubernetes node selector(s) to apply to the pod. Can be passed as a comma-separated string like 'kubernetes.io/os=linux,kubernetes.io/arch=amd64' or as a dictionary {'kubernetes.io/os': 'linux', 'kubernetes.io/arch': 'amd64'}.
tolerations
List[Dict[str,str]]
default:"[]"
Kubernetes tolerations to apply when launching the pod. Default is extracted from METAFLOW_KUBERNETES_TOLERATIONS.
labels
Dict[str, str]
default:"METAFLOW_KUBERNETES_LABELS"
Kubernetes labels to apply to the pod.
annotations
Dict[str, str]
default:"METAFLOW_KUBERNETES_ANNOTATIONS"
Kubernetes annotations to apply to the pod.
gpu_vendor
str
default:"KUBERNETES_GPU_VENDOR"
The vendor of the GPUs to be used for this step (e.g., 'nvidia', 'amd').
image_pull_policy
str
default:"KUBERNETES_IMAGE_PULL_POLICY"
The imagePullPolicy to apply to the Docker image of the step.
image_pull_secrets
List[str]
default:"[]"
Kubernetes image pull secrets to use when pulling container images. Default is extracted from METAFLOW_KUBERNETES_IMAGE_PULL_SECRETS.
persistent_volume_claims
Dict[str, str]
default:"None"
A map of persistent volumes to mount to the pod. The map is from persistent volume claim names to mount paths, e.g., {'pvc-name': '/path/to/mount/on'}.
shared_memory
int
default:"None"
Shared memory size (in MiB) required for this step.
use_tmpfs
bool
default:"False"
Enable an explicit tmpfs mount for this step.
tmpfs_tempdir
bool
default:"True"
Sets METAFLOW_TEMPDIR to tmpfs_path if enabled.
tmpfs_size
int
default:"None"
The size (in MiB) of the tmpfs mount for this step. Defaults to 50% of allocated memory.
tmpfs_path
str
default:"/metaflow_temp"
Path to tmpfs mount for this step.
port
int
default:"None"
Port number to specify in the Kubernetes job object.
compute_pool
str
default:"None"
Compute pool to use for this step. If not specified, any accessible compute pool within the perimeter is used.
qos
str
default:"Burstable"
Quality of Service class to assign to the pod. Supported values: Guaranteed, Burstable, BestEffort.
security_context
Dict[str, Any]
default:"None"
Container security context. Applies to the task container. Supported keys:
  • privileged (bool)
  • allow_privilege_escalation (bool)
  • run_as_user (int)
  • run_as_group (int)
  • run_as_non_root (bool)
hostname_resolution_timeout
int
default:"600"
Timeout in seconds for worker tasks to resolve the hostname of the control task. Only applicable when @parallel is used.

Examples

Basic Kubernetes Execution

@kubernetes(cpu=4, memory=16384)
@step
def process_data(self):
    # Process data on Kubernetes
    pass
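Combining with @resources

When a step carries both @resources and @kubernetes, the larger value per resource wins, so a scheduler-agnostic @resources declaration can be raised (but not lowered) for Kubernetes runs. A minimal sketch of a complete flow illustrating this:

```python
from metaflow import FlowSpec, step, kubernetes, resources

class ResourceFlow(FlowSpec):

    # @resources declares scheduler-agnostic needs; @kubernetes targets the
    # cluster. The effective request is the per-resource maximum, so this
    # step asks for 8 CPUs and 16384 MB of memory.
    @resources(cpu=8, memory=8192)
    @kubernetes(cpu=4, memory=16384)
    @step
    def start(self):
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == '__main__':
    ResourceFlow()
```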

GPU-Accelerated Step

@kubernetes(
    cpu=8,
    memory=32768,
    gpu=2,
    gpu_vendor='nvidia'
)
@step
def train_model(self):
    import torch
    # Train model using 2 NVIDIA GPUs
    pass
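Shared Memory for Data Loaders

Container runtimes typically cap /dev/shm at 64 MiB, which multi-worker PyTorch DataLoaders exhaust quickly since workers pass batches through shared memory. The shared_memory parameter raises that cap; a sketch:

```python
@kubernetes(cpu=8, memory=32768, gpu=1, shared_memory=2048)
@step
def train_with_loaders(self):
    # /dev/shm is now 2 GiB, enough for multi-worker DataLoader batches
    from torch.utils.data import DataLoader
    ...
```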

Node Selection

@kubernetes(
    cpu=4,
    memory=16384,
    node_selector={
        'kubernetes.io/arch': 'amd64',
        'node-type': 'compute-optimized'
    }
)
@step
def compute_intensive(self):
    pass
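The dict above is equivalent to the comma-separated string form 'kubernetes.io/arch=amd64,node-type=compute-optimized'. To illustrate how the two forms correspond (parse_node_selector is a hypothetical helper for this illustration, not part of the Metaflow API):

```python
def parse_node_selector(selector: str) -> dict:
    # Split 'k=v,k2=v2' pairs into a dict, mirroring how the string form
    # of node_selector maps onto the dict form.
    return dict(pair.split('=', 1) for pair in selector.split(','))

assert parse_node_selector(
    'kubernetes.io/arch=amd64,node-type=compute-optimized'
) == {'kubernetes.io/arch': 'amd64', 'node-type': 'compute-optimized'}
```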

With Tolerations

@kubernetes(
    cpu=4,
    memory=16384,
    tolerations=[
        {
            'key': 'dedicated',
            'operator': 'Equal',
            'value': 'ml-workloads',
            'effect': 'NoSchedule'
        }
    ]
)
@step
def on_dedicated_nodes(self):
    pass

Custom Docker Image

@kubernetes(
    image='my-registry.com/ml-image:v1.2.3',
    cpu=4,
    memory=16384,
    image_pull_policy='Always'
)
@step
def custom_environment(self):
    # Runs with custom dependencies
    pass

With Persistent Volumes

@kubernetes(
    cpu=4,
    memory=16384,
    persistent_volume_claims={
        'data-pvc': '/mnt/data',
        'models-pvc': '/mnt/models'
    }
)
@step
def access_storage(self):
    # Read input from the mounted persistent volume
    with open('/mnt/data/input.csv') as f:
        data = f.read()
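Ephemeral Scratch with tmpfs

For scratch data that need not outlive the task, a tmpfs mount avoids PVCs entirely. If tmpfs_size is not set, it defaults to 50% of allocated memory (here that would be 8192 MiB), and with tmpfs_tempdir enabled (the default) METAFLOW_TEMPDIR points at the mount. A sketch:

```python
@kubernetes(cpu=4, memory=16384, use_tmpfs=True, tmpfs_size=4096)
@step
def fast_scratch(self):
    import os
    # tmpfs_tempdir=True points METAFLOW_TEMPDIR at the tmpfs mount
    scratch = os.environ['METAFLOW_TEMPDIR']
    with open(os.path.join(scratch, 'intermediate.bin'), 'wb') as f:
        f.write(b'...')
```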

With Labels and Annotations

@kubernetes(
    cpu=4,
    memory=8192,
    labels={
        'team': 'data-science',
        'project': 'recommendation'
    },
    annotations={
        'prometheus.io/scrape': 'true',
        'prometheus.io/port': '9090'
    }
)
@step
def monitored_task(self):
    pass

Security Context

@kubernetes(
    cpu=2,
    memory=8192,
    security_context={
        'run_as_user': 1000,
        'run_as_group': 1000,
        'run_as_non_root': True
    }
)
@step
def secure_task(self):
    pass
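Guaranteed QoS

Kubernetes assigns the Guaranteed QoS class only when resource requests equal limits, and pods in that class are the last to be evicted under node pressure. Requesting it through the decorator (sketch):

```python
@kubernetes(cpu=2, memory=8192, qos='Guaranteed')
@step
def critical_task(self):
    # Guaranteed pods survive node memory pressure longest
    pass
```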

Runtime Override

Override Kubernetes parameters at runtime:
python flow.py run --with kubernetes:cpu=8,memory=32768,gpu=1

Environment Variables

When running on Kubernetes, these environment variables are available:
  • METAFLOW_KUBERNETES_WORKLOAD - Indicates running on Kubernetes
  • METAFLOW_KUBERNETES_POD_NAME - The pod name
  • METAFLOW_KUBERNETES_POD_NAMESPACE - The pod namespace
  • METAFLOW_KUBERNETES_POD_ID - The pod ID
  • METAFLOW_KUBERNETES_NODE_IP - The node IP address
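Inside a step, these variables can gate Kubernetes-only logic so the same code also works in local runs where none of them are set. A small sketch (k8s_pod_info and on_kubernetes are illustrative helpers, not part of Metaflow):

```python
import os

K8S_VARS = (
    'METAFLOW_KUBERNETES_WORKLOAD',
    'METAFLOW_KUBERNETES_POD_NAME',
    'METAFLOW_KUBERNETES_POD_NAMESPACE',
    'METAFLOW_KUBERNETES_POD_ID',
    'METAFLOW_KUBERNETES_NODE_IP',
)

def k8s_pod_info() -> dict:
    # Empty strings for local runs, where the variables are absent
    return {k: os.environ.get(k, '') for k in K8S_VARS}

def on_kubernetes() -> bool:
    return bool(k8s_pod_info()['METAFLOW_KUBERNETES_WORKLOAD'])
```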

Best Practices

  1. Use resource requests wisely: Set CPU/memory based on actual needs
  2. Leverage node selectors: Use node selectors to run on appropriate hardware
  3. Quality of Service: Use Guaranteed QoS for critical workloads
  4. Persistent storage: Use PVCs for data that needs to persist across runs
  5. Security: Use security contexts to enforce least-privilege principles
  6. Monitoring: Add labels and annotations for observability

Common Patterns

Multi-Cloud Deployment

@kubernetes(
    cpu=4,
    memory=16384,
    node_selector={'cloud.google.com/gke-nodepool': 'ml-pool'}
)
@step
def gcp_task(self):
    pass

With Error Handling

@retry(times=3)
@timeout(hours=2)
@kubernetes(cpu=4, memory=16384)
@step
def robust_processing(self):
    # Retries on failure, times out after 2 hours
    pass
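Distributed Training with @parallel

The hostname_resolution_timeout parameter only takes effect with @parallel gangs, where worker tasks must resolve the control task's hostname before the distributed rendezvous can begin. A sketch, assuming the @parallel decorator with the worker count set via num_parallel:

```python
from metaflow import FlowSpec, step, kubernetes, parallel

class DistFlow(FlowSpec):

    @step
    def start(self):
        # Launch a gang of 4 pods that execute `train` together
        self.next(self.train, num_parallel=4)

    # Workers wait up to 300 s for the control task's hostname to resolve
    @kubernetes(cpu=4, memory=16384, hostname_resolution_timeout=300)
    @parallel
    @step
    def train(self):
        self.next(self.join)

    @step
    def join(self, inputs):
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == '__main__':
    DistFlow()
```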
