Flink’s ZooKeeper HA service uses Apache ZooKeeper for distributed coordination between JobManager instances. ZooKeeper handles leader election and provides consistent, highly available storage for the small amount of recovery metadata that Flink needs.

How it works

All JobManager instances connect to the same ZooKeeper quorum. ZooKeeper manages leader election: at any time, exactly one JobManager holds the leadership lock. When the active JobManager crashes, ZooKeeper detects the expired session and triggers a new election among the standby candidates. Bulky recovery data (persisted JobGraphs, completed checkpoint metadata) lives in the file system configured via high-availability.storageDir (typically HDFS or S3); ZooKeeper itself stores only leader addresses and lightweight pointers to that data.

Required configuration

Add the following to conf/config.yaml on all cluster nodes:
# Enable ZooKeeper HA
high-availability.type: zookeeper

# ZooKeeper quorum (comma-separated host:port pairs)
high-availability.zookeeper.quorum: zk-host1:2181,zk-host2:2181,zk-host3:2181

# Root ZooKeeper node for all Flink clusters
high-availability.zookeeper.path.root: /flink

# Unique identifier for this cluster within the ZooKeeper root
high-availability.cluster-id: /my-flink-cluster

# Filesystem path for JobManager recovery metadata
high-availability.storageDir: hdfs:///flink/recovery
On YARN and native Kubernetes, do not set high-availability.cluster-id manually. Flink generates it automatically from the application ID. Manually setting this value can cause multiple clusters to interfere with each other.
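For example, a YARN or native Kubernetes deployment would use the same configuration minus the cluster-id (a sketch; hostnames and paths are placeholders from the example above):

```yaml
# HA config for YARN / native Kubernetes: no cluster-id set --
# Flink derives it automatically from the application ID.
high-availability.type: zookeeper
high-availability.zookeeper.quorum: zk-host1:2181,zk-host2:2181,zk-host3:2181
high-availability.zookeeper.path.root: /flink
high-availability.storageDir: hdfs:///flink/recovery
```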

Standalone HA example: two JobManagers

1. Configure HA in config.yaml

high-availability.type: zookeeper
high-availability.zookeeper.quorum: localhost:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /cluster_one
high-availability.storageDir: hdfs:///flink/recovery
2. Configure conf/masters with two JobManagers

localhost:8081
localhost:8082
3. Configure ZooKeeper in conf/zoo.cfg

server.0=localhost:2888:3888
4. Start ZooKeeper

./bin/start-zookeeper-quorum.sh
# Starting zookeeper daemon on host localhost.
5. Start the HA cluster

./bin/start-cluster.sh
# Starting HA cluster with 2 masters and 1 peers in ZooKeeper quorum.
# Starting standalonesession daemon on host localhost.
# Starting standalonesession daemon on host localhost.
# Starting taskexecutor daemon on host localhost.
6. Stop the cluster

./bin/stop-cluster.sh
./bin/stop-zookeeper-quorum.sh

ZooKeeper paths

Flink stores data in ZooKeeper under a hierarchy based on your configuration:
{high-availability.zookeeper.path.root}
  └── {high-availability.cluster-id}
        ├── leader/                  # Current leader pointer
        ├── latch/                   # Leader election latch
        ├── jobgraphs/               # References to persisted JobGraphs
        └── checkpoints/             # References to completed checkpoint metadata
The actual data (JobGraphs, checkpoint files) is stored at high-availability.storageDir, not in ZooKeeper.
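To verify the hierarchy on a running quorum, you can browse it with ZooKeeper's own CLI. This is a sketch against a live quorum; the zkCli.sh invocation, host, and cluster-id come from the example configuration above and may differ in your installation:

```
# Connect to one quorum member and list Flink's znodes
zkCli.sh -server zk-host1:2181
ls /flink
ls /flink/my-flink-cluster
ls /flink/my-flink-cluster/leader
```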

JobManager port configuration

By default, the JobManager picks a random port for inter-process RPC. To use a fixed port or range:
high-availability.jobmanager.port: 50010
# Or a range:
high-availability.jobmanager.port: 50000-50025

ZooKeeper security (Kerberos)

If your ZooKeeper quorum requires Kerberos authentication, configure SASL in conf/config.yaml:
# ZooKeeper service name (default: zookeeper)
zookeeper.sasl.service-name: zookeeper

# JAAS login context name (default: Client)
# Must match an entry in security.kerberos.login.contexts
zookeeper.sasl.login-context-name: Client
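The matching login context is supplied through Flink's Kerberos options. A sketch, assuming a keytab at a placeholder path and an illustrative principal:

```yaml
# Kerberos credentials backing the JAAS "Client" context
security.kerberos.login.keytab: /path/to/flink.keytab
security.kerberos.login.principal: flink-user@EXAMPLE.COM

# Contexts to provision; must include the name referenced by
# zookeeper.sasl.login-context-name
security.kerberos.login.contexts: Client
```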

Advanced: ZooKeeper client retry configuration

Flink uses bounded exponential backoff when ZooKeeper connections are interrupted:
# Initial wait between retries (doubles each attempt, default: 5s)
high-availability.zookeeper.client.retry-wait: 5 s

# Maximum wait between retries (caps exponential backoff, default: 60s)
high-availability.zookeeper.client.max-retry-wait: 60 s

# Maximum number of retry attempts (default: 3)
high-availability.zookeeper.client.max-retry-attempts: 3
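With the defaults above, the wait doubles after each attempt until it hits the max-retry-wait cap. A quick sketch of the resulting schedule; the function and variable names are illustrative, not Flink configuration keys:

```shell
# Print the bounded exponential backoff schedule:
# wait doubles each attempt, capped at max_wait.
backoff_schedule() {
  local wait=$1 max_wait=$2 attempts=$3 i
  for i in $(seq 1 "$attempts"); do
    echo "attempt $i: wait ${wait}s"
    wait=$((wait * 2))
    if [ "$wait" -gt "$max_wait" ]; then wait=$max_wait; fi
  done
}

# Defaults: retry-wait 5s, max-retry-wait 60s, 3 attempts
backoff_schedule 5 60 3
# attempt 1: wait 5s
# attempt 2: wait 10s
# attempt 3: wait 20s
```

With only 3 attempts the 60s cap is never reached; it matters once you raise max-retry-attempts.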

Tolerating suspended connections

By default, a suspended ZooKeeper connection causes Flink to invalidate all leadership and trigger failover. In unstable network environments, you can configure Flink to tolerate suspended connections:
high-availability.zookeeper.client.tolerate-suspended-connections: true
Enabling this option makes Flink more resilient to transient network issues but increases the risk of ZooKeeper timing problems. Only enable it if you are experiencing frequent unnecessary failovers.

Bootstrapping ZooKeeper with Flink’s helper scripts

Flink ships a bundled ZooKeeper configuration template at conf/zoo.cfg. For development or small deployments:
# conf/zoo.cfg
server.0=host0:2888:3888
server.1=host1:2888:3888
server.2=host2:2888:3888
# Start ZooKeeper on all configured hosts
./bin/start-zookeeper-quorum.sh
The bundled ZooKeeper is suitable for testing and development. For production, manage your own dedicated ZooKeeper installation.
