Flink exposes a rich metric system that provides visibility into job behaviour, resource usage, checkpointing, and throughput. Metrics can be exposed to external monitoring systems via pluggable reporters.

Metric types

Flink supports four metric types:

Counter

An integer count that can be incremented and decremented. Use for counting records processed, errors encountered, etc.

Gauge

A single value of any type, sampled at report time. Use for current queue depth, watermark lag, state size, etc.

Histogram

Distribution of long values (min, max, mean, percentiles). Use for latency distributions.

Meter

Average rate of events per second. Use for throughput measurements.

Registering custom metrics

Access the metric system from any RichFunction via getRuntimeContext().getMetricGroup().

Counter

public class MyMapper extends RichMapFunction<String, String> {
    private transient Counter counter;

    @Override
    public void open(OpenContext ctx) {
        this.counter = getRuntimeContext()
            .getMetricGroup()
            .counter("recordsProcessed");
    }

    @Override
    public String map(String value) throws Exception {
        this.counter.inc();
        return value;
    }
}

Gauge

public class MyMapper extends RichMapFunction<String, String> {
    private transient int queueDepth = 0;

    @Override
    public void open(OpenContext ctx) {
        getRuntimeContext()
            .getMetricGroup()
            .gauge("queueDepth", () -> queueDepth);
    }

    @Override
    public String map(String value) throws Exception {
        queueDepth++;
        return value;
    }
}

Histogram

Flink does not provide a default Histogram implementation, but it ships a wrapper for Dropwizard (Codahale) histograms. Add the dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-metrics-dropwizard</artifactId>
    <version>${flink.version}</version>
</dependency>
public class MyMapper extends RichMapFunction<Long, Long> {
    private transient Histogram histogram;

    @Override
    public void open(OpenContext ctx) {
        com.codahale.metrics.Histogram dropwizardHistogram =
            new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500));
        this.histogram = getRuntimeContext()
            .getMetricGroup()
            .histogram("processingLatency",
                       new DropwizardHistogramWrapper(dropwizardHistogram));
    }

    @Override
    public Long map(Long value) throws Exception {
        this.histogram.update(value);
        return value;
    }
}

Meter

public class MyMapper extends RichMapFunction<Long, Long> {
    private transient Meter meter;

    @Override
    public void open(OpenContext ctx) {
        com.codahale.metrics.Meter dropwizardMeter = new com.codahale.metrics.Meter();
        this.meter = getRuntimeContext()
            .getMetricGroup()
            .meter("recordsPerSecond",
                   new DropwizardMeterWrapper(dropwizardMeter));
    }

    @Override
    public Long map(Long value) throws Exception {
        this.meter.markEvent();
        return value;
    }
}

Metric scopes and naming

Every metric has an identifier composed of a system scope, an optional user scope, and a name. The default delimiter is a period (.).

System scope formats

| Level | Default scope format |
| --- | --- |
| JobManager | <host>.jobmanager |
| JobManager + Job | <host>.jobmanager.<job_name> |
| TaskManager | <host>.taskmanager.<tm_id> |
| TaskManager + Job | <host>.taskmanager.<tm_id>.<job_name> |
| Task | <host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index> |
| Operator | <host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index> |

An operator metric named myCounter on localhost with TM 1234, job MyJob, operator MyOperator, subtask 0 would produce:
localhost.taskmanager.1234.MyJob.MyOperator.0.myCounter
Customise scope formats in config.yaml:
metrics.scope.operator: <host>.<job_name>.<task_name>.<operator_name>.<subtask_index>
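
The substitution above can be illustrated with a small self-contained sketch. Note that ScopeFormatDemo, expand, and the variable map are hypothetical names for illustration, not Flink APIs: each <variable> placeholder in the format string is replaced with its runtime value, and the metric name is appended with the delimiter.

```java
import java.util.Map;

// Hypothetical illustration of how a scope format string expands into a
// full metric identifier (not a Flink API).
public class ScopeFormatDemo {
    static String expand(String format, Map<String, String> vars, String metricName) {
        String scope = format;
        // Replace each <variable> placeholder with its runtime value.
        for (Map.Entry<String, String> e : vars.entrySet()) {
            scope = scope.replace("<" + e.getKey() + ">", e.getValue());
        }
        // Append the metric name using the default delimiter.
        return scope + "." + metricName;
    }

    public static void main(String[] args) {
        Map<String, String> vars = Map.of(
            "host", "localhost",
            "tm_id", "1234",
            "job_name", "MyJob",
            "operator_name", "MyOperator",
            "subtask_index", "0");
        String id = expand("<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>",
                           vars, "myCounter");
        System.out.println(id); // localhost.taskmanager.1234.MyJob.MyOperator.0.myCounter
    }
}
```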

User-defined scope groups

counter = getRuntimeContext()
    .getMetricGroup()
    .addGroup("MySubsystem")
    .counter("myCounter");
This adds MySubsystem as an infix between the system scope and the metric name, so the counter above would be reported under an identifier such as localhost.taskmanager.1234.MyJob.MyOperator.0.MySubsystem.myCounter.

Key system metrics

Throughput metrics (Operator scope)

| Metric | Type | Description |
| --- | --- | --- |
| numRecordsIn | Counter | Total records received by this operator |
| numRecordsOut | Counter | Total records emitted by this operator |
| numRecordsInPerSecond | Meter | Input records per second |
| numRecordsOutPerSecond | Meter | Output records per second |
| numBytesIn | Counter | Total bytes received |
| numBytesOut | Counter | Total bytes emitted |
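
To see how the PerSecond meters relate to their cumulative counters, here is a minimal sketch (RateDemo is a hypothetical helper for illustration, not part of Flink) that derives a per-second rate from two samples of a cumulative counter:

```java
// Hypothetical sketch: deriving a records-per-second rate from two samples
// of a cumulative counter, which is essentially what a rate meter reports.
public class RateDemo {
    static double ratePerSecond(long countBefore, long countAfter, long elapsedMillis) {
        // Delta of the cumulative count, normalized to one second.
        return (countAfter - countBefore) * 1000.0 / elapsedMillis;
    }

    public static void main(String[] args) {
        // 5000 records over 10 seconds -> 500 records/second.
        System.out.println(ratePerSecond(1000, 6000, 10000)); // 500.0
    }
}
```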

Checkpointing metrics (Job scope)

| Metric | Type | Description |
| --- | --- | --- |
| lastCheckpointDuration | Gauge | Duration of the last completed checkpoint (ms) |
| lastCheckpointSize | Gauge | Size of the last completed checkpoint (bytes) |
| lastCheckpointRestoreTimestamp | Gauge | Timestamp of the last restore from a checkpoint |
| numberOfCompletedCheckpoints | Gauge | Total completed checkpoints |
| numberOfFailedCheckpoints | Gauge | Total failed checkpoints |
| numberOfInProgressCheckpoints | Gauge | Currently in-progress checkpoints |
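
These counts can feed a simple health check. The sketch below (CheckpointHealth and its 10% threshold are illustrative assumptions, not a Flink API) flags a job whose checkpoint failure ratio exceeds a tolerance:

```java
// Hypothetical alerting helper: flag a job when more than 10% of observed
// checkpoints have failed, using the two checkpoint counts above.
public class CheckpointHealth {
    static boolean unhealthy(long completed, long failed) {
        long total = completed + failed;
        // Guard against division by zero before any checkpoint has run.
        return total > 0 && (double) failed / total > 0.10;
    }

    public static void main(String[] args) {
        System.out.println(unhealthy(90, 10)); // exactly 10% failed: not flagged
        System.out.println(unhealthy(80, 20)); // 20% failed: flagged
    }
}
```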

JVM metrics (Job-/TaskManager scope)

| Metric | Infix | Type | Description |
| --- | --- | --- | --- |
| Load | Status.JVM.CPU | Gauge | Recent JVM CPU usage |
| Heap.Used | Status.JVM.Memory | Gauge | JVM heap used (bytes) |
| Heap.Max | Status.JVM.Memory | Gauge | JVM heap max (bytes) |
| NonHeap.Used | Status.JVM.Memory | Gauge | Non-heap used (bytes) |
| Count | Status.JVM.GarbageCollector | Gauge | Total GC collections |
| Time | Status.JVM.GarbageCollector | Gauge | Total time spent in GC (ms) |
| Count | Status.JVM.Threads | Gauge | Total live threads |

Back-pressure metrics (Task scope)

| Metric | Type | Description |
| --- | --- | --- |
| isBackPressured | Gauge | Whether the task is back-pressured |
| buffers.inputQueueLength | Gauge | Number of queued input buffers |
| buffers.outputQueueLength | Gauge | Number of queued output buffers |
| buffers.inPoolUsage | Gauge | Input buffer pool usage (0.0–1.0) |
| buffers.outPoolUsage | Gauge | Output buffer pool usage (0.0–1.0) |
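
A common way to read the pool-usage gauges: a saturated output pool means this task cannot emit because a downstream task is slow, while a saturated input pool with a free output pool suggests this task itself is the bottleneck. A hypothetical sketch of that rule of thumb (BackPressureDemo and the 0.9 thresholds are illustrative, not a Flink API):

```java
// Hypothetical diagnostic helper interpreting the buffer pool usage gauges.
public class BackPressureDemo {
    static String diagnose(double inPoolUsage, double outPoolUsage) {
        // Output buffers full: downstream cannot keep up, so this task stalls.
        if (outPoolUsage > 0.9) return "back-pressured by downstream";
        // Input buffers full but output free: this task is the slow stage.
        if (inPoolUsage > 0.9) return "bottleneck: causing back-pressure upstream";
        return "healthy";
    }

    public static void main(String[] args) {
        System.out.println(diagnose(0.95, 0.20)); // this task is the bottleneck
        System.out.println(diagnose(0.10, 0.95)); // a downstream task is slow
    }
}
```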

Metric reporters

Flink ships with reporters for several monitoring backends. Configure a reporter in config.yaml:

Prometheus

metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
metrics.reporter.prom.port: 9249
Each JobManager and TaskManager process then exposes metrics at http://<host>:9249/metrics in Prometheus text format. Scrape these endpoints from your Prometheus server:
# prometheus.yml
scrape_configs:
  - job_name: flink
    static_configs:
      - targets: ['taskmanager1:9249', 'taskmanager2:9249', 'jobmanager:9249']
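
Once scraped, the Prometheus reporter flattens the scope into the metric name and exposes scope components (host, job_name, operator_name, subtask_index, etc.) as labels. A typical PromQL query then aggregates across subtasks; the metric name below assumes the default naming scheme, so verify it against your own /metrics endpoint:

```
sum(rate(flink_taskmanager_job_task_operator_numRecordsIn[1m])) by (operator_name)
```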

JMX

metrics.reporter.jmx.factory.class: org.apache.flink.metrics.jmx.JMXReporterFactory
metrics.reporter.jmx.port: 8789

Graphite

metrics.reporter.graphite.factory.class: org.apache.flink.metrics.graphite.GraphiteReporterFactory
metrics.reporter.graphite.host: graphite-host
metrics.reporter.graphite.port: 2003
metrics.reporter.graphite.protocol: TCP

StatsD

metrics.reporter.statsd.factory.class: org.apache.flink.metrics.statsd.StatsDReporterFactory
metrics.reporter.statsd.host: statsd-host
metrics.reporter.statsd.port: 8125
You can configure multiple reporters simultaneously by giving each a unique name prefix:
metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
metrics.reporter.prom.port: 9249
metrics.reporter.jmx.factory.class: org.apache.flink.metrics.jmx.JMXReporterFactory
metrics.reporter.jmx.port: 8789

Custom metric variables

You can attach custom variables (tags/labels) to metrics by creating a key-value group with addGroup(key, value) on the metric group, e.g. inside a RichFunction's open() method:
counter = getRuntimeContext()
    .getMetricGroup()
    .addGroup("pipeline", "checkout")
    .counter("myCounter");
When the reporter supports labels (e.g., Prometheus), pipeline="checkout" is attached as a label to every metric registered under that group. This allows you to filter and aggregate metrics in Grafana without changing metric names.
