Flink uses Apache Log4j 2 as its logging framework. All Flink components—JobManager, TaskManager, HistoryServer, and CLI client—write logs via Log4j 2.
## Log files and locations
Each Flink daemon writes two files:
| File | Contents |
|---|---|
| `<component>.log` | Internal logging output (Log4j 2 appenders) |
| `<component>.out` | Standard output and standard error from the JVM process |
In a standalone cluster, these files are written to the `log/` directory under `$FLINK_HOME`. In YARN and Kubernetes deployments, log files are managed by the respective container runtime and are accessible through the cluster's web UI.
## Log4j 2 configuration files
Flink ships with several Log4j 2 configuration files in the conf/ directory:
| File | Used by |
|---|---|
| `log4j.properties` | JobManager and TaskManager daemons |
| `log4j-cli.properties` | Flink CLI client (`bin/flink`) |
| `log4j-session.properties` | YARN/Kubernetes session clusters |
| `log4j-console.properties` | Output to console (used in IDE/test environments) |
A minimal `log4j.properties` looks like:

```properties
# Root logger level
rootLogger.level = INFO
rootLogger.appenderRef.file.ref = FileAppender

# File appender
appender.file.name = FileAppender
appender.file.type = RollingFile
appender.file.fileName = ${sys:log.file}
appender.file.filePattern = ${sys:log.file}.%i
appender.file.layout.type = PatternLayout
appender.file.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.file.policies.type = Policies
appender.file.policies.size.type = SizeBasedTriggeringPolicy
appender.file.policies.size.size = 100MB
appender.file.strategy.type = DefaultRolloverStrategy
appender.file.strategy.max = 10
```
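With rollover settings like these, each daemon keeps the active `.log` file plus up to ten rolled files, so worst-case log disk usage per daemon is bounded. A quick back-of-the-envelope check:

```shell
# 100 MB per file, 1 active + 10 rolled files (values from the config above)
echo "$(( 100 * (10 + 1) )) MB"   # 1100 MB
```

Budget roughly this much disk per daemon per appender when sizing log volumes.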
## Changing log levels

### At startup (cluster-wide)

Edit `conf/log4j.properties` before starting the cluster:

```properties
# Set root level to DEBUG
rootLogger.level = DEBUG

# Or set a specific logger to DEBUG
logger.flink.name = org.apache.flink
logger.flink.level = DEBUG
```
### At runtime via configuration reload

Flink's REST API does not offer an endpoint for changing log levels. You can still change levels at runtime without restarting the cluster: the default `log4j.properties` sets Log4j 2's `monitorInterval`, which makes Log4j re-read the configuration file periodically (every 30 seconds in the default configuration). Edit the file on the affected machine and wait for the next reload:

```properties
# conf/log4j.properties
# Log4j 2 re-reads this file every monitorInterval seconds
monitorInterval = 30

# Flip the root level without a restart
rootLogger.level = DEBUG
```
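Editing the file can be scripted. A minimal sketch with `sed`, run here against a temporary stand-in for `conf/log4j.properties` (a running daemon only picks the change up if `monitorInterval` is set, which Flink's default configuration does):

```shell
# Create a stand-in config file (in practice: $FLINK_HOME/conf/log4j.properties)
conf=$(mktemp)
printf 'rootLogger.level = INFO\n' > "$conf"

# Flip the root level to DEBUG in-place; a running daemon re-reads the file
# within monitorInterval seconds
sed -i 's/^rootLogger.level = .*/rootLogger.level = DEBUG/' "$conf"

grep '^rootLogger.level' "$conf"   # rootLogger.level = DEBUG
rm -f "$conf"
```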
## Reducing noise from third-party libraries

```properties
# Suppress verbose output from common libraries
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = WARN
logger.kafka.name = org.apache.kafka
logger.kafka.level = WARN
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = WARN
logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level = ERROR
```
## Custom log file paths

The startup scripts export a `FLINK_LOG_PREFIX` variable: the per-daemon log file path without its extension. Reference it in JVM options to place additional log files next to the standard ones, for example for GC logs:

```yaml
# config.yaml -- write GC logs next to the regular log files
env.java.opts.taskmanager: "-Xloggc:${FLINK_LOG_PREFIX}.gc.log"
```

Log files named with `FLINK_LOG_PREFIX` are rotated alongside the default `.out` and `.log` files.
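The prefix itself is assembled by the startup scripts. A simplified sketch of the naming scheme (the actual logic lives in `bin/flink-daemon.sh` and the exact components may differ between versions):

```shell
# Hypothetical values mirroring what the startup scripts would use
FLINK_LOG_DIR=/opt/flink/log      # assumption: default log directory
FLINK_IDENT_STRING=flink          # usually $USER
DAEMON=taskexecutor               # daemon type started by the script
id=0                              # instance counter on this host

FLINK_LOG_PREFIX="${FLINK_LOG_DIR}/flink-${FLINK_IDENT_STRING}-${DAEMON}-${id}-$(hostname)"
echo "${FLINK_LOG_PREFIX}.log"
```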
## Structured logging

For log aggregation pipelines (e.g., ELK stack, Loki), use a JSON layout instead of a pattern layout. The simplest approach is to point `JsonTemplateLayout` at one of its bundled event templates:

```properties
appender.file.layout.type = JsonTemplateLayout
appender.file.layout.eventTemplateUri = classpath:LogstashJsonEventLayoutV1.json
```

This requires the `log4j-layout-template-json` dependency on the classpath; if the jar is not already in Flink's `lib/` directory, add it there.
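Each log event then becomes one JSON object per line, which downstream consumers parse field by field. A quick sanity check on a sample line (the field names shown are illustrative; they depend on the chosen template):

```shell
# Sample JSON log line (illustrative field names)
line='{"@timestamp":"2024-05-01T12:00:00.000Z","level":"INFO","logger_name":"org.apache.flink.runtime.taskexecutor.TaskExecutor","message":"Starting TaskExecutor"}'

# Extract one field to confirm the line is valid, parseable JSON
echo "$line" | python3 -c 'import json,sys; print(json.load(sys.stdin)["level"])'   # INFO
```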
## Integration with log aggregation
### Filebeat / Logstash

Point Filebeat at the Flink log directory:

```yaml
# filebeat.yml
filebeat.inputs:
  - type: log
    paths:
      - /opt/flink/log/*.log
    fields:
      service: flink
      environment: production
    fields_under_root: true
    multiline:
      pattern: '^\d{4}-\d{2}-\d{2}'
      negate: true
      match: after

output.logstash:
  hosts: ["logstash:5044"]
```
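The `multiline` settings join stack traces to the log record that precedes them: any line that does not start with a date is treated as a continuation. The pattern can be checked in isolation (the sample lines are illustrative):

```shell
# Two lines: a log record and a stack-trace continuation line
printf '2024-05-01 12:00:00,123 ERROR Task failed\n\tat org.example.Foo.bar(Foo.java:42)\n' \
  | grep -cE '^[0-9]{4}-[0-9]{2}-[0-9]{2}'   # 1 -> only the first line starts a new event
```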
### Kubernetes (stdout)

In Kubernetes deployments, redirect all Flink logging to stdout so that Kubernetes log collection (Fluentd, Fluent Bit, etc.) picks it up automatically. Use `log4j-console.properties` or configure a console appender:

```properties
rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender

appender.console.name = ConsoleAppender
appender.console.type = Console
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
```
Mount this file as a ConfigMap and reference it via:

```yaml
# Flink config (config.yaml)
env.java.opts.all: "-Dlog4j.configurationFile=/opt/flink/conf/log4j-console.properties"
```
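A minimal sketch of such a ConfigMap (the name `flink-log4j` and the abbreviated appender settings are illustrative; use the full console configuration in practice):

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-log4j            # hypothetical name, referenced from the pod spec
data:
  log4j-console.properties: |
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    appender.console.name = ConsoleAppender
    appender.console.type = Console
```

Mount it at `/opt/flink/conf/log4j-console.properties` in the JobManager and TaskManager pod specs so the `-Dlog4j.configurationFile` path above resolves.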
## Log levels reference

| Level | Use |
|---|---|
| TRACE | Fine-grained debugging; very verbose |
| DEBUG | Detailed diagnostic information |
| INFO | Normal operational messages (default) |
| WARN | Unexpected situations that do not prevent operation |
| ERROR | Errors that require attention |
| FATAL | Critical failures that will abort the process |
In production, run at INFO level. Set specific packages to DEBUG only during active debugging, and revert afterwards: debug-level logging of Flink internals can generate hundreds of megabytes per minute on a busy cluster.