The Flyte Ray plugin dispatches @task(task_config=RayJobConfig(...)) tasks to the KubeRay operator, which manages RayJob and RayCluster Kubernetes resources.

Prerequisites

  • A running Kubernetes cluster with Flyte installed
  • helm and kubectl configured

Step 1: Install the KubeRay operator

helm repo add kuberay https://ray-project.github.io/kuberay-helm/
helm repo update

helm install kuberay-operator kuberay/kuberay-operator \
  --namespace ray-system \
  --version 1.1.0 \
  --create-namespace
Verify the operator is running:
kubectl get pods -n ray-system

Step 2: Enable the Ray plugin in Flyte

Create a values-ray.yaml override file:
configuration:
  inline:
    tasks:
      task-plugins:
        enabled-plugins:
          - container
          - sidecar
          - k8s-array
          - ray
        default-for-task-types:
          - container: container
          - container_array: k8s-array
          - ray: ray
    plugins:
      ray:
        # Delete the Ray cluster 1 hour after the job finishes
        ttlSecondsAfterFinished: 3600
Apply the override:
helm upgrade flyte-backend flyteorg/flyte-binary \
  --namespace flyte \
  --values values.yaml \
  --values values-ray.yaml
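Depending on your chart version, the flyte-binary pod may need a restart to pick up the new inline configuration. Assuming the default deployment name produced by the release above (adjust if yours differs), restart and confirm the rollout:

```shell
kubectl -n flyte rollout restart deployment flyte-backend-flyte-binary
kubectl -n flyte rollout status deployment flyte-backend-flyte-binary
```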

Step 3: Write a Ray task

Install the flytekit Ray plugin:
pip install flytekitplugins-ray

Basic Ray task

import ray
from flytekit import task, workflow
from flytekitplugins.ray import RayJobConfig, WorkerNodeConfig, HeadNodeConfig


@task(
    task_config=RayJobConfig(
        head_node_config=HeadNodeConfig(
            ray_start_params={"log-color": "True"},
        ),
        worker_node_config=[
            WorkerNodeConfig(
                group_name="ray-group",
                replicas=2,
                min_replicas=1,
                max_replicas=4,
            )
        ],
        runtime_env={
            "pip": ["numpy", "pandas"],
        },
        ttl_seconds_after_finished=3600,
    )
)
def ray_task(n: int) -> float:
    @ray.remote
    def compute(i: int) -> float:
        return i * i * 1.0

    futures = [compute.remote(i) for i in range(n)]
    return sum(ray.get(futures))


@workflow
def ray_wf(n: int = 10) -> float:
    return ray_task(n=n)
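With flytectl configured against your cluster, the workflow above can be launched remotely with pyflyte. The filename ray_example.py is a placeholder for wherever you saved the code:

```shell
pyflyte run --remote ray_example.py ray_wf --n 10
```

Running without --remote executes the task locally (Ray is initialized in-process), which is useful for quick iteration before submitting to the cluster.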

Ray task with GPU workers

import ray
from flytekit import Resources, task
from flytekitplugins.ray import RayJobConfig, WorkerNodeConfig, HeadNodeConfig


@task(
    task_config=RayJobConfig(
        head_node_config=HeadNodeConfig(
            ray_start_params={"num-cpus": "1"},
        ),
        worker_node_config=[
            WorkerNodeConfig(
                group_name="gpu-workers",
                replicas=4,
                requests=Resources(
                    cpu="4",
                    mem="16Gi",
                    gpu="1",
                ),
            )
        ],
    )
)
def gpu_ray_task(model_path: str) -> str:
    @ray.remote(num_gpus=1)
    def train_on_gpu(path: str) -> str:
        # Training code here
        return f"trained: {path}"

    return ray.get(train_on_gpu.remote(model_path))

TTL and cluster cleanup

The ttlSecondsAfterFinished setting controls how long a Ray cluster is retained after its job completes before KubeRay deletes it. Set it globally in the plugin config or per-task:
plugins:
  ray:
    ttlSecondsAfterFinished: 3600  # 1 hour
Or override per task:
@task(
    task_config=RayJobConfig(
        ttl_seconds_after_finished=7200,  # 2 hours for this task
        ...
    )
)
def long_running_ray_task() -> str:
    ...

Verify

# Check KubeRay operator is healthy
kubectl get pods -n ray-system

# After running a Ray task, check RayJob resource
kubectl get rayjob -n flytesnacks-development

# Check the Ray cluster status
kubectl get raycluster -n flytesnacks-development
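While a job is running, the Ray dashboard is useful for inspecting actors, tasks, and resource usage. KubeRay exposes it on port 8265 of the head node service; substitute the cluster name reported by the raycluster command above:

```shell
kubectl port-forward -n flytesnacks-development service/<raycluster-name>-head-svc 8265:8265
# then open http://localhost:8265 in a browser
```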

Troubleshooting

Check if nodes have sufficient CPU/memory/GPU resources:
kubectl describe raycluster <cluster-name> -n flytesnacks-development
kubectl get events -n flytesnacks-development --sort-by=.lastTimestamp
Ray workers use the same container image as the head node unless overridden. Ensure the image is accessible from your cluster’s nodes. For private registries, configure imagePullSecrets in a default PodTemplate.
Verify the KubeRay operator version supports ttlSecondsAfterFinished. Upgrade to kuberay-operator >= 1.0.0:
helm upgrade kuberay-operator kuberay/kuberay-operator \
  --namespace ray-system \
  --version 1.1.0
