PyFlink exposes Flink’s metrics system to Python UDFs through the FunctionContext object. You can register counters, gauges, meters, and distributions that integrate with Flink’s metrics reporters (JMX, Prometheus, InfluxDB, etc.).
Enabling metrics
Metric collection from Python UDFs is disabled by default. Enable it with:
t_env.get_config().set("python.metric.enabled", "true")
Or in conf/config.yaml:
python.metric.enabled: true
Accessing context.get_metric_group() when python.metric.enabled is false raises a RuntimeError. Always enable the setting before registering metrics.
Accessing the MetricGroup
Metrics are registered through the FunctionContext passed to the open() lifecycle method of your UDF. Store the MetricGroup as an instance variable so your metric objects are accessible in eval():
from pyflink.table.udf import ScalarFunction, udf, FunctionContext
from pyflink.table import DataTypes
class MyUDF(ScalarFunction):
    """Uppercase the input string, counting every record processed."""

    def __init__(self):
        # Populated in open(); metrics cannot be registered before then.
        self.counter = None

    def open(self, context: FunctionContext):
        # open() runs once per UDF instance — register metrics here.
        group = context.get_metric_group()
        self.counter = group.counter("records_processed")

    def eval(self, value):
        self.counter.inc()
        return value.upper()
Metric types
Counter
A Counter tracks a cumulative integer value that can be incremented or decremented. Use it to count events, errors, or records.
from pyflink.table.udf import ScalarFunction, FunctionContext
from pyflink.table import DataTypes
class WordCounter(ScalarFunction):
    """Count whitespace-separated words, tracking totals and parse errors."""

    def open(self, context: FunctionContext):
        group = context.get_metric_group()
        self.words_seen = group.counter("words_seen")
        self.errors = group.counter("parse_errors")

    def eval(self, text):
        try:
            # inc(n) increments by n; a bare inc() increments by 1.
            count = len(text.split())
            self.words_seen.inc(count)
            return count
        except Exception:
            # Malformed input (e.g. None): count the failure, report zero words.
            self.errors.inc()
            return 0
Counter operations:
| Method | Description |
|---|---|
| counter.inc() | Increment by 1. |
| counter.inc(n) | Increment by n. |
| counter.dec() | Decrement by 1. |
| counter.dec(n) | Decrement by n. |
| counter.get_count() | Return the current count. |
Gauge
A Gauge reports a value at query time by calling a callable you provide. Use it for values that change independently of record processing—cache sizes, queue depths, etc.
from pyflink.table.udf import ScalarFunction, FunctionContext
class CachingUDF(ScalarFunction):
    """Memoize expensive lookups and expose the cache size as a gauge."""

    def __init__(self):
        self._cache = {}

    def open(self, context: FunctionContext):
        # The gauge's callable is invoked every time metrics are collected.
        group = context.get_metric_group()
        group.gauge("cache_size", lambda: len(self._cache))

    def eval(self, key):
        try:
            return self._cache[key]
        except KeyError:
            # Miss: compute once, remember, and return.
            result = self._expensive_lookup(key)
            self._cache[key] = result
            return result

    def _expensive_lookup(self, key):
        return f"value_for_{key}"
The callable must return an int. If you need to report a float, convert it:
context.get_metric_group().gauge(
"hit_rate_pct",
lambda: int(self._hits / max(self._total, 1) * 100),
)
Meter
A Meter measures the rate at which events occur (events per second). It maintains a moving average over a configurable time window.
from pyflink.table.udf import ScalarFunction, FunctionContext
class ThroughputTracker(ScalarFunction):
    """Double the input value while measuring processing rate with a meter."""

    def open(self, context: FunctionContext):
        # The meter averages events/second over a sliding window;
        # 60 seconds is spelled out here, matching the default.
        group = context.get_metric_group()
        self.throughput = group.meter(
            "records_per_second",
            time_span_in_seconds=60,
        )

    def eval(self, value):
        # One record processed → one event on the meter.
        self.throughput.mark_event()
        return value * 2
Meter operations:
| Method | Description |
|---|---|
| meter.mark_event() | Record one event. |
| meter.mark_event(n) | Record n events. |
| meter.get_count() | Return the total number of events recorded since creation. |
Distribution (Histogram)
A Distribution collects statistics about a value’s distribution—min, max, mean, standard deviation, and percentiles. Use it to track latencies or value ranges.
from pyflink.table.udf import ScalarFunction, FunctionContext
class LatencyTracker(ScalarFunction):
    """Normalize strings while recording per-record latency in a distribution.

    The distribution aggregates min/max/mean/stddev/percentiles of the
    observed processing time, in whole milliseconds.
    """

    def open(self, context: FunctionContext):
        # Bind the clock once here instead of importing `time` inside
        # eval() — open() runs once per instance, eval() runs per record,
        # so the import/lookup does not belong on the hot path.
        import time
        self._clock = time.monotonic
        self.latency = context.get_metric_group().distribution(
            "processing_latency_ms"
        )

    def eval(self, value):
        start = self._clock()
        result = self._process(value)
        # Elapsed wall-clock time, truncated to whole milliseconds.
        latency_ms = int((self._clock() - start) * 1000)
        self.latency.update(latency_ms)
        return result

    def _process(self, value):
        # Placeholder for the real per-record work.
        return value.strip().lower()
Distribution operations:
| Method | Description |
|---|---|
| dist.update(value) | Record a new observation. |
Organizing metrics with subgroups
Use add_group() to create a nested metric hierarchy. This maps to scope labels in your metrics reporter.
from pyflink.table.udf import ScalarFunction, FunctionContext
class EnrichedUDF(ScalarFunction):
    """Enrich values while organizing metrics into nested subgroups."""

    def open(self, context: FunctionContext):
        root = context.get_metric_group()

        # Named subgroup: these counters appear under the "io" scope.
        io_group = root.add_group("io")
        self.reads = io_group.counter("reads")
        self.writes = io_group.counter("writes")

        # Key/value subgroup: "us-east-1" becomes a user variable
        # (scope tag) on every metric registered beneath it.
        regional = root.add_group("region", "us-east-1")
        self.regional_counter = regional.counter("requests")

    def eval(self, value):
        self.reads.inc()
        enriched = self._enrich(value)
        self.writes.inc()
        return enriched

    def _enrich(self, value):
        return f"enriched:{value}"
The metric name io.reads will appear as <job_scope>.io.reads in your reporter, and region.us-east-1.requests will include us-east-1 as a scope tag.
Complete example: UDF with multiple metrics
from pyflink.table import TableEnvironment, EnvironmentSettings, DataTypes
from pyflink.table.expressions import col
from pyflink.table.udf import ScalarFunction, udf, FunctionContext
class MetricsDemoUDF(ScalarFunction):
    """A UDF that demonstrates all four metric types.

    Registers a counter (invocations/errors), a meter (throughput), a
    distribution (input length), and a gauge (running total of output
    characters), all under a "demo" subgroup.
    """

    def open(self, context: FunctionContext):
        g = context.get_metric_group().add_group("demo")
        self.invocations = g.counter("invocations")
        self.errors = g.counter("errors")
        self.throughput = g.meter("throughput")
        self.value_dist = g.distribution("input_length")
        self._total = 0
        # The gauge callable is polled at collection time and must return
        # an int.
        g.gauge("running_total", lambda: self._total)

    def eval(self, text: str) -> str:
        try:
            self.invocations.inc()
            self.throughput.mark_event()
            self.value_dist.update(len(text))
            result = text.strip().lower()
            self._total += len(result)
            return result
        except Exception:
            # Original bound the exception as `e` but never used it; count
            # the failure, then re-raise so Flink's error handling sees it.
            self.errors.inc()
            raise
settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(settings)
# Metric collection from Python UDFs is off by default — turn it on
# before the UDF is opened.
t_env.get_config().set("python.metric.enabled", "true")

demo_udf = udf(MetricsDemoUDF(), result_type=DataTypes.STRING())

source = t_env.from_elements(
    [(" Hello World ",), (" Flink ",), (" PyFlink ",)],
    schema=["text"],
)
source.select(demo_udf(col("text")).alias("cleaned")).execute().print()
Viewing metrics
Metrics registered from Python UDFs appear in the same places as Java metrics:
- Flink Web UI — click a task and open the “Metrics” tab.
- Prometheus — configure the Prometheus reporter in conf/config.yaml.
- JMX — enabled by default; use jconsole or any JMX client.
# conf/config.yaml — enable Prometheus reporter
metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
metrics.reporter.prom.port: 9249