All Flink configuration lives in conf/config.yaml inside your Flink distribution. Flink reads and evaluates this file when processes start. Changing it requires restarting the affected processes. Starting with Flink 2.0, the only supported configuration file is conf/config.yaml, which follows standard YAML 1.2 syntax. The legacy flink-conf.yaml format is no longer supported.

Nested vs. flat format

You can write config keys in either nested YAML format or flat dot-separated format. Both are equivalent:
restart-strategy:
  type: failure-rate
  failure-rate:
    delay: 1 s
    failure-rate-interval: 1 min
    max-failures-per-interval: 1
The same settings in flat format:
restart-strategy.type: failure-rate
restart-strategy.failure-rate.delay: 1 s
restart-strategy.failure-rate.failure-rate-interval: 1 min
restart-strategy.failure-rate.max-failures-per-interval: 1
If you are upgrading from the legacy format, Flink ships a migration script that reads flink-conf.yaml and produces the equivalent config.yaml:
# Place flink-conf.yaml in conf/ then run:
bin/migrate-config-file.sh
config.yaml is stricter than the old format: duplicate keys cause an error, and invalid key-value pairs are rejected rather than silently ignored.
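Conceptually, the nested form is just the flat keys split on dots. The mapping can be sketched as follows (illustrative only, not Flink's actual parser):

```python
def flatten(nested: dict, prefix: str = "") -> dict:
    """Flatten a nested config mapping into flat dot-separated keys."""
    flat = {}
    for key, value in nested.items():
        full_key = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            # Recurse into nested sections, extending the key prefix.
            flat.update(flatten(value, full_key))
        else:
            flat[full_key] = value
    return flat

nested = {
    "restart-strategy": {
        "type": "failure-rate",
        "failure-rate": {
            "delay": "1 s",
            "failure-rate-interval": "1 min",
            "max-failures-per-interval": 1,
        },
    }
}

for key, value in flatten(nested).items():
    print(f"{key}: {value}")
# restart-strategy.type: failure-rate
# restart-strategy.failure-rate.delay: 1 s
# ...
```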

Specifying a different configuration directory

Set the FLINK_CONF_DIR environment variable to point Flink at a different configuration directory. This is useful for per-job configurations in Application mode on YARN or native Kubernetes. For Docker-based deployments, pass configuration values via the FLINK_PROPERTIES environment variable instead.

Essential configuration options

Hostnames and ports

For standalone deployments (without HA), configure where the JobManager can be reached:
# Used by clients and TaskManagers to reach the JobManager
rest.address: jobmanager-host
rest.port: 8081

# Used by TaskManagers for RPC to the JobManager
jobmanager.rpc.address: jobmanager-host
jobmanager.rpc.port: 6123
On YARN and native Kubernetes, hostnames and ports are discovered automatically. You do not need to set these.

Parallelism

# Default parallelism when none is set on the job or operator (default: 1)
parallelism.default: 4

# Number of task slots per TaskManager (default: 1)
taskmanager.numberOfTaskSlots: 4
Running many small TaskManagers with one slot each maximizes task isolation. Fewer, larger TaskManagers with more slots reduce JVM overhead but increase resource contention between tasks.
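As a back-of-the-envelope check, the number of TaskManagers a job needs follows from its maximum operator parallelism and the slots per TaskManager. A quick sketch, assuming one slot per parallel task instance (i.e. no fine-grained resource profiles):

```python
import math

def taskmanagers_needed(max_parallelism: int, slots_per_taskmanager: int) -> int:
    """Each parallel task instance occupies one slot, so the cluster needs
    enough TaskManagers to cover the job's maximum operator parallelism."""
    return math.ceil(max_parallelism / slots_per_taskmanager)

# With parallelism.default: 4 and taskmanager.numberOfTaskSlots: 4,
# a single TaskManager covers the whole job:
print(taskmanagers_needed(4, 4))  # 1

# With one slot per TaskManager, each parallel instance gets its own JVM:
print(taskmanagers_needed(4, 1))  # 4
```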

Memory

The simplest approach is to set the total process size for each component and let Flink partition memory automatically:
# Total memory for the JobManager JVM process
jobmanager.memory.process.size: 1600m

# Total memory for each TaskManager JVM process
taskmanager.memory.process.size: 4096m
For finer control, you can set managed memory and heap size separately:
# Explicit TaskManager heap and managed memory
taskmanager.memory.task.heap.size: 1024m
taskmanager.memory.managed.size: 1024m

# Fraction of total Flink memory to use as managed memory (default: 0.4)
taskmanager.memory.managed.fraction: 0.4
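To see how the fraction plays out, here is a simplified sketch of the managed-memory split. It ignores JVM overhead, metaspace, and network memory, which Flink also carves out of the process size, so treat it as an illustration rather than Flink's exact memory model:

```python
def managed_memory_mb(total_flink_memory_mb: float,
                      managed_fraction: float = 0.4) -> float:
    """Simplified: managed memory is a fraction of total Flink memory.
    (Real Flink derives total Flink memory from the process size minus
    JVM overhead and metaspace; those details are omitted here.)"""
    return total_flink_memory_mb * managed_fraction

# If roughly 3 GiB of a 4096m process size remains as Flink memory,
# about 1.2 GiB of it becomes managed memory:
print(managed_memory_mb(3072))
```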

Checkpointing

# State backend: hashmap (heap), rocksdb, or forst
state.backend.type: rocksdb

# Directory where checkpoints are written
execution.checkpointing.dir: s3://my-bucket/flink/checkpoints

# Default savepoint directory
execution.checkpointing.savepoint-dir: s3://my-bucket/flink/savepoints

# Checkpoint interval in milliseconds (0 = disabled)
execution.checkpointing.interval: 60000

Network memory (TaskManager)

# Fraction of total Flink memory reserved for network buffers (default: 0.1)
taskmanager.memory.network.fraction: 0.1

# Minimum network memory (default: 64mb)
taskmanager.memory.network.min: 64mb

# Maximum network memory (default: 1gb)
taskmanager.memory.network.max: 1gb
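The three settings combine as "fraction of total Flink memory, clamped between min and max". A small sketch of that derivation (illustrative; assumes total Flink memory is already known in MB):

```python
def network_memory_mb(total_flink_memory_mb: float,
                      fraction: float = 0.1,
                      min_mb: float = 64,
                      max_mb: float = 1024) -> float:
    """Network memory = fraction * total Flink memory, clamped to [min, max]."""
    return min(max_mb, max(min_mb, total_flink_memory_mb * fraction))

print(network_memory_mb(512))    # 64   (floor applies on small TaskManagers)
print(network_memory_mb(20480))  # 1024 (cap applies on large TaskManagers)
```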

Web UI

# Allow job submission through the web UI (default: true)
web.submit.enable: true

# Allow job cancellation through the web UI (default: true)
web.cancel.enable: true

# Directory for uploaded JARs
web.upload.dir: /tmp/flink-uploads

Temporary directories

# Where Flink writes local data: RocksDB files, spilled results, cached JARs
io.tmp.dirs: /data/flink-tmp
Set io.tmp.dirs to a directory that is not automatically purged. If this data is deleted unexpectedly, Flink must perform a heavyweight recovery.

Fault tolerance and restart strategies

# Fixed-delay: restart up to 3 times, waiting 10 seconds between attempts
restart-strategy:
  type: fixed-delay
  fixed-delay:
    attempts: 3
    delay: 10 s

# Exponential backoff: double the delay between attempts
restart-strategy:
  type: exponential-delay
  exponential-delay:
    initial-backoff: 1 s
    max-backoff: 60 s
    backoff-multiplier: 2.0
    jitter-factor: 0.1
    attempts-before-reset-backoff: 10
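With jitter set aside, the delay sequence the exponential strategy produces is easy to trace: each restart waits `backoff-multiplier` times longer than the last, up to the maximum. A sketch of that pacing (illustrative, not Flink's exact scheduler logic):

```python
import random

def restart_delays(initial_s: float = 1.0, max_s: float = 60.0,
                   multiplier: float = 2.0, jitter: float = 0.0,
                   attempts: int = 8) -> list[float]:
    """Exponential-delay pacing: each restart waits `multiplier` times
    longer than the last, capped at `max_s`, with optional +/- jitter."""
    delays, delay = [], initial_s
    for _ in range(attempts):
        # Randomize around the current delay by up to +/- jitter.
        jittered = delay * (1 + random.uniform(-jitter, jitter))
        delays.append(min(jittered, max_s))
        delay = min(delay * multiplier, max_s)
    return delays

print(restart_delays())  # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
```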

# Failure rate: allow up to 3 failures per minute
restart-strategy:
  type: failure-rate
  failure-rate:
    max-failures-per-interval: 3
    failure-rate-interval: 1 min
    delay: 10 s

Environment variables

Flink reads the following environment variables at startup:
JAVA_HOME: path to the Java installation to use
FLINK_CONF_DIR: overrides the configuration directory (default: conf/)
FLINK_PROPERTIES: passes config values to Docker containers (newline-separated "key: value" pairs)
HADOOP_CLASSPATH: required when running on YARN; set it to the output of the "hadoop classpath" command
HADOOP_CONF_DIR: directory containing Hadoop configuration files

Forwarding environment variables to containers

When deploying on YARN or Kubernetes, you can forward custom environment variables to JobManager and TaskManager processes:
# Set LD_LIBRARY_PATH on the JobManager
containerized.master.env.LD_LIBRARY_PATH: /usr/lib/native

# Set a custom variable on all TaskManagers
containerized.taskmanager.env.MY_CUSTOM_VAR: my-value

Dynamic properties

You can pass configuration parameters at submission time using -D flags. These override any values in config.yaml:
./bin/flink run \
    -Dparallelism.default=16 \
    -Dtaskmanager.memory.process.size=4096m \
    -Dexecution.checkpointing.interval=30000 \
    ./my-job.jar
This is especially useful in Application mode where you want per-job configuration without modifying the shared configuration file.
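Precedence is simple last-wins layering: a value from a -D flag shadows the same key from config.yaml. A toy sketch of the merge (illustrative; real Flink also layers in programmatic and deployment-specific configuration):

```python
def effective_config(config_yaml: dict, dynamic_props: dict) -> dict:
    """-D dynamic properties override config.yaml entries; keys present
    only in config.yaml are kept unchanged."""
    merged = dict(config_yaml)
    merged.update(dynamic_props)
    return merged

base = {"parallelism.default": 1, "rest.port": 8081}
overrides = {"parallelism.default": 16}
print(effective_config(base, overrides))
# {'parallelism.default': 16, 'rest.port': 8081}
```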

Programmatic configuration

You can also set configuration values directly in your application code:
import java.time.Duration;

import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

Configuration config = new Configuration();
config.set(TaskManagerOptions.NUM_TASK_SLOTS, 4);
config.set(CheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(30));

StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment(config);
Programmatic configuration only affects job-level settings. Cluster-level options (memory sizes, number of slots, HA settings) must be set in config.yaml.

Configuration for specific deployment targets

For deployment-specific configuration options (standalone, YARN, native Kubernetes, Docker), see the corresponding resource provider page.
