This checklist covers the configuration options and operational practices you should review before running a Flink job in production. While Flink ships with sensible defaults, several settings require deliberate choices based on your specific workload and SLAs.

Job configuration

1. Set explicit operator UIDs

Assign a stable UID to every stateful operator using uid(String). Without explicit UIDs, Flink auto-generates IDs from the job graph topology. Any structural change to the job (even adding a stateless operator) regenerates these IDs and breaks savepoint compatibility.
stream
    .map(new StatefulMapper()).uid("mapper-v1")
    .keyBy(Event::getKey)
    .window(TumblingEventTimeWindows.of(Duration.ofMinutes(5)))
    .aggregate(new MyAggregator()).uid("aggregator-v1")
    .sinkTo(kafkaSink).uid("kafka-sink-v1"); // kafkaSink: a KafkaSink built via KafkaSink.builder()
2. Set maximum parallelism explicitly

Maximum parallelism determines the upper bound for rescaling and cannot be changed after a job has started without discarding state. Set it high enough for anticipated growth but low enough to keep metadata overhead reasonable.
env.setMaxParallelism(1024); // must be between parallelism and 2^15
If not set, Flink derives it from the operator’s current parallelism:
  • Parallelism ≤ 128: max parallelism = 128
  • Parallelism > 128: MIN(nextPowerOfTwo(parallelism × 1.5), 32768)
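This derivation can be sketched in plain Java (a direct transcription of the documented rule above, not Flink's internal code; the class and method names are illustrative):

```java
public class MaxParallelism {
    // Smallest power of two >= n (for n >= 1).
    static int nextPowerOfTwo(int n) {
        int p = 1;
        while (p < n) p <<= 1;
        return p;
    }

    // Default max parallelism per the documented rule:
    // 128 for parallelism <= 128, otherwise
    // MIN(nextPowerOfTwo(parallelism * 1.5), 32768).
    static int defaultMaxParallelism(int parallelism) {
        if (parallelism <= 128) return 128;
        return Math.min(nextPowerOfTwo(parallelism + parallelism / 2), 32768);
    }
}
```

For example, parallelism 200 yields 200 + 100 = 300, rounded up to 512; parallelism 30000 is capped at 32768. Because the derived value changes as you change parallelism, setting it explicitly is the only way to keep it stable across deployments.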
3. Choose the right state backend

Select a state backend appropriate for your state size:
State size                       Recommended backend
Fits in JVM heap                 HashMapStateBackend
Larger than heap, fits on disk   EmbeddedRocksDBStateBackend
Exceeds local disk               ForStStateBackend (experimental)
state.backend.type: rocksdb
execution.checkpointing.dir: hdfs:///flink/checkpoints
4. Configure checkpointing

Enable checkpointing with an interval appropriate for your SLA. The checkpoint interval bounds how much data must be reprocessed after a failure.
execution.checkpointing.interval: 60 s
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.min-pause: 30 s
Factors to consider:
  • Recovery SLA: a 5-minute interval means up to 5 minutes of data reprocessing after failure
  • Sink delivery: exactly-once sinks only make results visible at checkpoint boundaries; shorter intervals reduce output latency
  • Cluster load: checkpointing consumes CPU and network; incremental checkpoints reduce per-checkpoint cost
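A few related knobs are worth reviewing alongside the interval. This is a sketch using option names from recent Flink releases; verify them against your version's configuration reference:

```yaml
execution.checkpointing.timeout: 10 min                  # abort checkpoints that take too long
execution.checkpointing.max-concurrent-checkpoints: 1    # avoid overlapping checkpoints
execution.checkpointing.tolerable-failed-checkpoints: 3  # consecutive failures tolerated before job failover
```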
5. Enable incremental checkpoints for RocksDB

If you are using EmbeddedRocksDBStateBackend with large state, enable incremental checkpoints to reduce checkpoint duration and uploaded data volume:
execution.checkpointing.incremental: true
6. Configure retained checkpoints

Configure retained checkpoints so you can restart a job from a checkpoint after a cancellation:
env.getCheckpointConfig().setExternalizedCheckpointRetention(
    ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION
);
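The same retention policy can be set in the configuration file (key name as in recent Flink releases; check your version):

```yaml
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
```

A cancelled job can then be restarted from the retained checkpoint path with `flink run -s <checkpoint-path> …`.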

Cluster configuration

1. Configure JobManager high availability

Without HA, the JobManager is a single point of failure. Configure HA using ZooKeeper or Kubernetes to allow swift leader re-election and job recovery:
# ZooKeeper-based HA
high-availability.type: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
high-availability.storageDir: hdfs:///flink/ha
# Kubernetes-based HA
high-availability.type: kubernetes
high-availability.storageDir: s3://my-bucket/flink/ha
2. Tune TaskManager memory

Size TaskManager memory carefully. For RocksDB jobs, ensure managed memory is large enough:
taskmanager.memory.process.size: 8g
taskmanager.memory.managed.fraction: 0.5  # 50% of total Flink memory to managed memory for RocksDB
Monitor Status.JVM.Memory.Heap.Used and Status.Flink.Memory.Managed.Used to verify memory is within bounds.
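As a rough back-of-envelope for where the 8g goes: the managed fraction applies to total Flink memory, i.e. the process size minus JVM metaspace and JVM overhead. The sketch below (plain Java, illustrative names) assumes the default metaspace size (256 MB) and default JVM overhead (10% of process size, clamped to [192 MB, 1 GB]); it is a simplification of Flink's memory model, not its exact sizing logic:

```java
public class TmMemorySketch {
    // Approximate managed memory in MB for a given process size and managed fraction.
    static double managedMemoryMb(double processMb, double managedFraction) {
        double metaspaceMb = 256.0; // taskmanager.memory.jvm-metaspace.size default
        // taskmanager.memory.jvm-overhead: 10% of process size, clamped to [192 MB, 1 GB]
        double overheadMb = Math.min(Math.max(processMb * 0.10, 192.0), 1024.0);
        double totalFlinkMb = processMb - metaspaceMb - overheadMb;
        return totalFlinkMb * managedFraction; // fraction of *total Flink memory*
    }
}
```

For the 8g / 0.5 example this lands around 3.5 GB of managed memory available to RocksDB, noticeably less than half of the full process size.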
3. Configure a restart strategy

The default restart strategy when checkpointing is enabled is exponential delay, which is recommended for production. Verify or customise it:
restart-strategy.type: exponential-delay
restart-strategy.exponential-delay.initial-backoff: 10 s
restart-strategy.exponential-delay.max-backoff: 2 min
restart-strategy.exponential-delay.backoff-multiplier: 1.4
restart-strategy.exponential-delay.reset-backoff-threshold: 10 min
restart-strategy.exponential-delay.jitter-factor: 0.1
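To build intuition for these numbers, here is a small sketch (plain Java, illustrative names, not Flink code) of the delay sequence they produce, ignoring jitter:

```java
import java.util.ArrayList;
import java.util.List;

public class BackoffSketch {
    // Successive restart delays in seconds: start at initial, multiply, cap at max.
    static List<Double> delays(double initialSec, double maxSec, double multiplier, int attempts) {
        List<Double> out = new ArrayList<>();
        double d = initialSec;
        for (int i = 0; i < attempts; i++) {
            out.add(Math.min(d, maxSec));
            d *= multiplier;
        }
        return out;
    }
}
```

With initial-backoff 10 s, multiplier 1.4, and max-backoff 2 min, the delays run 10, 14, 19.6, 27.4, … and plateau at 120 s from the ninth restart; after 10 min without a failure the backoff resets to 10 s.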
4. Use the region failover strategy

The region failover strategy restarts only the minimum set of tasks needed to recover, reducing the blast radius of individual task failures:
jobmanager.execution.failover-strategy: region

Observability

1. Configure a metrics reporter

Export metrics to an external monitoring system. Prometheus is the most common choice:
metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
metrics.reporter.prom.port: 9249
Key metrics to alert on:
  • numberOfFailedCheckpoints — rising value indicates checkpointing problems
  • lastCheckpointDuration — high values indicate state growth or resource pressure
  • buffers.outPoolUsage (per task) — sustained >0.8 indicates back-pressure
  • numRecordsInPerSecond — monitor for drops indicating source problems
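As an illustration, a Prometheus alerting rule on failed checkpoints might look like the following. The exported metric name follows the reporter's `flink_<scope>_<metric>` pattern; confirm the exact name and labels against your cluster's /metrics endpoint before relying on it:

```yaml
groups:
  - name: flink
    rules:
      - alert: FlinkCheckpointsFailing
        expr: increase(flink_jobmanager_job_numberOfFailedCheckpoints[15m]) > 0
        for: 5m
        labels:
          severity: page
        annotations:
          summary: "Flink job {{ $labels.job_name }} has failing checkpoints"
```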
2. Review log configuration

Use INFO log level in production. Set noisy third-party libraries to WARN:
rootLogger.level = INFO
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = WARN
logger.kafka.name = org.apache.kafka
logger.kafka.level = WARN
Integrate log shipping with your existing log aggregation infrastructure (ELK, Loki, etc.).
3. Enable flame graphs (optional)

For performance debugging in non-production environments:
rest.flamegraph.enabled: true
Flame graphs collect stack traces by sampling, which has a small CPU overhead. Enable in production only during active incident investigation.

Security

1. Restrict cluster access

Flink is designed to execute arbitrary user code remotely. Do not expose the JobManager REST API (port 8081) to the public internet. Restrict access via:
  • Network policies (Kubernetes)
  • Security groups or firewall rules
  • A reverse proxy with authentication
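For the Kubernetes case, a minimal NetworkPolicy sketch is shown below; the namespace, pod labels, and allowed-client selector are placeholders to adapt to your deployment:

```yaml
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: flink-jobmanager-rest
  namespace: flink              # placeholder
spec:
  podSelector:
    matchLabels:
      component: jobmanager     # placeholder: your JobManager pod label
  ingress:
    - from:
        - podSelector:
            matchLabels:
              role: flink-client  # placeholder: pods allowed to reach the REST API
      ports:
        - protocol: TCP
          port: 8081
```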
2. Configure SSL/TLS

Enable TLS for REST and RPC communication:
security.ssl.rest.enabled: true
security.ssl.rest.keystore: /path/to/keystore.jks
security.ssl.rest.keystore-password: changeit
security.ssl.rest.truststore: /path/to/truststore.jks
security.ssl.rest.truststore-password: changeit
3. Configure Kerberos (if applicable)

For Hadoop/HDFS environments with Kerberos authentication:
security.kerberos.login.principal: [email protected]
security.kerberos.login.keytab: /etc/security/keytabs/flink.keytab

Pre-launch checklist

  • All stateful operators have explicit UIDs
  • Maximum parallelism is set to a value that allows future scaling
  • State backend is appropriate for the expected state size
  • Checkpointing is enabled with an appropriate interval
  • Checkpoint storage points to a distributed filesystem accessible by all nodes
  • Incremental checkpoints are enabled (if using RocksDB with large state)
  • Retained checkpoints or savepoints are configured
  • JobManager HA is configured
  • Restart and failover strategies are configured
  • Memory is sized appropriately for the state backend
  • Metrics reporter is configured and dashboards are in place
  • Alerting is configured for checkpoint failures and back-pressure
  • Log level is set to INFO; log aggregation is set up
  • Cluster access is restricted; TLS is enabled if required
