Flink’s native Kubernetes integration lets you deploy Flink directly on a running Kubernetes cluster. Flink communicates with the Kubernetes API server to dynamically create and release TaskManager pods as workloads change.
Apache Flink also provides a Flink Kubernetes Operator for declarative, lifecycle-managed deployments on Kubernetes.

Prerequisites

  • Kubernetes 1.9 or later.
  • kubectl configured with access to your cluster (~/.kube/config).
  • Permissions to list, create, and delete pods and services:
    kubectl auth can-i create pods
    
  • Kubernetes DNS enabled.
  • A service account with RBAC permissions (see RBAC setup below).
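The checks above can be scripted before starting a cluster; a minimal sketch (assumes kubectl is on the PATH and your kubeconfig points at the target cluster; the DNS label matches standard CoreDNS/kube-dns installs but may differ per distribution):

```shell
# Verify connectivity and report client/server versions
kubectl version

# Verify the permissions Flink needs
kubectl auth can-i create pods
kubectl auth can-i delete pods
kubectl auth can-i create services
kubectl auth can-i create configmaps   # Flink stores config and HA metadata in ConfigMaps

# Confirm cluster DNS is running
kubectl get pods -n kube-system -l k8s-app=kube-dns
```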

Starting a session cluster

1. Start the Kubernetes session

./bin/kubernetes-session.sh \
    -Dkubernetes.cluster-id=my-first-flink-cluster
2. Submit a job

./bin/flink run \
    --target kubernetes-session \
    -Dkubernetes.cluster-id=my-first-flink-cluster \
    ./examples/streaming/TopSpeedWindowing.jar
3. Stop the session cluster

kubectl delete deployment/my-first-flink-cluster
By default, Flink’s Web UI and REST endpoint are exposed as a ClusterIP service (accessible only within the cluster). See Accessing the Web UI for options.

Deployment modes

Application mode

Application mode creates a dedicated cluster per application. The application’s main() method runs on the JobManager pod, so the job JAR must be reachable from inside the cluster; there are three ways to provide it.

Option 1: Build a custom Docker image
FROM flink
RUN mkdir -p $FLINK_HOME/usrlib
COPY /path/to/my-flink-job.jar $FLINK_HOME/usrlib/my-flink-job.jar
Submit using the custom image:
./bin/flink run \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=my-app-cluster \
    -Dkubernetes.container.image.ref=my-custom-image:latest \
    local:///opt/flink/usrlib/my-flink-job.jar
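Before submitting, the custom image must be built and pushed to a registry the cluster nodes can pull from; a sketch (the image name and registry host are placeholders):

```shell
# Build from the Dockerfile above, tag for your registry, and push
docker build -t my-custom-image:latest .
docker tag my-custom-image:latest registry.example.com/my-custom-image:latest
docker push registry.example.com/my-custom-image:latest
```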
Option 2: Upload local JAR to distributed storage
./bin/flink run \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=my-app-cluster \
    -Dkubernetes.container.image.ref=flink:latest \
    -Dkubernetes.artifacts.local-upload-enabled=true \
    -Dkubernetes.artifacts.local-upload-target=s3://my-bucket/ \
    local:///tmp/my-flink-job.jar
Option 3: JAR already on S3/HDFS/HTTP
./bin/flink run \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=my-app-cluster \
    -Dkubernetes.container.image.ref=flink:latest \
    s3://my-bucket/my-flink-job.jar
Interact with an Application mode cluster:
# List jobs
./bin/flink list \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=my-app-cluster

# Cancel a job
./bin/flink cancel \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=my-app-cluster <jobId>

Session mode

Session mode runs a single cluster for multiple jobs. The kubernetes-session.sh script launches in detached mode by default:
# Start session
./bin/kubernetes-session.sh \
    -Dkubernetes.cluster-id=my-session-cluster

# Attach to a running session
./bin/kubernetes-session.sh \
    -Dkubernetes.cluster-id=my-session-cluster \
    -Dexecution.attached=true

# Stop session
echo 'stop' | ./bin/kubernetes-session.sh \
    -Dkubernetes.cluster-id=my-session-cluster \
    -Dexecution.attached=true

Key configuration options

# Unique name for the Flink cluster (required)
kubernetes.cluster-id: my-flink-cluster

# Docker image for JobManager and TaskManager pods
kubernetes.container.image.ref: flink:1.20

# Kubernetes namespace (default: default)
kubernetes.namespace: flink

# Service account for JobManager pod
kubernetes.service-account: flink-service-account

# CPU resources for JobManager
kubernetes.jobmanager.cpu: 1.0

# CPU resources for TaskManagers
kubernetes.taskmanager.cpu: 2.0

# Number of JobManager replicas (> 1 requires HA enabled)
kubernetes.jobmanager.replicas: 1

# How the REST service is exposed: ClusterIP, NodePort, LoadBalancer
kubernetes.rest-service.exposed.type: ClusterIP
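These options can be combined on the command line when starting a cluster. For example, a session cluster in a dedicated namespace with a LoadBalancer-exposed REST service (all values are illustrative):

```shell
./bin/kubernetes-session.sh \
    -Dkubernetes.cluster-id=my-flink-cluster \
    -Dkubernetes.namespace=flink \
    -Dkubernetes.service-account=flink-service-account \
    -Dkubernetes.container.image.ref=flink:1.20 \
    -Dkubernetes.taskmanager.cpu=2.0 \
    -Dkubernetes.rest-service.exposed.type=LoadBalancer
```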

Accessing the Web UI

Forward the service port locally:
kubectl port-forward service/my-flink-cluster-rest 8081
# Access at http://localhost:8081
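With the port forwarded, the REST endpoint can also be queried directly. A couple of illustrative calls against standard Flink REST paths:

```shell
# List all jobs with their current status
curl http://localhost:8081/jobs/overview

# Show the effective JobManager configuration
curl http://localhost:8081/jobmanager/config
```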

RBAC setup

The default service account may not have the permissions required by Flink. Grant the edit role to the default service account:
kubectl create clusterrolebinding flink-role-binding-default \
    --clusterrole=edit \
    --serviceaccount=default:default
Or create a dedicated service account:
kubectl create serviceaccount flink-service-account
kubectl create clusterrolebinding flink-role-binding-flink \
    --clusterrole=edit \
    --serviceaccount=default:flink-service-account
Then configure Flink to use it:
-Dkubernetes.service-account=flink-service-account
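The edit ClusterRole is broader than Flink strictly needs. A narrower, namespace-scoped alternative is sketched below; the role name and rule list are illustrative, so verify them against the resources your Flink version actually touches:

```yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: flink-role
  namespace: default
rules:
  - apiGroups: [""]
    resources: ["pods", "services", "configmaps"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
  - apiGroups: ["apps"]
    resources: ["deployments"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: flink-role-binding
  namespace: default
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: flink-role
subjects:
  - kind: ServiceAccount
    name: flink-service-account
    namespace: default
```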

Pod templates

Pod templates let you configure advanced features not directly exposed by Flink’s configuration options (init containers, sidecar containers, custom volumes, node selectors, etc.). Specify a template file:
kubernetes.pod-template-file.default: /path/to/pod-template.yaml
Example pod-template.yaml:
apiVersion: v1
kind: Pod
metadata:
  name: jobmanager-pod-template
spec:
  initContainers:
    - name: artifacts-fetcher
      image: busybox:latest
      command: ['wget', 'https://example.com/my-job.jar', '-O', '/flink-artifact/myjob.jar']
      volumeMounts:
        - mountPath: /flink-artifact
          name: flink-artifact
  containers:
    # Main container name must be flink-main-container
    - name: flink-main-container
      resources:
        requests:
          ephemeral-storage: 2048Mi
        limits:
          ephemeral-storage: 2048Mi
      volumeMounts:
        - mountPath: /opt/flink/artifacts
          name: flink-artifact
        - mountPath: /opt/flink/log
          name: flink-logs
    - name: sidecar-log-collector
      image: log-collector:latest
      command: ['upload-logs', '/flink-logs/']
      volumeMounts:
        - mountPath: /flink-logs
          name: flink-logs
  volumes:
    - name: flink-artifact
      emptyDir: {}
    - name: flink-logs
      emptyDir: {}
The main Flink container must always be named flink-main-container. Flink overwrites some pod fields (pod name, owner references, restart policy); fields not listed in the template are left unchanged.
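Separate templates can also be supplied per component, overriding the default for the JobManager or TaskManagers only (for example, to attach the log-collector sidecar only to TaskManager pods). The paths below are placeholders:

```
kubernetes.pod-template-file.default: /path/to/pod-template.yaml
kubernetes.pod-template-file.jobmanager: /path/to/jm-pod-template.yaml
kubernetes.pod-template-file.taskmanager: /path/to/tm-pod-template.yaml
```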

High availability on Kubernetes

For HA, configure the Kubernetes HA service and a durable storage directory; setting kubernetes.jobmanager.replicas above 1 additionally launches standby JobManagers for faster failover:
kubernetes.cluster-id: my-ha-cluster
kubernetes.jobmanager.replicas: 2
high-availability.type: kubernetes
high-availability.storageDir: s3://my-bucket/flink/recovery
See Kubernetes HA for full details.
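The same options can be passed at submission time to start an HA Application mode cluster; an illustrative invocation (the bucket path and JAR location are placeholders):

```shell
./bin/flink run \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=my-ha-cluster \
    -Dkubernetes.jobmanager.replicas=2 \
    -Dhigh-availability.type=kubernetes \
    -Dhigh-availability.storageDir=s3://my-bucket/flink/recovery \
    local:///opt/flink/usrlib/my-flink-job.jar
```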

Mounting persistent volume claims

Mount PVCs directly to JobManager and TaskManager pods:
./bin/kubernetes-session.sh \
    -Dkubernetes.cluster-id=my-cluster \
    -Dkubernetes.persistent-volume-claims=checkpoint-pvc:/opt/flink/checkpoints
Multiple PVCs:
-Dkubernetes.persistent-volume-claims=checkpoint-pvc:/opt/flink/checkpoints,data-pvc:/opt/flink/data
For HA setups where both JobManager and TaskManagers write to the same storage, the PVC must support ReadWriteMany access mode.

Resource cleanup

Flink sets OwnerReference on all resources it creates (ConfigMaps, Services, Pods). Deleting the Flink deployment automatically removes all associated resources:
kubectl delete deployment/<cluster-id>

Accessing logs

# View logs for a specific pod
kubectl logs <pod-name>

# Open a shell inside a running pod
kubectl exec -it <pod-name> -- bash
To change log level dynamically:
kubectl edit cm flink-config-<cluster-id>

Using Kubernetes Secrets

Mount a secret as files:
./bin/kubernetes-session.sh \
    -Dkubernetes.secrets=mysecret:/path/to/secret
Expose a secret as environment variables:
./bin/kubernetes-session.sh \
    -Dkubernetes.env.secretKeyRef=\
env:SECRET_USERNAME,secret:mysecret,key:username;\
env:SECRET_PASSWORD,secret:mysecret,key:password
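The secret itself must exist before the cluster starts; it can be created with kubectl, for example (key names match the `key:` references above, values are placeholders):

```shell
kubectl create secret generic mysecret \
    --from-literal=username=admin \
    --from-literal=password='s3cr3t'
```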
