Flink’s native Kubernetes integration lets you deploy Flink directly on a running Kubernetes cluster. Flink communicates with the Kubernetes API server to dynamically create and release TaskManager pods as workloads change.
Prerequisites
- Kubernetes 1.9 or later.
- kubectl configured with access to your cluster (~/.kube/config).
- Permissions to list, create, and delete pods and services:
kubectl auth can-i create pods
- Kubernetes DNS enabled.
- A service account with RBAC permissions (see RBAC setup below).
Starting a session cluster
Start the Kubernetes session
./bin/kubernetes-session.sh \
-Dkubernetes.cluster-id=my-first-flink-cluster
Submit a job
./bin/flink run \
--target kubernetes-session \
-Dkubernetes.cluster-id=my-first-flink-cluster \
./examples/streaming/TopSpeedWindowing.jar
Stop the session cluster
kubectl delete deployment/my-first-flink-cluster
By default, Flink’s Web UI and REST endpoint are exposed as a ClusterIP service (accessible only within the cluster). See Accessing the Web UI for options.
Deployment modes
Application mode (recommended for production)
Application mode creates a dedicated cluster per application. The application’s main() method runs on the JobManager pod.
Option 1: Build a custom Docker image
FROM flink
RUN mkdir -p $FLINK_HOME/usrlib
COPY /path/to/my-flink-job.jar $FLINK_HOME/usrlib/my-flink-job.jar
Submit using the custom image:
./bin/flink run \
--target kubernetes-application \
-Dkubernetes.cluster-id=my-app-cluster \
-Dkubernetes.container.image.ref=my-custom-image:latest \
local:///opt/flink/usrlib/my-flink-job.jar
Option 2: Upload local JAR to distributed storage
./bin/flink run \
--target kubernetes-application \
-Dkubernetes.cluster-id=my-app-cluster \
-Dkubernetes.container.image.ref=flink:latest \
-Dkubernetes.artifacts.local-upload-enabled=true \
-Dkubernetes.artifacts.local-upload-target=s3://my-bucket/ \
local:///tmp/my-flink-job.jar
Option 3: JAR already on S3/HDFS/HTTP
./bin/flink run \
--target kubernetes-application \
-Dkubernetes.cluster-id=my-app-cluster \
-Dkubernetes.container.image.ref=flink:latest \
s3://my-bucket/my-flink-job.jar
Interact with an Application mode cluster:
# List jobs
./bin/flink list \
--target kubernetes-application \
-Dkubernetes.cluster-id=my-app-cluster
# Cancel a job
./bin/flink cancel \
--target kubernetes-application \
-Dkubernetes.cluster-id=my-app-cluster <jobId>
Session mode
Session mode runs a single cluster for multiple jobs. The kubernetes-session.sh script launches in detached mode by default:
# Start session
./bin/kubernetes-session.sh \
-Dkubernetes.cluster-id=my-session-cluster
# Attach to a running session
./bin/kubernetes-session.sh \
-Dkubernetes.cluster-id=my-session-cluster \
-Dexecution.attached=true
# Stop session
echo 'stop' | ./bin/kubernetes-session.sh \
-Dkubernetes.cluster-id=my-session-cluster \
-Dexecution.attached=true
Key configuration options
# Unique name for the Flink cluster (required)
kubernetes.cluster-id: my-flink-cluster
# Docker image for JobManager and TaskManager pods
kubernetes.container.image.ref: flink:1.20
# Kubernetes namespace (default: default)
kubernetes.namespace: flink
# Service account for JobManager pod
kubernetes.service-account: flink-service-account
# CPU resources for JobManager
kubernetes.jobmanager.cpu: 1.0
# CPU resources for TaskManagers
kubernetes.taskmanager.cpu: 2.0
# Number of JobManager replicas (> 1 requires HA enabled)
kubernetes.jobmanager.replicas: 1
# How the REST service is exposed: ClusterIP, NodePort, LoadBalancer
kubernetes.rest-service.exposed.type: ClusterIP
Accessing the Web UI
ClusterIP (default)
Forward the service port locally:
kubectl port-forward service/my-flink-cluster-rest 8081
# Access at http://localhost:8081
NodePort
kubernetes.rest-service.exposed.type: NodePort
Access the UI at <NodeIP>:<NodePort>.
LoadBalancer
kubernetes.rest-service.exposed.type: LoadBalancer
After the load balancer is provisioned:
kubectl get services/my-flink-cluster-rest
# Use EXTERNAL-IP:8081
LoadBalancer exposes Flink publicly. Ensure your cluster is secured appropriately.
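Beyond the built-in service types, an Ingress can route external traffic to the REST service. A minimal sketch, assuming an NGINX ingress controller is installed and using Flink's default REST service name `<cluster-id>-rest` (here `my-flink-cluster-rest`); the hostname and ingress class are placeholders for your environment:

```yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: flink-ui
spec:
  ingressClassName: nginx          # assumption: NGINX ingress controller installed
  rules:
    - host: flink.example.com      # hypothetical hostname
      http:
        paths:
          - path: /
            pathType: Prefix
            backend:
              service:
                name: my-flink-cluster-rest  # Flink's REST service: <cluster-id>-rest
                port:
                  number: 8081
```

As with LoadBalancer, anything reachable through the Ingress can submit jobs, so put authentication in front of it.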
RBAC setup
The default service account may not have the permissions required by Flink. Grant the edit role to the default service account:
kubectl create clusterrolebinding flink-role-binding-default \
--clusterrole=edit \
--serviceaccount=default:default
Or create a dedicated service account:
kubectl create serviceaccount flink-service-account
kubectl create clusterrolebinding flink-role-binding-flink \
--clusterrole=edit \
--serviceaccount=default:flink-service-account
Then configure Flink to use it:
-Dkubernetes.service-account=flink-service-account
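The same setup can also be declared as manifests. A sketch of the service account plus cluster role binding mirroring the commands above; note that the edit cluster role grants broad write access, so a namespaced Role limited to pods, services, and ConfigMaps is tighter for production:

```yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: flink-service-account
  namespace: default
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: flink-role-binding-flink
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: edit
subjects:
  - kind: ServiceAccount
    name: flink-service-account
    namespace: default
```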
Pod templates
Pod templates let you configure advanced features not directly exposed by Flink’s configuration options (init containers, sidecar containers, custom volumes, node selectors, etc.).
Specify a template file:
kubernetes.pod-template-file.default: /path/to/pod-template.yaml
Example pod-template.yaml:
apiVersion: v1
kind: Pod
metadata:
name: jobmanager-pod-template
spec:
initContainers:
- name: artifacts-fetcher
image: busybox:latest
command: ['wget', 'https://example.com/my-job.jar', '-O', '/flink-artifact/myjob.jar']
volumeMounts:
- mountPath: /flink-artifact
name: flink-artifact
containers:
# Main container name must be flink-main-container
- name: flink-main-container
resources:
requests:
ephemeral-storage: 2048Mi
limits:
ephemeral-storage: 2048Mi
volumeMounts:
- mountPath: /opt/flink/artifacts
name: flink-artifact
- mountPath: /opt/flink/log
name: flink-logs
- name: sidecar-log-collector
image: log-collector:latest
command: ['upload-logs', '/flink-logs/']
volumeMounts:
- mountPath: /flink-logs
name: flink-logs
volumes:
- name: flink-artifact
emptyDir: {}
- name: flink-logs
emptyDir: {}
The main Flink container must always be named flink-main-container. Flink overwrites some pod fields (pod name, owner references, restart policy); fields not listed in the template are left unchanged.
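JobManager and TaskManager pods can also use separate templates via component-scoped options; a component-scoped template takes precedence over the default one:

```yaml
kubernetes.pod-template-file.default: /path/to/pod-template.yaml
kubernetes.pod-template-file.jobmanager: /path/to/jobmanager-pod-template.yaml
kubernetes.pod-template-file.taskmanager: /path/to/taskmanager-pod-template.yaml
```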
High availability on Kubernetes
To enable HA, configure the Kubernetes HA service. Setting kubernetes.jobmanager.replicas greater than 1 additionally launches standby JobManagers for faster failover; more than one replica requires HA to be enabled:
kubernetes.cluster-id: my-ha-cluster
kubernetes.jobmanager.replicas: 2
high-availability.type: kubernetes
high-availability.storageDir: s3://my-bucket/flink/recovery
See Kubernetes HA for full details.
Mounting persistent volume claims
Mount PVCs directly to JobManager and TaskManager pods:
./bin/kubernetes-session.sh \
-Dkubernetes.cluster-id=my-cluster \
-Dkubernetes.persistent-volume-claims=checkpoint-pvc:/opt/flink/checkpoints
Multiple PVCs:
-Dkubernetes.persistent-volume-claims=checkpoint-pvc:/opt/flink/checkpoints,data-pvc:/opt/flink/data
For HA setups where both JobManager and TaskManagers write to the same storage, the PVC must support ReadWriteMany access mode.
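The PVC referenced above must exist before the cluster starts. A minimal sketch of a claim suitable for shared checkpoint storage; the storage class and size are placeholders for your environment:

```yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: checkpoint-pvc
spec:
  accessModes:
    - ReadWriteMany            # required when JobManager and TaskManagers share it
  storageClassName: nfs-client # assumption: an RWX-capable storage class exists
  resources:
    requests:
      storage: 50Gi
```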
Resource cleanup
Flink sets OwnerReference on all resources it creates (ConfigMaps, Services, Pods). Deleting the Flink deployment automatically removes all associated resources:
kubectl delete deployment/<cluster-id>
Accessing logs
# View logs for a specific pod
kubectl logs <pod-name>
# Open a shell inside a running pod
kubectl exec -it <pod-name> -- bash
To change log level dynamically:
kubectl edit cm flink-config-<cluster-id>
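Inside that ConfigMap, the log level lives in the bundled log4j configuration; raising the root logger level is usually enough. The key names below assume the default log4j-console.properties shipped with Flink:

```properties
# inside the ConfigMap's log4j-console.properties entry
rootLogger.level = DEBUG
rootLogger.appenderRef.console.ref = ConsoleAppender
```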
Using Kubernetes Secrets
Mount a secret as files:
./bin/kubernetes-session.sh \
-Dkubernetes.secrets=mysecret:/path/to/secret
Expose a secret as environment variables:
./bin/kubernetes-session.sh \
-Dkubernetes.env.secretKeyRef=\
env:SECRET_USERNAME,secret:mysecret,key:username;\
env:SECRET_PASSWORD,secret:mysecret,key:password
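Both variants assume the Secret already exists in the cluster. A sketch of a matching Secret providing the username and password keys used above (values are placeholders):

```yaml
apiVersion: v1
kind: Secret
metadata:
  name: mysecret
type: Opaque
stringData:          # stringData avoids manual base64 encoding
  username: admin    # placeholder value
  password: changeit # placeholder value
```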