When you write a Python UDF in PyFlink, Flink must decide how to run that Python code relative to the JVM-based task managers. There are two execution modes, each with different performance characteristics and resource requirements.

How UDF execution works

Every Flink task manager runs on the JVM. When a task manager encounters a Python operator, it spawns a Python worker process (or communicates with one in the same JVM process) and exchanges data with it over a local socket or shared memory. The Python worker deserializes records, invokes your UDF, and returns results to the JVM. The configuration key that controls this behavior is:
python.execution-mode

Execution modes

In process mode (the default), each task manager spawns a separate Python worker process. The JVM and Python processes communicate over a local socket using Flink’s internal protocol.
Advantages:
  • Full Python isolation—crashes in the Python worker don’t affect the JVM task manager.
  • You can use any Python library, including those with C extensions.
  • Compatible with all Python versions supported by PyFlink.
Disadvantages:
  • Data must be serialized and sent over a socket, adding latency and CPU overhead for high-throughput workloads.
  • Each task slot may spawn its own Python process, increasing memory usage.
The alternative, thread mode (python.execution-mode: thread, available since Flink 1.15), embeds the Python interpreter in the task manager’s JVM process via the PEMJA library. This eliminates inter-process serialization entirely, but gives up isolation and is subject to the CPython GIL; see the comparison table below.
To select process mode explicitly for a DataStream job:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common import Configuration

config = Configuration()
config.set_string("python.execution-mode", "process")

env = StreamExecutionEnvironment.get_execution_environment(config)

Configuring execution mode

You can set the execution mode in three ways:
  • in the cluster configuration (flink-conf.yaml), or with -Dpython.execution-mode=... when submitting the job;
  • in the configuration passed when creating the environment, as shown below;
  • at runtime, via t_env.get_config().set("python.execution-mode", "process").
from pyflink.common import Configuration
from pyflink.table import TableEnvironment, EnvironmentSettings

config = Configuration()
config.set_string("python.execution-mode", "process")

settings = EnvironmentSettings.new_instance() \
    .in_streaming_mode() \
    .with_configuration(config) \
    .build()

t_env = TableEnvironment.create(settings)

Arrow-based (pandas) UDFs

For compute-intensive UDFs that process batches of rows, PyFlink supports Arrow-based UDFs (also called vectorized or pandas UDFs). Instead of receiving one row at a time, your UDF receives a pandas.Series or pandas.DataFrame containing a batch of values. This dramatically reduces the per-row overhead of Python–JVM data exchange.
Arrow-based UDFs require pyarrow and pandas to be installed in the Python environment.
pip install pyarrow pandas
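To see what vectorization buys, here is a plain-pandas sketch (no Flink involved; the function names are illustrative) contrasting the per-row call pattern with a single batched call:

```python
import pandas as pd

def double_scalar(x: float) -> float:
    # Row-based UDF style: called once per record, so Python call
    # overhead (and JVM<->Python transfer, in Flink) is paid per row.
    return x * 2.0

def double_vectorized(s: pd.Series) -> pd.Series:
    # Arrow-based UDF style: called once per batch; the multiply
    # runs in native code inside pandas/NumPy.
    return s * 2.0

values = pd.Series([1.0, 2.0, 3.0, 4.0])

row_based = values.apply(double_scalar)  # 4 Python-level calls
batched = double_vectorized(values)      # 1 Python-level call

assert row_based.equals(batched)
```

Both paths produce identical results; the difference is how many times the Python interpreter is entered per batch of rows.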

Vectorized scalar UDF

Decorate your function with udf and set func_type="pandas":
import pandas as pd
from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(result_type=DataTypes.DOUBLE(), func_type="pandas")
def multiply(series: pd.Series) -> pd.Series:
    """Double every value in the column."""
    return series * 2.0
Use it in a Table API query exactly like a regular UDF:
from pyflink.table import TableEnvironment, EnvironmentSettings, DataTypes
from pyflink.table.expressions import col
import pandas as pd
from pyflink.table.udf import udf

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

@udf(result_type=DataTypes.DOUBLE(), func_type="pandas")
def multiply(s: pd.Series) -> pd.Series:
    return s * 2.0

table = t_env.from_elements([(1.0,), (2.0,), (3.0,)], schema=["value"])
table.select(multiply(col("value")).alias("doubled")).execute().print()

Vectorized aggregate UDF

For grouped aggregations, use udaf with func_type="pandas":
import pandas as pd
from pyflink.table import DataTypes
from pyflink.table.udf import udaf

@udaf(result_type=DataTypes.DOUBLE(), func_type="pandas")
def pandas_mean(series: pd.Series) -> float:
    return series.mean()
Then apply it over a grouped table:
from pyflink.table.expressions import col

table.group_by(col("category")) \
     .aggregate(pandas_mean(col("value")).alias("avg_value")) \
     .select(col("category"), col("avg_value")) \
     .execute().print()
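Outside Flink, the same per-group computation can be checked with plain pandas (the data values here are made up for illustration):

```python
import pandas as pd

df = pd.DataFrame({
    "category": ["a", "a", "b"],
    "value": [1.0, 3.0, 10.0],
})

# Per-group mean: this is what pandas_mean computes when Flink
# hands it each group's batch of values as a Series.
avg = df.groupby("category")["value"].mean()

print(avg.to_dict())  # {'a': 2.0, 'b': 10.0}
```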

Controlling batch size

Arrow-based UDFs process records in micro-batches. You can tune the maximum number of rows per batch:
t_env.get_config().set("python.fn-execution.arrow.batch.size", "1000")
Larger batches increase throughput but also increase memory usage and latency. The default is 10000.
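The trade-off is simple arithmetic: fewer, larger batches mean fewer Python-level UDF invocations, but more rows buffered in memory at once. A rough sketch:

```python
import math

def num_batches(total_rows: int, batch_size: int) -> int:
    # Number of Arrow batches (and thus vectorized UDF calls)
    # needed to process total_rows records.
    return math.ceil(total_rows / batch_size)

rows = 1_000_000
print(num_batches(rows, 10_000))  # default batch size -> 100 calls
print(num_batches(rows, 1_000))   # smaller batches -> 1000 calls
```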

Performance comparison

Characteristic               process mode (row-based)   process mode (Arrow)   thread mode
Isolation                    High                       High                   Low
Serialization overhead       High (per row)             Low (per batch)        Lowest
Memory per slot              High (extra process)       High (extra process)   Low
GIL impact                   None                       None                   Yes
C extension compatibility    Full                       Full                   Limited
For most production workloads with Python UDFs, Arrow-based UDFs in process mode offer the best balance of safety and throughput.

Additional Python execution settings

Configuration key                       Default   Description
python.fn-execution.bundle.size         100000    Max records per bundle sent to the Python worker.
python.fn-execution.bundle.time         1000      Max time (ms) to wait before flushing a bundle.
python.fn-execution.arrow.batch.size    10000     Max rows per Arrow batch (vectorized UDFs only).
python.metric.enabled                   true      Enable metric reporting from Python UDFs.
python.profile.enabled                  false     Enable cProfile-based profiling of Python workers.
For example, to reduce end-to-end latency at some cost in throughput:
t_env.get_config().set("python.fn-execution.bundle.size", "50000")
t_env.get_config().set("python.fn-execution.bundle.time", "500")
If your Python UDFs are I/O-bound (network calls, database lookups), consider asynchronous execution: recent Flink versions add an AsyncScalarFunction to the Table API, which lets a UDF return its result asynchronously so that I/O waits overlap with other work.
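The benefit of asynchronous execution is overlap: while one lookup waits, others proceed. A Flink-free asyncio sketch of the idea (fake_lookup is a hypothetical stand-in for a network or database call):

```python
import asyncio

async def fake_lookup(key: str) -> str:
    # Stand-in for a network or database call; the await yields
    # control so other lookups can run during the wait.
    await asyncio.sleep(0.01)
    return f"value-for-{key}"

async def main() -> list[str]:
    # All three lookups wait concurrently instead of one after
    # another; results come back in submission order.
    return await asyncio.gather(*(fake_lookup(k) for k in ["a", "b", "c"]))

results = asyncio.run(main())
print(results)  # ['value-for-a', 'value-for-b', 'value-for-c']
```

With sequential calls the total wait would be the sum of the individual latencies; with concurrent awaits it is roughly the maximum.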
