Flink’s Kubernetes HA service uses the Kubernetes API directly for leader election and metadata storage, so no external ZooKeeper ensemble is required. It is available whenever Flink is deployed on Kubernetes, whether standalone on Kubernetes or via the native Kubernetes integration.

How it works

Flink stores HA metadata (JobGraph references, checkpoint pointers, leader addresses) in Kubernetes ConfigMaps. Leader election uses the Kubernetes coordination.k8s.io/v1 LeaderElection API (backed by a Lease object). When the active JobManager pod crashes, Kubernetes restarts it (for Session mode) or the Deployment controller starts a new pod (for Application mode). The new pod reads the HA ConfigMaps to recover job state and resume execution from the latest checkpoint.
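The metadata described above can be inspected directly with kubectl. The following sketch assumes a cluster-id of my-flink-cluster and the app/configmap-type labels commonly applied by Flink; the exact labels may differ across Flink versions:

```shell
# List the HA ConfigMaps Flink maintains for this cluster
# (label selector is an assumption; adjust to your Flink version)
kubectl get configmaps -l app=my-flink-cluster,configmap-type=high-availability

# Each ConfigMap holds a small piece of recovery metadata, e.g. the
# pointer to the latest completed checkpoint for a given job
kubectl get configmap my-flink-cluster-<job-id>-checkpoint-counter -o yaml
```

Note that the ConfigMaps store only pointers; the actual JobGraph and checkpoint data live in high-availability.storageDir.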

Prerequisites

  • Kubernetes 1.9 or later.
  • A service account with permissions to create, edit, and delete ConfigMaps. See RBAC setup for details.
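A minimal RBAC setup might look like the following sketch; the account and role names are illustrative, and the RBAC setup page linked above is authoritative:

```yaml
# Illustrative: service account plus a namespaced Role granting
# the ConfigMap permissions Flink's HA service needs
apiVersion: v1
kind: ServiceAccount
metadata:
  name: flink-service-account
  namespace: default
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: flink-ha-configmaps
  namespace: default
rules:
  - apiGroups: [""]
    resources: ["configmaps"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: flink-ha-configmaps-binding
  namespace: default
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: flink-ha-configmaps
subjects:
  - kind: ServiceAccount
    name: flink-service-account
    namespace: default
```

Point Flink at the account with kubernetes.service-account: flink-service-account in your Flink configuration.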

Required configuration

# Enable Kubernetes HA
high-availability.type: kubernetes

# Unique cluster identifier (base name for all HA ConfigMaps)
kubernetes.cluster-id: my-flink-cluster

# Filesystem path for JobManager recovery metadata
high-availability.storageDir: s3://my-bucket/flink/recovery
The kubernetes.cluster-id is used as the base name for all HA ConfigMaps created by Flink.

Example: Application mode with HA

./bin/flink run \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=my-ha-app \
    -Dkubernetes.container.image.ref=my-flink-image:latest \
    -Dkubernetes.jobmanager.replicas=2 \
    -Dhigh-availability.type=kubernetes \
    -Dhigh-availability.storageDir=s3://my-bucket/flink/recovery \
    local:///opt/flink/usrlib/my-job.jar
Setting kubernetes.jobmanager.replicas greater than 1 starts standby JobManager pods that immediately take over if the leader fails, reducing failover time.
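With two replicas you can verify that a standby JobManager is running and see which pod currently holds leadership. The label selectors and the leader ConfigMap name below follow the conventions described on this page and may differ across Flink versions:

```shell
# Expect two JobManager pods: one leader, one standby
# (label selector is an assumption; adjust to your Flink version)
kubectl get pods -l app=my-ha-app,component=jobmanager

# The leader ConfigMap records the address of the current leader
kubectl get configmap my-ha-app-jobmanager-leader -o yaml
```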

Example: Session mode with HA

# conf/config.yaml
kubernetes.cluster-id: my-session-cluster
high-availability.type: kubernetes
high-availability.storageDir: hdfs:///flink/recovery

./bin/kubernetes-session.sh \
    -Dkubernetes.cluster-id=my-session-cluster
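Jobs submitted against the same cluster-id are registered in the HA ConfigMaps and recovered automatically after a JobManager failure. A submission to the session cluster might look like this (the jar path is illustrative, using an example bundled with the Flink distribution):

```shell
./bin/flink run \
    --target kubernetes-session \
    -Dkubernetes.cluster-id=my-session-cluster \
    ./examples/streaming/TopSpeedWindowing.jar
```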

HA ConfigMaps

Flink creates and manages several ConfigMaps for HA state. These ConfigMaps deliberately do not set an OwnerReference, so they persist even if the Flink Deployment is deleted:
| ConfigMap name pattern                    | Contents                              |
|-------------------------------------------|---------------------------------------|
| {cluster-id}-jobmanager-leader            | Current leader address and session ID |
| {cluster-id}-{job-id}-jobgraph            | Reference to the persisted JobGraph   |
| {cluster-id}-{job-id}-checkpoint-counter  | Latest completed checkpoint ID        |

Restarting a cluster while preserving HA state

To stop a Flink cluster without losing HA data (so a restarted cluster can recover jobs from the latest checkpoint):
# Delete only the Deployment, not the HA ConfigMaps
kubectl delete deployment/<cluster-id>
All Flink cluster resources (Deployment, TaskManager pods, Services, Flink configuration ConfigMap) are deleted. The HA ConfigMaps remain because they have no OwnerReference. When you redeploy with the same kubernetes.cluster-id, Flink reads the HA ConfigMaps, recovers all previously running jobs, and restarts them from the latest successful checkpoint.
Deleting HA ConfigMaps manually will cause Flink to start fresh, losing the state of any previously running jobs. Only delete them if you intend to start over completely.
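If you do intend to discard all HA state and start over, the ConfigMaps can be removed explicitly. The label selector below is an assumption about how your Flink version labels its HA ConfigMaps; deleting them by name also works:

```shell
# DANGER: discards all recovery metadata for this cluster.
# Jobs will NOT be recovered on the next deployment.
kubectl delete configmaps -l app=<cluster-id>,configmap-type=high-availability
```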

Differences from ZooKeeper HA

|                        | Kubernetes HA                  | ZooKeeper HA                  |
|------------------------|--------------------------------|-------------------------------|
| External dependencies  | None (uses Kubernetes API)     | Requires ZooKeeper quorum     |
| Deployment targets     | Kubernetes only                | Any Flink deployment          |
| Metadata storage       | Kubernetes ConfigMaps          | ZooKeeper znodes + filesystem |
| Leader election        | Kubernetes Lease API           | ZooKeeper leader election     |
| Operational complexity | Lower (no ZooKeeper to manage) | Higher                        |
