How UDF execution works
Every Flink task manager runs on the JVM. When a task manager encounters a Python operator, it spawns a Python worker process (or communicates with one in the same JVM process) and exchanges data with it over a local socket or shared memory. The Python worker deserializes records, invokes your UDF, and returns the results to the JVM. The configuration key that controls this behavior is python.execution-mode.
Execution modes
- process (default)
- thread
In process mode (the default), each task manager spawns a separate Python worker process. The JVM and Python processes communicate over a local socket using Flink's internal protocol.
Advantages:
- Full Python isolation: crashes in the Python worker don't affect the JVM task manager.
- You can use any Python library, including those with C extensions.
- Compatible with all Python versions supported by PyFlink.
Disadvantages:
- Data must be serialized and sent over a socket, adding latency and CPU overhead for high-throughput workloads.
- Each task slot may spawn its own Python process, increasing memory usage.
Configuring execution mode
You can set the execution mode in three ways:
- Via a Configuration object
- Via TableConfig
- Via config.yaml
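A sketch of the three approaches, assuming Flink 1.15+ (for the with_configuration builder method) and 1.16+ (for TableConfig.set with string keys); the config.yaml entry is shown as a comment:

```python
from pyflink.common import Configuration
from pyflink.table import EnvironmentSettings, TableEnvironment

# 1) Via a Configuration object supplied when building the environment
config = Configuration()
config.set_string("python.execution-mode", "thread")
settings = (EnvironmentSettings.new_instance()
            .in_streaming_mode()
            .with_configuration(config)
            .build())
t_env = TableEnvironment.create(settings)

# 2) Via TableConfig on an existing environment
t_env.get_config().set("python.execution-mode", "process")

# 3) Via config.yaml (cluster-wide default):
# python.execution-mode: thread
```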
Arrow-based (pandas) UDFs
For compute-intensive UDFs that process batches of rows, PyFlink supports Arrow-based UDFs (also called vectorized or pandas UDFs). Instead of receiving one row at a time, your UDF receives a pandas.Series or pandas.DataFrame containing a batch of values. This dramatically reduces the per-row overhead of Python–JVM data exchange.
Arrow-based UDFs require pyarrow and pandas to be installed in the Python environment.
Vectorized scalar UDF
Decorate your function with udf and set func_type="pandas":
Vectorized aggregate UDF
For grouped aggregations, use udaf with func_type="pandas":
Controlling batch size
Arrow-based UDFs process records in micro-batches. You can tune the maximum number of rows per batch with python.fn-execution.arrow.batch.size, which defaults to 10000.
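A runtime tuning sketch, assuming Flink 1.16+ where TableConfig.set accepts string keys:

```python
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())

# Smaller batches lower peak memory per batch at the cost of more
# frequent JVM <-> Python round trips; larger batches do the opposite.
t_env.get_config().set("python.fn-execution.arrow.batch.size", "5000")
```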
Performance comparison
| Characteristic | process mode (row-based) | process mode (Arrow) | thread mode |
|---|---|---|---|
| Isolation | High | High | Low |
| Serialization overhead | High (per row) | Low (per batch) | Lowest |
| Memory per slot | High (extra process) | High (extra process) | Low |
| GIL impact | None | None | Yes |
| C extension compatibility | Full | Full | Limited |
process mode offers the best balance of safety and throughput.
Additional Python execution settings
| Configuration key | Default | Description |
|---|---|---|
python.fn-execution.bundle.size | 100000 | Max records per bundle sent to the Python worker. |
python.fn-execution.bundle.time | 1000 | Max time (ms) to wait before flushing a bundle. |
python.fn-execution.arrow.batch.size | 10000 | Max rows per Arrow batch (vectorized UDFs only). |
python.metric.enabled | false | Enable metric reporting from Python UDFs. |
python.profile.enabled | false | Enable cProfile-based profiling of Python workers. |

