Metric types
Flink supports four metric types:
Counter
An integer count that can be incremented or decremented. Use for counting records processed, errors encountered, etc.
Gauge
A single value of any type, sampled at report time. Use for current queue depth, watermark lag, state size, etc.
Histogram
Distribution of long values (min, max, mean, percentiles). Use for latency distributions.
Meter
Average rate of events per second. Use for throughput measurements.
Registering custom metrics
Access the metric system from any RichFunction via getRuntimeContext().getMetricGroup().
Counter
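A minimal sketch of registering and incrementing a counter. The class name CountingMapper and the metric name myCounter are illustrative, and the open(OpenContext) signature assumes Flink 1.19 or later (older releases use open(Configuration)).

```java
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.metrics.Counter;

// Hypothetical mapper that counts every record it sees.
public class CountingMapper extends RichMapFunction<String, String> {

    // Metrics are not serializable, so create them in open() rather than the constructor.
    private transient Counter counter;

    @Override
    public void open(OpenContext openContext) {
        counter = getRuntimeContext()
                .getMetricGroup()
                .counter("myCounter");
    }

    @Override
    public String map(String value) {
        counter.inc(); // one increment per processed record
        return value;
    }
}
```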
Gauge
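A minimal sketch of a gauge that exposes the length of the most recently processed record. The class name, field, and metric name are illustrative, and the open(OpenContext) signature assumes Flink 1.19 or later.

```java
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.metrics.Gauge;

// Hypothetical mapper that reports the length of the last record it processed.
public class GaugeMapper extends RichMapFunction<String, String> {

    // Written by map(), read by the reporter thread when the gauge is sampled.
    private transient volatile int lastLength;

    @Override
    public void open(OpenContext openContext) {
        getRuntimeContext()
                .getMetricGroup()
                .gauge("lastRecordLength", new Gauge<Integer>() {
                    @Override
                    public Integer getValue() {
                        return lastLength;
                    }
                });
    }

    @Override
    public String map(String value) {
        lastLength = value.length();
        return value;
    }
}
```

The gauge holds no state of its own. It returns whatever the field contains at the moment a reporter asks for the value.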
Histogram
Flink does not ship a built-in Histogram implementation. Use the Dropwizard wrapper (DropwizardHistogramWrapper from the flink-metrics-dropwizard module).
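A minimal sketch of a latency histogram backed by a Dropwizard histogram. It assumes the flink-metrics-dropwizard dependency is on the classpath and Flink 1.19 or later for open(OpenContext). The class name, metric name, and reservoir size of 500 are illustrative choices.

```java
import com.codahale.metrics.SlidingWindowReservoir;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
import org.apache.flink.metrics.Histogram;

// Hypothetical mapper whose input is a latency in milliseconds.
public class LatencyHistogramMapper extends RichMapFunction<Long, Long> {

    private transient Histogram histogram;

    @Override
    public void open(OpenContext openContext) {
        // Keep the last 500 samples; percentiles are computed over this window.
        com.codahale.metrics.Histogram dropwizardHistogram =
                new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500));

        histogram = getRuntimeContext()
                .getMetricGroup()
                .histogram("latencyMs", new DropwizardHistogramWrapper(dropwizardHistogram));
    }

    @Override
    public Long map(Long latencyMs) {
        histogram.update(latencyMs);
        return latencyMs;
    }
}
```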
Meter
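A minimal sketch of a meter using Flink's MeterView, which averages the event rate over a time span given in seconds. The class name, metric name, and 60-second span are illustrative, and the open(OpenContext) signature assumes Flink 1.19 or later.

```java
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;

// Hypothetical mapper that measures how many records per second it processes.
public class ThroughputMapper extends RichMapFunction<String, String> {

    private transient Meter meter;

    @Override
    public void open(OpenContext openContext) {
        // Rate is averaged over the last 60 seconds.
        meter = getRuntimeContext()
                .getMetricGroup()
                .meter("myMeter", new MeterView(60));
    }

    @Override
    public String map(String value) {
        meter.markEvent(); // record one event
        return value;
    }
}
```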
Metric scopes and naming
Every metric has an identifier composed of a system scope, an optional user scope, and a name. The default delimiter is a dot (.).
System scope formats
| Level | Default scope format |
|---|---|
| JobManager | <host>.jobmanager |
| JobManager + Job | <host>.jobmanager.<job_name> |
| TaskManager | <host>.taskmanager.<tm_id> |
| TaskManager + Job | <host>.taskmanager.<tm_id>.<job_name> |
| Task | <host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index> |
| Operator | <host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index> |
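With the default operator scope format, a metric named myCounter on host localhost with TaskManager ID 1234, job MyJob, operator MyOperator, and subtask 0 produces the identifier localhost.taskmanager.1234.MyJob.MyOperator.0.myCounter.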
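To make the composition concrete, the following self-contained sketch substitutes the example values into the default operator scope format and appends the metric name. ScopeFormatSketch and its identifier method are hypothetical helpers for illustration only, not Flink's own implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: expands <variable> placeholders in a scope format.
final class ScopeFormatSketch {

    static String identifier(String scopeFormat, Map<String, String> variables, String metricName) {
        String scope = scopeFormat;
        for (Map.Entry<String, String> v : variables.entrySet()) {
            // Replace each <key> placeholder with its runtime value.
            scope = scope.replace("<" + v.getKey() + ">", v.getValue());
        }
        // The metric name is appended after the scope, joined by the default delimiter.
        return scope + "." + metricName;
    }

    public static void main(String[] args) {
        Map<String, String> vars = new LinkedHashMap<>();
        vars.put("host", "localhost");
        vars.put("tm_id", "1234");
        vars.put("job_name", "MyJob");
        vars.put("operator_name", "MyOperator");
        vars.put("subtask_index", "0");

        String format = "<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>";
        // Prints: localhost.taskmanager.1234.MyJob.MyOperator.0.myCounter
        System.out.println(identifier(format, vars, "myCounter"));
    }
}
```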
Each scope format can be overridden in config.yaml.
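As a sketch, the fragment below shortens the operator scope by dropping the TaskManager portion. The keys follow Flink's metrics.scope.* options; the chosen format is only an example, not a recommendation.

```yaml
# Example override: omit "taskmanager.<tm_id>" from operator-level identifiers.
metrics.scope.operator: "<host>.<job_name>.<operator_name>.<subtask_index>"
```

With this setting, the earlier example metric would be reported as localhost.MyJob.MyOperator.0.myCounter.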
User-defined scope groups
Calling addGroup("MySubsystem") on a metric group inserts MySubsystem as an infix between the system scope and the metric name.
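A minimal sketch of registering a counter under a user-defined group. The class name and metric name are illustrative, and the open(OpenContext) signature assumes Flink 1.19 or later.

```java
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.metrics.Counter;

// Hypothetical mapper that groups its metrics under "MySubsystem".
public class GroupedMetricsMapper extends RichMapFunction<String, String> {

    private transient Counter counter;

    @Override
    public void open(OpenContext openContext) {
        counter = getRuntimeContext()
                .getMetricGroup()
                .addGroup("MySubsystem") // user scope, placed before the metric name
                .counter("myCounter");
    }

    @Override
    public String map(String value) {
        counter.inc();
        return value;
    }
}
```

With the default operator scope, the identifier then ends in MySubsystem.myCounter.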
Key system metrics
Throughput metrics (Operator scope)
| Metric | Type | Description |
|---|---|---|
| numRecordsIn | Counter | Total records received by this operator |
| numRecordsOut | Counter | Total records emitted by this operator |
| numRecordsInPerSecond | Meter | Input records per second |
| numRecordsOutPerSecond | Meter | Output records per second |
| numBytesIn | Counter | Total bytes received |
| numBytesOut | Counter | Total bytes emitted |
Checkpointing metrics (Job scope)
| Metric | Type | Description |
|---|---|---|
| lastCheckpointDuration | Gauge | Duration of the last completed checkpoint (ms) |
| lastCheckpointSize | Gauge | Size of the last completed checkpoint (bytes) |
| lastCheckpointRestoreTimestamp | Gauge | Timestamp of the last restore from a checkpoint |
| numberOfCompletedCheckpoints | Gauge | Total completed checkpoints |
| numberOfFailedCheckpoints | Gauge | Total failed checkpoints |
| numberOfInProgressCheckpoints | Gauge | Currently in-progress checkpoints |
JVM metrics (Job-/TaskManager scope)
| Metric | Infix | Type | Description |
|---|---|---|---|
| Load | Status.JVM.CPU | Gauge | Recent JVM CPU usage |
| Heap.Used | Status.JVM.Memory | Gauge | JVM heap used (bytes) |
| Heap.Max | Status.JVM.Memory | Gauge | JVM heap max (bytes) |
| NonHeap.Used | Status.JVM.Memory | Gauge | Non-heap used (bytes) |
| Count | Status.JVM.GarbageCollector | Gauge | Total GC collections |
| Time | Status.JVM.GarbageCollector | Gauge | Total time spent in GC |
| Count | Status.JVM.Threads | Gauge | Total live threads |
Back-pressure metrics (Task scope)
| Metric | Type | Description |
|---|---|---|
| isBackPressured | Gauge | Whether the task is back-pressured |
| buffers.inputQueueLength | Gauge | Number of queued input buffers |
| buffers.outputQueueLength | Gauge | Number of queued output buffers |
| buffers.inPoolUsage | Gauge | Input buffer pool usage (0.0–1.0) |
| buffers.outPoolUsage | Gauge | Output buffer pool usage (0.0–1.0) |
Metric reporters
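Flink ships with reporters for several monitoring backends. Configure a reporter in config.yaml by choosing a reporter name and setting metrics.reporter.<name>.factory.class; other metrics.reporter.<name>.* keys are passed to that reporter.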
Prometheus
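A minimal reporter configuration might look like the following. The reporter name prom is an arbitrary label; the port matches the endpoint used in this section.

```yaml
metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
# Port on which each Flink process serves its metrics.
metrics.reporter.prom.port: 9249
```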
With the Prometheus reporter listening on port 9249, each Flink process exposes its metrics at http://<host>:9249/metrics in Prometheus text format. Scrape this endpoint from your Prometheus server.
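A sketch of the matching scrape job in prometheus.yml. The job name flink is an assumption, and <host> stands for each JobManager or TaskManager host.

```yaml
scrape_configs:
  - job_name: "flink"
    static_configs:
      # One target per Flink process exposing the reporter port.
      - targets: ["<host>:9249"]
```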
JMX
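A minimal sketch of enabling the JMX reporter. The reporter name jmx is an arbitrary label, and the port value is only an example.

```yaml
metrics.reporter.jmx.factory.class: org.apache.flink.metrics.jmx.JMXReporterFactory
# Optional: port on which JMX listens for connections.
metrics.reporter.jmx.port: 8789
```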
Graphite
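A minimal sketch of a Graphite reporter configuration. The reporter name grph is an arbitrary label, and the host, port, and interval are placeholder values to adapt to your Graphite setup.

```yaml
metrics.reporter.grph.factory.class: org.apache.flink.metrics.graphite.GraphiteReporterFactory
metrics.reporter.grph.host: localhost
metrics.reporter.grph.port: 2003
metrics.reporter.grph.protocol: TCP
# How often metrics are pushed to Graphite.
metrics.reporter.grph.interval: 60 SECONDS
```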
StatsD
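A minimal sketch of a StatsD reporter configuration. The reporter name stsd is an arbitrary label, and the host, port, and interval are placeholder values to adapt to your StatsD daemon.

```yaml
metrics.reporter.stsd.factory.class: org.apache.flink.metrics.statsd.StatsDReporterFactory
metrics.reporter.stsd.host: localhost
metrics.reporter.stsd.port: 8125
# How often metrics are pushed to StatsD.
metrics.reporter.stsd.interval: 60 SECONDS
```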
Custom metric variables
You can attach custom variables (tags/labels) to all metrics reported by a given source or operator. For example, when pipeline="checkout" is set on a source, that variable is attached to all metrics from that source. This lets you filter and aggregate metrics in Grafana without changing metric names.
