How do I install PyFlink?
Install PyFlink with pip. PyFlink requires Java 11 or 17 and Python 3.9–3.12, so verify your versions before installing. If you need Arrow-based (pandas) UDFs, also install pyarrow and pandas.
Which Python versions are supported?
PyFlink supports Python 3.9, 3.10, 3.11, and 3.12. Python 2 and Python 3.8 or earlier are not supported. The Python version used by your UDF workers must match the version used to install PyFlink. If you use a virtual environment or add_python_archive(), configure the interpreter path explicitly.
Which Java versions are supported?
Flink requires Java 11 or Java 17. Java 8 is no longer supported. Check your Java version, and set JAVA_HOME if Java is not on your PATH.
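For example (the JDK path below is a placeholder for your install):

```shell
# Check the Java version Flink will pick up
java -version

# If java is not on PATH, point JAVA_HOME at a JDK 11 or 17 install
export JAVA_HOME=/path/to/jdk-17
export PATH="$JAVA_HOME/bin:$PATH"
```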
How do I check the PyFlink version?
Print the installed version from Python; it returns the release number (for example, 2.0.0). Make sure your connector JARs match this version.
How do I wait for jobs to finish when running in mini-cluster mode?
When you run a Table API job in a Flink mini-cluster (local Python process), call .wait() after execute_insert(). Without .wait(), the local Python process may exit before the job finishes producing output. On a remote cluster, omit .wait(): the call returns immediately after job submission, and the cluster runs the job asynchronously.
For the DataStream API, always call env.execute() to trigger execution. In remote mode, it returns after submission.
Why does my UDF fail with ModuleNotFoundError on the cluster?
Your local Python environment has the library, but the task managers do not. Use one of these approaches to distribute it:

Option 1: requirements file. Flink installs the packages on each task manager before running UDFs.

Option 2: pre-built virtual environment. The virtual environment must be created for the same OS and CPU architecture as the task managers.
How do I use a Kafka connector from PyFlink?
Download the Kafka SQL connector JAR for your Flink version, add it to your job, then define a Kafka table. See the Connectors page for more detail.
Can I use pandas and NumPy inside a UDF?
Yes. For best performance, use Arrow-based (vectorized) UDFs so that Flink passes entire batches of rows as pandas.Series objects rather than individual values. This requires pyarrow and pandas to be installed.
Why is my Python UDF slow?
Python UDFs have overhead because data must cross the JVM–Python boundary. Common solutions:
- Use Arrow-based UDFs (func_type="pandas") to process batches instead of individual rows. This is the single most impactful optimization.
- Increase bundle size to reduce JVM–Python round trips.
- Move logic to SQL or Java if it can be expressed without Python; Java operators have no serialization overhead.
- Profile your UDF to find the actual bottleneck.
How do I pass job parameters to a Python UDF?
Use FunctionContext.get_job_parameter() inside your UDF’s open() method. Pass the parameter when submitting the job, or set it programmatically.
Can I mix the Table API and DataStream API in the same job?
Yes. Use StreamTableEnvironment instead of TableEnvironment; it provides conversion methods between tables and data streams.
How do I enable checkpointing in a PyFlink job?
Call enable_checkpointing() on the StreamExecutionEnvironment. For Table API jobs, configure checkpointing through the StreamExecutionEnvironment and then create a StreamTableEnvironment on top of it.
What dependencies does PyFlink require?
PyFlink’s Python package depends on:

| Dependency | Version constraint |
|---|---|
| Py4J | 0.10.9.7 |
| CloudPickle | 2.2.0 |
| python-dateutil | >=2.8.0,<3 |
| Apache Beam | >=2.54.0,<=2.61.0 |

These are installed automatically with pip install apache-flink.

Optional dependencies for Arrow/pandas UDFs (not installed automatically):

| Dependency | Install with |
|---|---|
| pyarrow | pip install pyarrow |
| pandas | pip install pandas |
