Flink tightly controls memory usage across its components to deliver efficient, predictable performance on the JVM. Understanding the memory model helps you size your cluster correctly and avoid out-of-memory errors.
Memory model overview
For both TaskManagers and JobManagers, Flink divides the total JVM process memory into a hierarchy of components:
Total Process Memory
├── Total Flink Memory
│   ├── JVM Heap
│   │   ├── Framework Heap (Flink internal structures)
│   │   └── Task Heap (user operator code) [TM only]
│   └── Off-Heap Memory
│       ├── Managed Memory (Flink-managed, off-heap) [TM only]
│       ├── Network Memory (network buffers) [TM only]
│       └── Framework/Task Off-Heap (direct and native)
├── JVM Metaspace
└── JVM Overhead (thread stacks, code cache, GC)
Configuring total memory
The simplest and recommended approach is to set the total process memory and let Flink partition it automatically:
# Total memory for the JobManager JVM process
jobmanager.memory.process.size: 1600m
# Total memory for each TaskManager JVM process
taskmanager.memory.process.size: 4096m
Alternatively, set only the Flink-internal portion (excluding JVM Metaspace and JVM Overhead):
jobmanager.memory.flink.size: 1280m
taskmanager.memory.flink.size: 3200m
Do not configure both *.memory.process.size and *.memory.flink.size simultaneously. This can cause conflicting memory configurations and deployment failures.
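The split that Flink performs from a configured process size can be sketched numerically. The following Python snippet is a back-of-the-envelope model using the default values documented on this page (metaspace 256mb, overhead fraction 0.1 clamped to 192mb–1gb); the actual resolution happens inside Flink and may differ in edge cases, and the function name is illustrative, not a Flink API:

```python
# Sketch: deriving Total Flink Memory from taskmanager.memory.process.size,
# using the defaults described on this page. Illustrative only.

def clamp(value, lo, hi):
    return max(lo, min(value, hi))

def derive_flink_memory(process_mb,
                        metaspace_mb=256,       # taskmanager.memory.jvm-metaspace.size
                        overhead_fraction=0.1,  # taskmanager.memory.jvm-overhead.fraction
                        overhead_min_mb=192,    # taskmanager.memory.jvm-overhead.min
                        overhead_max_mb=1024):  # taskmanager.memory.jvm-overhead.max
    # JVM Overhead is a fraction of the total process size, clamped to a range.
    overhead_mb = clamp(process_mb * overhead_fraction, overhead_min_mb, overhead_max_mb)
    # What remains after Metaspace and Overhead is Total Flink Memory.
    flink_mb = process_mb - metaspace_mb - overhead_mb
    return flink_mb, overhead_mb

flink_mb, overhead_mb = derive_flink_memory(4096)
print(round(flink_mb, 1), overhead_mb)  # 3430.4 409.6
```

For the 4096m TaskManager from the example above, roughly 3430mb remains as Total Flink Memory, which Flink then subdivides into heap, managed, and network components.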
TaskManager memory components
The TaskManager memory model is more detailed than the JobManager’s because it must account for user code, network communication, and state backends.
Task heap memory
JVM heap dedicated to your operator code (user-defined functions, state operations, etc.):
taskmanager.memory.task.heap.size: 1024m
Managed memory
Flink-managed native (off-heap) memory used for:
- RocksDB and ForSt state backends
- Sorting and hash tables for batch jobs
- Python UDF worker processes
# Set an explicit size
taskmanager.memory.managed.size: 1024m
# Or set as a fraction of total Flink memory (default: 0.4)
taskmanager.memory.managed.fraction: 0.4
If both are set, managed.size takes precedence.
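The precedence rule can be sketched as follows; the helper and parameter names are illustrative, not Flink's configuration API:

```python
# Sketch of the managed-memory precedence rule: an explicit
# taskmanager.memory.managed.size wins over taskmanager.memory.managed.fraction.

def resolve_managed_memory(total_flink_mb, managed_size_mb=None, managed_fraction=0.4):
    if managed_size_mb is not None:
        return managed_size_mb                    # explicit size takes precedence
    return total_flink_mb * managed_fraction      # otherwise fall back to the fraction

print(resolve_managed_memory(3200))                        # 1280.0 (from the fraction)
print(resolve_managed_memory(3200, managed_size_mb=1024))  # 1024 (explicit size wins)
```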
Network memory
Memory for the network buffers that hold in-flight data during shuffles, broadcasts, and pipelined exchanges between operators:
# Fraction of total Flink memory (default: 0.1)
taskmanager.memory.network.fraction: 0.1
# Minimum network memory (default: 64mb)
taskmanager.memory.network.min: 64mb
# Maximum network memory (default: 1gb)
taskmanager.memory.network.max: 1gb
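Network memory follows the same fraction-with-bounds pattern as JVM overhead, and dividing the result by the network buffer (memory segment) size, 32 KiB by default, gives an approximate buffer count. The sketch below assumes a total Flink memory of 3430.4mb (roughly what a 4096m process yields with default settings); the function name is illustrative:

```python
# Sketch: network memory as a capped fractionated component, plus the
# approximate number of 32 KiB network buffers it provides.

def network_memory_mb(total_flink_mb, fraction=0.1, min_mb=64, max_mb=1024):
    return max(min_mb, min(total_flink_mb * fraction, max_mb))

net_mb = network_memory_mb(3430.4)   # assumed Flink memory for a 4096m process
buffers = int(net_mb * 1024 // 32)   # 32 KiB segments per buffer
print(round(net_mb, 2), buffers)     # 343.04 10977
```

Too few buffers is the usual cause of the "Insufficient number of network buffers" error discussed later.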
Framework heap and off-heap
Memory reserved for Flink’s own data structures and internal operations. These are typically not adjusted:
# Default: 128mb
taskmanager.memory.framework.heap.size: 128m
# Default: 128mb
taskmanager.memory.framework.off-heap.size: 128m
JobManager memory components
The JobManager has a simpler memory model:
| Component | Config option | Default |
|---|---|---|
| JVM Heap | jobmanager.memory.heap.size | Derived from process size |
| Off-Heap | jobmanager.memory.off-heap.size | 128mb |
| JVM Metaspace | jobmanager.memory.jvm-metaspace.size | 256mb |
| JVM Overhead (min) | jobmanager.memory.jvm-overhead.min | 192mb |
| JVM Overhead (max) | jobmanager.memory.jvm-overhead.max | 1gb |
| JVM Overhead fraction | jobmanager.memory.jvm-overhead.fraction | 0.1 |
JobManager heap is used by Flink’s framework and by user code running during job submission (e.g., certain batch sources or checkpoint callbacks).
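With the 1600m process size from the earlier example and the defaults in the table, the heap works out roughly as follows. This is a back-of-the-envelope sketch, not Flink's exact resolution logic:

```python
# Rough derivation of JobManager heap from jobmanager.memory.process.size,
# using the default values in the table above.
process_mb   = 1600
off_heap_mb  = 128                                    # jobmanager.memory.off-heap.size
metaspace_mb = 256                                    # jobmanager.memory.jvm-metaspace.size
overhead_mb  = max(192, min(process_mb * 0.1, 1024))  # 160 raised to the 192mb minimum

heap_mb = process_mb - off_heap_mb - metaspace_mb - overhead_mb
print(heap_mb)  # 1024
```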
JVM overhead (capped fractionated component)
JVM Overhead covers thread stacks, code cache, and GC space. It is calculated as a fraction of total process memory, bounded by a min/max range:
taskmanager.memory.jvm-overhead.min: 192mb
taskmanager.memory.jvm-overhead.max: 1gb
taskmanager.memory.jvm-overhead.fraction: 0.1
For a 4096mb TaskManager process:
- Fraction: 4096 × 0.1 = 409.6mb
- Result: 409.6mb (within the 192mb–1gb range)
If the fraction yields a value below the minimum, the minimum is used. If it exceeds the maximum, the maximum is used.
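The clamping rule can be checked numerically; this sketch simply applies the min/max bounds described above to three process sizes (the function name is illustrative):

```python
# The capped-fraction rule for taskmanager.memory.jvm-overhead:
# take fraction * process size, then clamp to [min, max].

def capped_fraction(process_mb, fraction=0.1, min_mb=192, max_mb=1024):
    return max(min_mb, min(process_mb * fraction, max_mb))

print(capped_fraction(4096))   # 409.6  (fraction falls within the range)
print(capped_fraction(1024))   # 192    (102.4 raised to the minimum)
print(capped_fraction(16384))  # 1024   (1638.4 capped at the maximum)
```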
Practical sizing examples
Streaming job with RocksDB state backend
RocksDB uses managed memory, so allocate more of it:
taskmanager.memory.process.size: 8192m
taskmanager.memory.managed.fraction: 0.5 # 50% for RocksDB
Batch job with large sorts and joins
taskmanager.memory.process.size: 8192m
taskmanager.memory.managed.fraction: 0.4 # 40% for sorting/hashing
taskmanager.memory.task.heap.size: 1024m
Containerized deployment (Kubernetes or YARN)
Use *.memory.process.size to match the requested container size:
taskmanager.memory.process.size: 4096m # Match Kubernetes resource limit
jobmanager.memory.process.size: 1600m
Standalone deployment
For standalone clusters, use *.memory.flink.size to declare how much memory Flink itself uses; JVM Metaspace and JVM Overhead are then added on top of this value:
taskmanager.memory.flink.size: 3200m
jobmanager.memory.flink.size: 1280m
Debugging memory issues
Common memory-related failures and their solutions:
| Error | Likely cause | Solution |
|---|---|---|
| OutOfMemoryError: Java heap space | Task heap too small | Increase taskmanager.memory.task.heap.size or total process size |
| OutOfMemoryError: GC overhead limit exceeded | Heap too small for the data volume | Increase heap or reduce parallelism |
| IOException: Insufficient number of network buffers | Network memory too small | Increase taskmanager.memory.network.fraction or min/max |
| OutOfMemoryError: Direct buffer memory | Off-heap memory exhausted | Increase taskmanager.memory.task.off-heap.size |
| RocksDB Not enough memory | Managed memory too small | Increase taskmanager.memory.managed.fraction |
When diagnosing memory problems, check the Flink Web UI’s Task Manager tab. It shows the allocated memory breakdown for each component alongside JVM metrics.
JVM parameters set by Flink
Flink automatically sets these JVM arguments when starting processes:
| JVM flag | Value for TaskManager | Value for JobManager |
|---|---|---|
| -Xmx / -Xms | Framework + Task Heap | JVM Heap |
| -XX:MaxDirectMemorySize | Framework + Task Off-Heap + Network | Off-Heap Memory |
| -XX:MaxMetaspaceSize | JVM Metaspace | JVM Metaspace |
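The TaskManager flags in the table are simple sums of the component sizes. The sketch below assembles them from example values (the sizes here are assumptions for illustration, not values computed by Flink):

```python
# Sketch: assembling TaskManager JVM flags from the component sums in the
# table above, using example component sizes.
framework_heap_mb     = 128   # taskmanager.memory.framework.heap.size default
task_heap_mb          = 1024  # example explicit task heap
framework_off_heap_mb = 128   # taskmanager.memory.framework.off-heap.size default
task_off_heap_mb      = 0     # default task off-heap
network_mb            = 343   # example resolved network memory

xmx_mb    = framework_heap_mb + task_heap_mb                       # -Xmx / -Xms
direct_mb = framework_off_heap_mb + task_off_heap_mb + network_mb  # -XX:MaxDirectMemorySize

print(f"-Xmx{xmx_mb}m -XX:MaxDirectMemorySize={direct_mb}m")
# -Xmx1152m -XX:MaxDirectMemorySize=471m
```

Because -Xmx covers only the heap components, an OutOfMemoryError: Java heap space points at the heap sizes, while Direct buffer memory errors point at the off-heap and network components.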