PyFlink supports Python 3.9, 3.10, 3.11, and 3.12. Python 2 and Python 3.8 or earlier are not supported. The Python version used by your UDF workers must match the version used to install PyFlink. If you use a virtual environment or add_python_archive(), configure the interpreter path explicitly:
t_env.get_config().set("python.executable", "/path/to/python3.10")
Flink requires Java 11 or Java 17; Java 8 is no longer supported. Set JAVA_HOME if Java is not on your PATH:
export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64
Check your Java version:
java -version
# Expected: openjdk version "17.x.x" ...
When you run a Table API job in a Flink mini-cluster (local Python process), call .wait() after execute_insert():
# Wait for the job to complete (required in mini-cluster mode)
table.execute_insert("my_sink").wait()
Without .wait(), the local Python process may exit before the job finishes producing output. On a remote cluster, omit .wait(): the call returns immediately after job submission, and the cluster runs the job asynchronously:
# On a remote cluster, do NOT call .wait()
table.execute_insert("my_sink")
For the DataStream API, always call env.execute() to trigger execution. In remote mode, it returns after submission.
Your local Python environment has the library, but the task managers do not. Use one of these approaches to distribute it.

Option 1: requirements file
t_env.set_python_requirements("/path/to/requirements.txt")
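The file referenced here is a standard pip requirements.txt; pinning exact versions keeps the task managers consistent with your local environment (the packages and versions below are illustrative):

```text
numpy==1.26.4
pandas==2.1.4
scikit-learn==1.4.0
```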
Flink installs the packages on each task manager before running UDFs.

Option 2: pre-built virtual environment
python -m venv pyflink_venv
source pyflink_venv/bin/activate
pip install numpy pandas scikit-learn
deactivate
zip -r pyflink_venv.zip pyflink_venv/
t_env.add_python_archive("pyflink_venv.zip", "pyflink_venv")
t_env.get_config().set(
    "python.executable",
    "pyflink_venv/pyflink_venv/bin/python",
)
The virtual environment must be created for the same OS and CPU architecture as the task managers.
Yes. For best performance, use Arrow-based (vectorized) UDFs so that Flink passes entire batches of rows as pandas.Series objects rather than individual values:
import pandas as pd
import numpy as np
from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(result_type=DataTypes.DOUBLE(), func_type="pandas")
def normalize(s: pd.Series) -> pd.Series:
    return (s - s.mean()) / s.std()
Requires pyarrow and pandas to be installed:
pip install pyarrow pandas
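As a sanity check outside Flink, the body of the normalize UDF can be applied to a plain pandas.Series; the sample batch below is illustrative, but the arithmetic is exactly what the UDF performs on each Arrow batch:

```python
import pandas as pd

# The vectorized UDF body, applied to one sample batch.
batch = pd.Series([1.0, 2.0, 3.0, 4.0])
result = (batch - batch.mean()) / batch.std()

# After normalization the batch has mean ~0 and standard deviation 1.
print(result.tolist())
```

Note that pandas.Series.std() uses the sample standard deviation (ddof=1) by default.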
Python UDFs have overhead because data must cross the JVM–Python boundary. Common solutions:
  1. Use Arrow-based UDFs (func_type="pandas") to process batches instead of individual rows. This is the single most impactful optimization.
    @udf(result_type=DataTypes.DOUBLE(), func_type="pandas")
    def fast_udf(s: pd.Series) -> pd.Series:
        return s * 2.0
    
  2. Increase bundle size to reduce JVM–Python round trips:
    t_env.get_config().set("python.fn-execution.bundle.size", "100000")
    
  3. Move logic to SQL or Java if it can be expressed without Python—Java operators have no serialization overhead.
  4. Profile your UDF to find the actual bottleneck:
    t_env.get_config().set("python.profile.enabled", "true")
    
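The difference between the two call styles in point 1 can be sketched without a cluster: a plain Python UDF is invoked once per row, while a pandas UDF is invoked once per batch. The doubling function mirrors fast_udf above; the sample values are illustrative:

```python
import pandas as pd

values = pd.Series([1.0, 2.0, 3.0])

# Plain Python UDF: one Python call (one JVM-Python hop) per row.
per_row = [v * 2.0 for v in values]

# Vectorized pandas UDF: a single call for the whole Arrow batch.
per_batch = values * 2.0

# Same results, far fewer boundary crossings.
assert per_row == per_batch.tolist()
```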
Use FunctionContext.get_job_parameter() inside your UDF’s open() method:
from pyflink.table.udf import ScalarFunction, FunctionContext

class ConfigurableUDF(ScalarFunction):
    def open(self, context: FunctionContext):
        self.multiplier = float(
            context.get_job_parameter("multiplier", "1.0")
        )

    def eval(self, value):
        return value * self.multiplier
Pass the parameter when submitting the job:
flink run --python job.py -Dpipeline.global-job-parameters="multiplier:2.5"
Or set it programmatically:
t_env.get_config().set("pipeline.global-job-parameters", "multiplier:2.5")
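The open()/eval() lifecycle can be exercised without a cluster by standing in for the context object. FakeContext below is a hypothetical test double, not a PyFlink class; the UDF class repeats the shape of the ScalarFunction subclass above:

```python
class FakeContext:
    """Hypothetical stand-in for pyflink's FunctionContext."""

    def __init__(self, params):
        self._params = params

    def get_job_parameter(self, key, default_value):
        # Mirrors FunctionContext.get_job_parameter's key/default contract.
        return self._params.get(key, default_value)


class ConfigurableUDF:
    """Same open()/eval() shape as the ScalarFunction subclass above."""

    def open(self, context):
        self.multiplier = float(context.get_job_parameter("multiplier", "1.0"))

    def eval(self, value):
        return value * self.multiplier


f = ConfigurableUDF()
f.open(FakeContext({"multiplier": "2.5"}))
print(f.eval(4.0))  # 10.0
```

Flink calls open() once per UDF instance before any eval() calls, so reading parameters there avoids a lookup on every row.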
Yes. Use StreamTableEnvironment instead of TableEnvironment, which provides conversion methods between tables and data streams:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

# Table → DataStream
table = t_env.from_path("my_source")
ds = t_env.to_data_stream(table)

# DataStream operations
ds = ds.filter(lambda r: r[0] > 0)

# DataStream → Table
from pyflink.table import Schema, DataTypes
result_table = t_env.from_data_stream(
    ds,
    Schema.new_builder().column("value", DataTypes.BIGINT()).build(),
)
result_table.execute_insert("my_sink").wait()
