Skip to main content
PyFlink exposes Flink’s metrics system to Python UDFs through the FunctionContext object. You can register counters, gauges, meters, and distributions that integrate with Flink’s metrics reporters (JMX, Prometheus, InfluxDB, etc.).

Enabling metrics

Metric collection from Python UDFs is disabled by default. Enable it with:
t_env.get_config().set("python.metric.enabled", "true")
Or in conf/config.yaml:
python.metric.enabled: true
Accessing context.get_metric_group() when python.metric.enabled is false raises a RuntimeError. Always enable the setting before registering metrics.

Accessing the MetricGroup

Metrics are registered through the FunctionContext passed to the open() lifecycle method of your UDF. Store the MetricGroup as an instance variable so your metric objects are accessible in eval():
from pyflink.table.udf import ScalarFunction, udf, FunctionContext
from pyflink.table import DataTypes

class MyUDF(ScalarFunction):
    """Uppercases its input while counting processed records."""

    def __init__(self):
        # Metrics can only be registered in open(), so hold a placeholder here.
        self.counter = None

    def open(self, context: FunctionContext):
        # Register the counter once, when this UDF instance is initialized.
        group = context.get_metric_group()
        self.counter = group.counter("records_processed")

    def eval(self, value):
        self.counter.inc()
        return value.upper()

Metric types

Counter

A Counter tracks an integer value that can be incremented and decremented. Use it to count events, errors, or records.
from pyflink.table.udf import ScalarFunction, FunctionContext
from pyflink.table import DataTypes

class WordCounter(ScalarFunction):
    """Counts words per input string, tracking totals and parse failures."""

    def open(self, context: FunctionContext):
        group = context.get_metric_group()
        self.words_seen = group.counter("words_seen")
        self.errors = group.counter("parse_errors")

    def eval(self, text):
        try:
            word_count = len(text.split())
            # inc(n) bumps the counter by n in a single call.
            self.words_seen.inc(word_count)
            return word_count
        except Exception:
            self.errors.inc()
            return 0
Counter operations:
Method — Description
counter.inc() — Increment by 1.
counter.inc(n) — Increment by n.
counter.dec() — Decrement by 1.
counter.dec(n) — Decrement by n.
counter.get_count() — Return the current count.

Gauge

A Gauge reports a value at metric-collection time by calling a callable you provide. Use it for values that change independently of record processing—cache sizes, queue depths, etc.
from pyflink.table.udf import ScalarFunction, FunctionContext

class CachingUDF(ScalarFunction):
    """Memoizes expensive lookups and exposes the cache size as a gauge."""

    def __init__(self):
        self._cache = {}

    def open(self, context: FunctionContext):
        # The gauge's callable runs on every metrics-collection pass.
        group = context.get_metric_group()
        group.gauge("cache_size", lambda: len(self._cache))

    def eval(self, key):
        if key in self._cache:
            return self._cache[key]
        value = self._expensive_lookup(key)
        self._cache[key] = value
        return value

    def _expensive_lookup(self, key):
        return f"value_for_{key}"
The callable must return an int. If you need to report a float, convert it:
context.get_metric_group().gauge(
    "hit_rate_pct",
    lambda: int(self._hits / max(self._total, 1) * 100),
)

Meter

A Meter measures the rate at which events occur (events per second). It maintains a moving average over a configurable time window.
from pyflink.table.udf import ScalarFunction, FunctionContext

class ThroughputTracker(ScalarFunction):
    """Doubles its input while recording per-second throughput in a Meter."""

    def open(self, context: FunctionContext):
        group = context.get_metric_group()
        # 60 seconds matches the meter's default averaging window.
        self.throughput = group.meter(
            "records_per_second",
            time_span_in_seconds=60,
        )

    def eval(self, value):
        # One event per invocation.
        self.throughput.mark_event()
        return value * 2
Meter operations:
Method — Description
meter.mark_event() — Record one event.
meter.mark_event(n) — Record n events.
meter.get_count() — Return the total number of events recorded since creation.

Distribution (Histogram)

A Distribution collects statistics about a value’s distribution—min, max, mean, standard deviation, and percentiles. Use it to track latencies or value ranges.
from pyflink.table.udf import ScalarFunction, FunctionContext

class LatencyTracker(ScalarFunction):
    """Records per-call processing latency (milliseconds) in a Distribution."""

    def open(self, context: FunctionContext):
        group = context.get_metric_group()
        self.latency = group.distribution("processing_latency_ms")

    def eval(self, value):
        import time

        # monotonic() is immune to wall-clock adjustments, so it is the
        # right choice for measuring elapsed time.
        started = time.monotonic()
        result = self._process(value)
        elapsed_ms = int((time.monotonic() - started) * 1000)
        self.latency.update(elapsed_ms)
        return result

    def _process(self, value):
        return value.strip().lower()
Distribution operations:
Method — Description
dist.update(value) — Record a new observation.

Organizing metrics with subgroups

Use add_group() to create a nested metric hierarchy. This maps to scope labels in your metrics reporter.
from pyflink.table.udf import ScalarFunction, FunctionContext

class EnrichedUDF(ScalarFunction):
    """Demonstrates metric subgroups: a named group and a key-value group."""

    def open(self, context: FunctionContext):
        root = context.get_metric_group()

        # Named subgroup: these counters land under <scope>.io.*
        io_metrics = root.add_group("io")
        self.reads = io_metrics.counter("reads")
        self.writes = io_metrics.counter("writes")

        # Key-value subgroup: "us-east-1" becomes a user variable in the scope.
        regional = root.add_group("region", "us-east-1")
        self.regional_counter = regional.counter("requests")

    def eval(self, value):
        self.reads.inc()
        enriched = self._enrich(value)
        self.writes.inc()
        return enriched

    def _enrich(self, value):
        return f"enriched:{value}"
The metric name io.reads will appear as <job_scope>.io.reads in your reporter, and region.us-east-1.requests will include us-east-1 as a scope tag.

Complete example: UDF with multiple metrics

from pyflink.table import TableEnvironment, EnvironmentSettings, DataTypes
from pyflink.table.expressions import col
from pyflink.table.udf import ScalarFunction, udf, FunctionContext


class MetricsDemoUDF(ScalarFunction):
    """A UDF that demonstrates all four metric types.

    Registers a counter, meter, distribution, and gauge under the "demo"
    subgroup in open(), then updates them on every eval() call.
    """

    def open(self, context: FunctionContext):
        g = context.get_metric_group().add_group("demo")
        self.invocations = g.counter("invocations")
        self.errors = g.counter("errors")
        self.throughput = g.meter("throughput")
        self.value_dist = g.distribution("input_length")
        self._total = 0
        # The gauge's callable is evaluated lazily at metric-collection time.
        g.gauge("running_total", lambda: self._total)

    def eval(self, text: str) -> str:
        try:
            self.invocations.inc()
            self.throughput.mark_event()
            self.value_dist.update(len(text))
            result = text.strip().lower()
            self._total += len(result)
            return result
        except Exception:
            # Count the failure, then re-raise so Flink's error handling
            # still sees the exception.
            self.errors.inc()
            raise


# Metrics must be enabled before the job is submitted.
settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(settings)
t_env.get_config().set("python.metric.enabled", "true")

my_udf = udf(MetricsDemoUDF(), result_type=DataTypes.STRING())

rows = [("  Hello World  ",), ("  Flink  ",), ("  PyFlink  ",)]
table = t_env.from_elements(rows, schema=["text"])
table.select(my_udf(col("text")).alias("cleaned")).execute().print()

Viewing metrics

Metrics registered from Python UDFs appear in the same places as Java metrics:
  • Flink Web UI — click a task and open the “Metrics” tab.
  • Prometheus — configure the Prometheus reporter in conf/config.yaml.
  • JMX — enabled by default; use jconsole or any JMX client.
# conf/config.yaml — enable Prometheus reporter
metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
metrics.reporter.prom.port: 9249

Build docs developers (and LLMs) love