Apache Hadoop YARN is a popular resource manager for data processing workloads. When you deploy Flink on YARN, Flink’s JobManager and TaskManagers run inside YARN containers, and Flink dynamically requests and releases containers from YARN’s ResourceManager.
Prerequisites
- A running YARN cluster (version 2.10.2 or later). Managed services like Amazon EMR or Google Cloud Dataproc work well.
- Flink distribution downloaded and unpacked.
- HADOOP_CLASSPATH environment variable set:
export HADOOP_CLASSPATH=`hadoop classpath`
Verify your YARN cluster is healthy:
yarn top
# Should show no error messages
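For a scriptable health check, listing the NodeManagers also confirms the cluster can actually host containers (this is a supplementary check, not from the original guide):

```shell
# List NodeManagers in RUNNING state; at least one is needed
# before YARN can allocate Flink's JobManager and TaskManager containers
yarn node -list -states RUNNING
```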
Deployment modes
Application mode (recommended for production)
Application mode creates a dedicated Flink cluster on YARN for one application. The application’s main() method runs on the JobManager inside YARN. The cluster shuts down when the application finishes.
./bin/flink run -t yarn-application \
./examples/streaming/TopSpeedWindowing.jar
For pre-uploaded JARs on HDFS (reduces submission overhead):
./bin/flink run -t yarn-application \
-Dyarn.provided.lib.dirs="hdfs://myhdfs/flink-dist" \
hdfs://myhdfs/jars/my-application.jar
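The directory passed to yarn.provided.lib.dirs must already contain the Flink distribution. A one-time upload might look like this (the HDFS paths mirror the example above and are illustrative):

```shell
# Upload the lib/ and plugins/ directories of the local Flink distribution
# to HDFS so every subsequent submission can reuse them
hdfs dfs -mkdir -p hdfs://myhdfs/flink-dist
hdfs dfs -put lib/ plugins/ hdfs://myhdfs/flink-dist/
```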
Interact with a running Application mode cluster:
# List jobs
./bin/flink list -t yarn-application \
-Dyarn.application.id=application_XXXX_YY
# Cancel a job (this also stops the cluster)
./bin/flink cancel -t yarn-application \
-Dyarn.application.id=application_XXXX_YY <jobId>
You can also stop the cluster directly via YARN:
yarn application -kill <ApplicationId>
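To shut a job down gracefully instead of cancelling it, flink stop drains the job into a savepoint first; in Application mode this also brings the cluster down (the savepoint path is illustrative):

```shell
# Stop the job with a savepoint, then shut down the cluster
./bin/flink stop -t yarn-application \
  -Dyarn.application.id=application_XXXX_YY \
  --savepointPath hdfs:///flink/savepoints \
  <jobId>
```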
Session mode
Session mode starts a long-running Flink cluster on YARN that accepts multiple job submissions.
# Start a YARN session (detached)
./bin/yarn-session.sh --detached
# Submit a job to the session
./bin/flink run ./examples/streaming/TopSpeedWindowing.jar
# Stop the session
echo "stop" | ./bin/yarn-session.sh -id application_XXXXX_XXX
Attached vs. detached mode:
- Attached (default): The yarn-session.sh client keeps running and tracks cluster state. If the cluster fails, the client reports the error.
- Detached (--detached or -d): The client submits the cluster and returns immediately.
Re-attach to a running session:
./bin/yarn-session.sh -id application_XXXX_YY
Specify the target session when submitting a job:
./bin/flink run -t yarn-session \
-Dyarn.application.id=application_XXXX_YY \
./examples/streaming/TopSpeedWindowing.jar
YARN configuration
Key YARN-specific configuration options to set in conf/config.yaml:
# Maximum number of JobManager container restart attempts (default: 2)
yarn.application-attempts: 2
# Number of vCores to allocate per TaskManager container
# Defaults to taskmanager.numberOfTaskSlots
yarn.containers.vcores: 4
# Control whether user JARs are added to the system or user classpath
yarn.classpath.include-user-jar: ORDER # ORDER, FIRST, LAST, or DISABLED
# Pre-uploaded Flink distribution on HDFS
yarn.provided.lib.dirs: hdfs://namenode/flink-dist
Pass configuration at submission time using -D:
./bin/yarn-session.sh \
-Dtaskmanager.memory.process.size=4096m \
-Dtaskmanager.numberOfTaskSlots=4
Resource allocation
Flink on YARN allocates TaskManager containers on demand: when a job needs more slots than are currently free, Flink requests additional containers from YARN's ResourceManager. In Session mode, TaskManagers that remain idle are released after a configurable timeout.
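The idle timeout can be tuned in conf/config.yaml; a minimal sketch (the option name and default reflect recent Flink releases — verify against your version's configuration reference):

```yaml
# Release a TaskManager container after it has been idle for this long (ms)
resourcemanager.taskmanager-timeout: 30000
```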
Failed containers (including the JobManager) are automatically replaced by YARN. The number of allowed JobManager restarts is controlled by yarn.application-attempts, bounded by YARN’s yarn.resourcemanager.am.max-attempts setting (defaults to 2 in both).
High availability on YARN
HA on YARN combines YARN’s container restart capability with a Flink HA service (ZooKeeper or Kubernetes). YARN handles restarting the JobManager container; the HA service persists metadata and manages leader election.
high-availability.type: zookeeper
high-availability.zookeeper.quorum: zk-host1:2181,zk-host2:2181
high-availability.zookeeper.path.root: /flink
high-availability.storageDir: hdfs:///flink/recovery
Do not manually set high-availability.cluster-id when deploying on YARN. Flink sets it to the YARN application ID automatically. Overriding it can cause multiple YARN clusters to interfere with each other.
YARN version notes
- YARN < 2.4.0: All containers restart when the application master fails.
- YARN 2.4.0–2.6.0: TaskManager containers survive application master failure (faster recovery).
- YARN ≥ 2.6.0: Attempt failure validity interval is set to Flink’s Pekko timeout, preventing long-running jobs from depleting their restart attempts.
YARN 2.4.0 has a critical bug (fixed in 2.5.0) that prevents container restarts from a restarted JobManager. Use at least Hadoop 2.5.0 for HA setups on YARN.
Firewall configuration
If your cluster is behind a firewall, configure a port range for Flink’s REST endpoint so the client can submit jobs from outside the cluster network:
# Single port, range, or combination
rest.bind-port: 50000-50025
Hadoop configuration files
By default, Flink picks up the required Hadoop configuration from the classpath populated via HADOOP_CLASSPATH. To point Flink at additional Hadoop configuration files, set HADOOP_CONF_DIR:
export HADOOP_CONF_DIR=/path/to/hadoop/conf
User JARs and classpath
| Mode | User JAR recognition |
|---|---|
| Session mode | Only the JAR specified in the flink run command |
| Application mode | JAR specified in the command + all JARs in $FLINK_HOME/usrlib/ |
In Application mode, the position of user JARs in the classpath is controlled by yarn.classpath.include-user-jar:
- ORDER (default): added based on lexicographic order
- FIRST: added at the beginning of the system classpath
- LAST: added at the end of the system classpath
- DISABLED: added to the user classpath instead
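The option can also be set per submission instead of in the configuration file; for example, to keep user classes on the user classpath for a single run (application JAR path reuses the HDFS example from above and is illustrative):

```shell
# Override the classpath behaviour for one Application-mode submission
./bin/flink run -t yarn-application \
  -Dyarn.classpath.include-user-jar=DISABLED \
  hdfs://myhdfs/jars/my-application.jar
```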