Apache Hadoop YARN is a popular resource manager for data processing workloads. When you deploy Flink on YARN, Flink’s JobManager and TaskManagers run inside YARN containers, and Flink dynamically requests and releases containers from YARN’s ResourceManager.

Prerequisites

  • A running YARN cluster (version 2.10.2 or later). Managed services like Amazon EMR or Google Cloud Dataproc work well.
  • Flink distribution downloaded and unpacked.
  • HADOOP_CLASSPATH environment variable set:
export HADOOP_CLASSPATH=`hadoop classpath`
Verify your YARN cluster is healthy:
yarn top
# Should show no error messages

Deployment modes

Application mode creates a dedicated Flink cluster on YARN for one application. The application’s main() method runs on the JobManager inside YARN. The cluster shuts down when the application finishes.
./bin/flink run -t yarn-application \
    ./examples/streaming/TopSpeedWindowing.jar
For pre-uploaded JARs on HDFS (reduces submission overhead):
./bin/flink run -t yarn-application \
    -Dyarn.provided.lib.dirs="hdfs://myhdfs/flink-dist" \
    hdfs://myhdfs/jars/my-application.jar
Interact with a running Application mode cluster:
# List jobs
./bin/flink list -t yarn-application \
    -Dyarn.application.id=application_XXXX_YY

# Cancel a job (this also stops the cluster)
./bin/flink cancel -t yarn-application \
    -Dyarn.application.id=application_XXXX_YY <jobId>
You can also stop the cluster directly via YARN:
yarn application -kill <ApplicationId>

Session mode

Session mode starts a long-running Flink cluster on YARN that accepts multiple job submissions.
# Start a YARN session (detached)
./bin/yarn-session.sh --detached

# Submit a job to the session
./bin/flink run ./examples/streaming/TopSpeedWindowing.jar

# Stop the session
echo "stop" | ./bin/yarn-session.sh -id application_XXXXX_XXX
Attached vs. detached mode:
  • Attached (default): The yarn-session.sh client keeps running and tracks cluster state. If the cluster fails, the client reports the error.
  • Detached (--detached or -d): The client submits the cluster and returns immediately.
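In detached mode the client exits right after submission, so later commands need the application ID. The YARN session client records connection details, including the application ID, in a properties file (by default `/tmp/.yarn-properties-$USER`). The sketch below simulates that file with a stand-in path and a made-up ID to show how the value can be extracted; the `applicationID` key is the one Flink writes, but the file location and log format may vary with your setup.

```shell
# Stand-in for /tmp/.yarn-properties-$USER; the ID below is fabricated.
props=/tmp/.yarn-properties-demo
echo "applicationID=application_1700000000000_0042" > "$props"

# Extract the ID so it can be passed to -Dyarn.application.id=... later.
app_id=$(grep '^applicationID=' "$props" | cut -d= -f2)
echo "$app_id"
```

The extracted value can then be used with `flink run -t yarn-session -Dyarn.application.id=$app_id` or with `yarn-session.sh -id $app_id`.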
Re-attach to a running session:
./bin/yarn-session.sh -id application_XXXX_YY
Specify the target session when submitting a job:
./bin/flink run -t yarn-session \
    -Dyarn.application.id=application_XXXX_YY \
    ./examples/streaming/TopSpeedWindowing.jar

YARN configuration

Key YARN-specific configuration options to set in conf/config.yaml:
# Maximum number of JobManager container restart attempts (default: 2)
yarn.application-attempts: 2

# Number of vCores to allocate per TaskManager container
# Defaults to taskmanager.numberOfTaskSlots
yarn.containers.vcores: 4

# Control whether user JARs are added to the system or user classpath
yarn.classpath.include-user-jar: ORDER  # ORDER, FIRST, LAST, or DISABLED

# Pre-uploaded Flink distribution on HDFS
yarn.provided.lib.dirs: hdfs://namenode/flink-dist
Pass configuration at submission time using -D:
./bin/yarn-session.sh \
    -Dtaskmanager.memory.process.size=4096m \
    -Dtaskmanager.numberOfTaskSlots=4

Resource allocation

When running jobs need more resources than are currently available, Flink on YARN requests additional TaskManager containers. In Session mode, TaskManagers that become idle are released after a configurable timeout. Containers that fail (including the JobManager's) are automatically replaced by YARN. The number of permitted JobManager restarts is controlled by yarn.application-attempts, capped by YARN's yarn.resourcemanager.am.max-attempts setting (both default to 2).
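The idle-release timeout mentioned above can be tuned in conf/config.yaml. A sketch, assuming the resourcemanager.taskmanager-timeout option (milliseconds an idle TaskManager is kept before release):

```yaml
# Assumed option name; verify against your Flink version's configuration reference.
# Keep idle TaskManagers for 60 s instead of the default 30 s.
resourcemanager.taskmanager-timeout: 60000
```

A longer timeout reduces container churn for bursty workloads at the cost of holding YARN resources longer while idle.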

High availability on YARN

HA on YARN combines YARN’s container restart capability with a Flink HA service (ZooKeeper or Kubernetes). YARN handles restarting the JobManager container; the HA service persists metadata and manages leader election.
high-availability.type: zookeeper
high-availability.zookeeper.quorum: zk-host1:2181,zk-host2:2181
high-availability.zookeeper.path.root: /flink
high-availability.storageDir: hdfs:///flink/recovery
# Do NOT manually set high-availability.cluster-id on YARN;
# Flink sets it automatically to the YARN application ID.
Do not manually set high-availability.cluster-id when deploying on YARN. Flink sets it to the YARN application ID automatically. Overriding it can cause multiple YARN clusters to interfere with each other.

YARN version notes

  • YARN < 2.4.0: All containers restart when the application master fails.
  • YARN 2.4.0–2.6.0: TaskManager containers survive application master failure (faster recovery).
  • YARN ≥ 2.6.0: Attempt failure validity interval is set to Flink’s Pekko timeout, preventing long-running jobs from depleting their restart attempts.
YARN 2.4.0 has a critical bug (fixed in 2.5.0) that prevents container restarts from a restarted JobManager. Use at least Hadoop 2.5.0 for HA setups on YARN.

Firewall configuration

If your cluster is behind a firewall, configure a port range for Flink’s REST endpoint so the client can submit jobs from outside the cluster network:
# Single port, range, or combination
rest.bind-port: 50000-50025

Hadoop configuration files

To pass additional Hadoop configuration to Flink:
export HADOOP_CONF_DIR=/path/to/hadoop/conf
Flink loads all required Hadoop configuration from the classpath via HADOOP_CLASSPATH by default.

User JARs and classpath

Which user JARs each mode recognizes:
  • Session mode: only the JAR specified in the flink run command.
  • Application mode: the JAR specified in the command, plus all JARs in $FLINK_HOME/usrlib/.
In Application mode, the position of user JARs in the classpath is controlled by yarn.classpath.include-user-jar:
  • ORDER (default): added based on lexicographic order
  • FIRST: added at the beginning of the system classpath
  • LAST: added at the end of the system classpath
  • DISABLED: added to the user classpath instead
