How to configure Apache Flink using config.yaml, environment variables, and programmatic configuration.
All Flink configuration lives in conf/config.yaml inside your Flink distribution. Flink reads and evaluates this file when processes start. Changing it requires restarting the affected processes.
Starting with Flink 2.0, the only supported configuration file is conf/config.yaml, which follows standard YAML 1.2 syntax. The legacy flink-conf.yaml format is no longer supported.
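As a rough illustration of what standard YAML syntax permits, the sketch below writes two options as nested mappings. Flink's documentation describes this nested form and the flat dotted keys used elsewhere in this guide as equivalent; the values are placeholders, not recommendations.

```yaml
# Nested form (placeholder values)
taskmanager:
  numberOfTaskSlots: 4
  memory:
    process:
      size: 4096m

# Equivalent flat form, shown as comments to avoid duplicate keys:
# taskmanager.numberOfTaskSlots: 4
# taskmanager.memory.process.size: 4096m
```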
Set the FLINK_CONF_DIR environment variable to point Flink at a different configuration directory. This is useful for per-job configurations in Application mode on YARN or native Kubernetes. For Docker-based deployments, pass configuration values via the FLINK_PROPERTIES environment variable instead.
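For the Docker case, a minimal Docker Compose sketch might look like the following. The service names, the `flink:latest` image tag, and the option values are assumptions chosen for illustration; each line under FLINK_PROPERTIES is an ordinary configuration entry.

```yaml
services:
  jobmanager:
    image: flink:latest          # assumed image tag
    command: jobmanager
    ports:
      - "8081:8081"
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager

  taskmanager:
    image: flink:latest          # assumed image tag
    command: taskmanager
    depends_on:
      - jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2
```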
For standalone deployments (without HA), configure where the JobManager can be reached:
# Used by clients to reach the JobManager's REST endpoint
rest.address: jobmanager-host
rest.port: 8081
# Used by TaskManagers for RPC to the JobManager
jobmanager.rpc.address: jobmanager-host
jobmanager.rpc.port: 6123
On YARN and native Kubernetes, hostnames and ports are discovered automatically. You do not need to set these.
# Default parallelism when none is set on the job or operator (default: 1)
parallelism.default: 4
# Number of task slots per TaskManager (default: 1)
taskmanager.numberOfTaskSlots: 4
Running many small TaskManagers with one slot each maximizes task isolation. Fewer, larger TaskManagers with more slots reduce JVM overhead but increase resource contention between tasks.
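To make the trade-off concrete, the sketch below contrasts two hypothetical layouts that both provide 8 slots in total (total slots = number of TaskManagers × slots per TaskManager). The TaskManager counts and memory sizes are illustrative assumptions, not sizing advice.

```yaml
# Layout A: 8 TaskManagers with 1 slot each (stronger isolation, 8 JVMs)
taskmanager.numberOfTaskSlots: 1
taskmanager.memory.process.size: 2048m   # assumed size per TaskManager

# Layout B: 2 TaskManagers with 4 slots each (fewer JVMs, shared resources)
# Shown as comments because a single config.yaml holds only one layout:
# taskmanager.numberOfTaskSlots: 4
# taskmanager.memory.process.size: 8192m
```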
The simplest approach is to set the total process size for each component and let Flink partition memory automatically:
# Total memory for the JobManager JVM process
jobmanager.memory.process.size: 1600m
# Total memory for each TaskManager JVM process
taskmanager.memory.process.size: 4096m
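As a worked sketch of what the automatic partitioning might produce for a 4096m TaskManager, the fragment below lists the defaults it assumes and the resulting approximate component sizes. The default values are assumptions based on Flink's documented memory model; verify them against the release you run, since Flink also rounds the results.

```yaml
# The only value set explicitly
taskmanager.memory.process.size: 4096m

# Assumed defaults (shown for reference; no need to set them):
# taskmanager.memory.jvm-metaspace.size: 256m
# taskmanager.memory.jvm-overhead.fraction: 0.1   (min 192m, max 1g)
# taskmanager.memory.managed.fraction: 0.4
# taskmanager.memory.network.fraction: 0.1
# taskmanager.memory.framework.heap.size: 128m
# taskmanager.memory.framework.off-heap.size: 128m

# Approximate derivation under those assumptions:
# JVM overhead       = 0.1 x 4096m                       = 409.6m
# JVM metaspace                                          = 256m
# Total Flink memory = 4096m - 409.6m - 256m             = 3430.4m
# Managed memory     = 0.4 x 3430.4m                     = 1372.16m
# Network memory     = 0.1 x 3430.4m                     = 343.04m
# Framework heap + framework off-heap                    = 256m
# Task heap          = 3430.4m - 1372.16m - 343.04m - 256m = 1459.2m
```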
For finer control, you can set managed memory and heap size separately:
# Explicit TaskManager heap and managed memory
taskmanager.memory.task.heap.size: 1024m
taskmanager.memory.managed.size: 1024m
# Fraction of total Flink memory to use as managed memory (default: 0.4).
# Only applies when taskmanager.memory.managed.size is not set.
taskmanager.memory.managed.fraction: 0.4
# Allow job submission through the web UI (default: true)
web.submit.enable: true
# Allow job cancellation through the web UI (default: true)
web.cancel.enable: true
# Directory for uploaded JARs
web.upload.dir: /tmp/flink-uploads
When deploying on YARN or Kubernetes, you can forward custom environment variables to JobManager and TaskManager processes:
# Set LD_LIBRARY_PATH on the JobManager
containerized.master.env.LD_LIBRARY_PATH: /usr/lib/native
# Set a custom variable on all TaskManagers
containerized.taskmanager.env.MY_CUSTOM_VAR: my-value
You can also set configuration values directly in your application code:
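import java.time.Duration;
import org.apache.flink.configuration.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

Configuration config = new Configuration();
config.set(TaskManagerOptions.NUM_TASK_SLOTS, 4);
config.set(CheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(30));
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);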
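When a job is submitted to an existing cluster, programmatic configuration only affects job-level settings. Cluster-level options (memory sizes, number of slots, HA settings) must be set in config.yaml. A cluster-level value set in code, such as NUM_TASK_SLOTS, takes effect only when the job runs in a local, embedded cluster (for example, when started from an IDE).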
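A config.yaml counterpart might look like the sketch below: the cluster-level options are fixed when the processes start, while the checkpointing interval acts as a cluster-wide default that a job can still override in code. The values are placeholders.

```yaml
# Cluster-level: read at process startup, not changeable from job code
taskmanager.numberOfTaskSlots: 4
taskmanager.memory.process.size: 4096m

# Job-level default: applies to jobs that do not set their own interval
execution.checkpointing.interval: 30s
```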