Flink’s ZooKeeper HA service uses Apache ZooKeeper for distributed coordination between JobManager instances. ZooKeeper handles leader election and provides consistent, highly available storage for the small amount of recovery metadata that Flink needs.
How it works
All JobManager instances connect to the same ZooKeeper quorum. ZooKeeper manages the leader election: at any time, exactly one JobManager holds the leadership lock. When the active JobManager crashes, ZooKeeper detects the session expiry and triggers a new election among the standby candidates.
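The failover sequence above can be illustrated with a toy model. This is not Flink's implementation (Flink delegates election to ZooKeeper, which orders candidates much like sequential ephemeral znodes); the class below only models the observable behavior: one leader at a time, and automatic promotion of a standby when the leader's session expires.

```python
# Toy model of ZooKeeper-style leader election among JobManager
# candidates. Illustration only -- NOT Flink's actual implementation.

class Quorum:
    """Orders candidates by registration, like sequential znodes."""

    def __init__(self):
        self.candidates = []  # first entry is the current leader

    def register(self, candidate_id):
        self.candidates.append(candidate_id)
        return self.leader()

    def leader(self):
        return self.candidates[0] if self.candidates else None

    def session_expired(self, candidate_id):
        # When the active JobManager's session expires, the next
        # candidate in line is automatically elected.
        self.candidates.remove(candidate_id)
        return self.leader()


q = Quorum()
q.register("jm-1")                       # first candidate becomes leader
q.register("jm-2")                       # standby
assert q.leader() == "jm-1"
new_leader = q.session_expired("jm-1")   # crash -> session expiry -> failover
assert new_leader == "jm-2"
```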
Recovery metadata (submitted JobGraphs, completed checkpoint metadata, leader addresses) is persisted to the filesystem path configured via high-availability.storageDir (typically HDFS or S3); ZooKeeper itself stores only lightweight pointers to that state.
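The split between ZooKeeper and the storage directory can be sketched as follows. The dicts stand in for ZooKeeper znodes and a distributed filesystem, and the function names are illustrative, not Flink APIs: the point is that the large payload lives in the filesystem while ZooKeeper holds only a short pointer.

```python
# Sketch of the "pointers in ZooKeeper, payload in storageDir" split.
# Names and data layout are illustrative, not Flink's internal format.
import hashlib

zookeeper = {}    # znode path -> small pointer (bytes)
storage_dir = {}  # file path  -> large payload

def persist(cluster_id, name, payload: bytes) -> str:
    # Large state goes to the filesystem (e.g. HDFS/S3)...
    file_path = f"hdfs:///flink/recovery/{hashlib.sha1(payload).hexdigest()}"
    storage_dir[file_path] = payload
    # ...while ZooKeeper stores only the lightweight pointer.
    zookeeper[f"/flink/{cluster_id}/{name}"] = file_path.encode()
    return file_path

def recover(cluster_id, name) -> bytes:
    pointer = zookeeper[f"/flink/{cluster_id}/{name}"].decode()
    return storage_dir[pointer]

persist("my-flink-cluster", "jobgraphs/job-42", b"<serialized JobGraph>")
assert recover("my-flink-cluster", "jobgraphs/job-42") == b"<serialized JobGraph>"
# ZooKeeper holds a short pointer, not the payload itself:
assert len(zookeeper["/flink/my-flink-cluster/jobgraphs/job-42"]) < 100
```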
Required configuration
Add the following to conf/config.yaml on all cluster nodes:
# Enable ZooKeeper HA
high-availability.type: zookeeper
# ZooKeeper quorum (comma-separated host:port pairs)
high-availability.zookeeper.quorum: zk-host1:2181,zk-host2:2181,zk-host3:2181
# Root ZooKeeper node for all Flink clusters
high-availability.zookeeper.path.root: /flink
# Unique identifier for this cluster within the ZooKeeper root
high-availability.cluster-id: /my-flink-cluster
# Filesystem path for JobManager recovery metadata
high-availability.storageDir: hdfs:///flink/recovery
On YARN and native Kubernetes, do not set high-availability.cluster-id manually. Flink generates it automatically from the application ID. Manually setting this value can cause multiple clusters to interfere with each other.
Standalone HA example: two JobManagers
Configure HA in config.yaml
high-availability.type: zookeeper
high-availability.zookeeper.quorum: localhost:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /cluster_one
high-availability.storageDir: hdfs:///flink/recovery
Configure conf/masters with two JobManagers
localhost:8081
localhost:8082
Configure a single ZooKeeper peer in conf/zoo.cfg (sufficient for this local example)
server.0=localhost:2888:3888
Start ZooKeeper
./bin/start-zookeeper-quorum.sh
# Starting zookeeper daemon on host localhost.
Start the HA cluster
./bin/start-cluster.sh
# Starting HA cluster with 2 masters and 1 peers in ZooKeeper quorum.
# Starting standalonesession daemon on host localhost.
# Starting standalonesession daemon on host localhost.
# Starting taskexecutor daemon on host localhost.
Stop the cluster
./bin/stop-cluster.sh
./bin/stop-zookeeper-quorum.sh
ZooKeeper paths
Flink stores data in ZooKeeper under a hierarchy based on your configuration:
{high-availability.zookeeper.path.root}
└── {high-availability.cluster-id}
├── leader/ # Current leader pointer
├── latch/ # Leader election latch
├── jobgraphs/ # References to persisted JobGraphs
└── checkpoints/ # References to completed checkpoint metadata
The actual data (JobGraphs, checkpoint files) is stored at high-availability.storageDir, not in ZooKeeper.
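The configuration values compose into full znode paths by simple concatenation. The helper below is illustrative (Flink performs the equivalent joining internally); it shows where a given cluster's nodes end up given the path.root and cluster-id from this page:

```python
# Illustrative helper: how path.root, cluster-id and a child node
# combine into one znode path. Not Flink code.

def znode_path(root: str, cluster_id: str, *children: str) -> str:
    """Join path.root, cluster-id and child nodes into a znode path."""
    parts = [root, cluster_id, *children]
    return "/" + "/".join(p.strip("/") for p in parts if p.strip("/"))

# With the configuration shown above:
assert znode_path("/flink", "/my-flink-cluster", "leader") == \
    "/flink/my-flink-cluster/leader"
assert znode_path("/flink", "/cluster_one", "checkpoints") == \
    "/flink/cluster_one/checkpoints"
```

Because the cluster-id is part of the path, two clusters sharing one ZooKeeper quorum stay isolated as long as their cluster-ids differ.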
JobManager port configuration
In HA mode, the JobManager picks a random port for inter-process RPC by default. To pin it to a fixed port, or restrict it to a range (e.g. to satisfy firewall rules):
high-availability.jobmanager.port: 50010
# Or a range:
high-availability.jobmanager.port: 50000-50025
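The two forms accepted above (a single port or an inclusive range) can be parsed as sketched below. The parser is illustrative, not Flink's own code, and only handles the two shapes shown in this section:

```python
# Hedged sketch: parsing a port specification like the ones above --
# either a single port ("50010") or an inclusive range ("50000-50025").
# Illustrative only, not Flink's parser.

def parse_port_spec(spec: str) -> range:
    """Return the candidate ports as a range (inclusive of both ends)."""
    if "-" in spec:
        lo, hi = (int(p) for p in spec.split("-", 1))
        if lo > hi:
            raise ValueError(f"invalid port range: {spec}")
        return range(lo, hi + 1)
    port = int(spec)
    return range(port, port + 1)

assert list(parse_port_spec("50010")) == [50010]
ports = parse_port_spec("50000-50025")
assert ports[0] == 50000 and ports[-1] == 50025 and len(ports) == 26
```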
ZooKeeper security (Kerberos)
If your ZooKeeper quorum requires Kerberos authentication, configure SASL in conf/config.yaml:
# ZooKeeper service name (default: zookeeper)
zookeeper.sasl.service-name: zookeeper
# JAAS login context name (default: Client)
# Must match an entry in security.kerberos.login.contexts
zookeeper.sasl.login-context-name: Client
Advanced: ZooKeeper client retry configuration
Flink uses bounded exponential backoff when ZooKeeper connections are interrupted:
# Initial wait between retries (doubles each attempt, default: 5s)
high-availability.zookeeper.client.retry-wait: 5 s
# Maximum wait between retries (caps exponential backoff, default: 60s)
high-availability.zookeeper.client.max-retry-wait: 60 s
# Maximum number of retry attempts (default: 3)
high-availability.zookeeper.client.max-retry-attempts: 3
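The three settings above translate into a wait schedule like the one computed below: the wait doubles per attempt but never exceeds the configured maximum, and retries stop after the attempt limit. This is a simplified model of the described behavior, not Flink's internal retry code:

```python
# Simplified model of the bounded exponential backoff described above.
# Not Flink's internal code.

def backoff_schedule(retry_wait_s: float, max_retry_wait_s: float,
                     max_attempts: int) -> list:
    """Wait time in seconds before each retry attempt."""
    return [min(retry_wait_s * 2 ** i, max_retry_wait_s)
            for i in range(max_attempts)]

# With the defaults (5 s initial wait, 60 s cap, 3 attempts):
assert backoff_schedule(5, 60, 3) == [5, 10, 20]
# With more attempts, the cap kicks in:
assert backoff_schedule(5, 60, 6) == [5, 10, 20, 40, 60, 60]
```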
Tolerating suspended connections
By default, a suspended ZooKeeper connection causes Flink to invalidate all leadership and trigger failover. In unstable network environments, you can configure Flink to tolerate suspended connections:
high-availability.zookeeper.client.tolerate-suspended-connections: true
Enabling this option makes Flink more resilient to transient network issues, but it also means a JobManager keeps acting as leader while its ZooKeeper connection is suspended, which risks acting on stale leadership if the session has in fact expired. Only enable it if you are experiencing frequent unnecessary failovers.
Bootstrapping ZooKeeper with Flink’s helper scripts
Flink ships a bundled ZooKeeper configuration template at conf/zoo.cfg. For development or small deployments:
# conf/zoo.cfg
server.0=host0:2888:3888
server.1=host1:2888:3888
server.2=host2:2888:3888
# Start ZooKeeper on all configured hosts
./bin/start-zookeeper-quorum.sh
The bundled ZooKeeper is suitable for testing and development. For production, manage your own dedicated ZooKeeper installation.