When you run a PyFlink job on a cluster, the task managers may not have your Python libraries installed. This page explains how to bundle Python dependencies with your job so every worker has access to them.
Python dependencies
PyFlink provides several mechanisms for distributing Python libraries to task managers.
Using a requirements file
The simplest approach is to list your dependencies in a requirements.txt file and register it with the environment. Flink installs the packages on each task manager before running your UDFs.
from pyflink.table import TableEnvironment, EnvironmentSettings
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
# Flink will pip-install these packages on the workers
t_env.set_python_requirements("/path/to/requirements.txt")
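The requirements file uses the standard pip format, one requirement per line. A hypothetical example (package names and version pins are illustrative):

```text
# requirements.txt
pandas==2.2.2
requests>=2.31,<3
```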
You can also point to a local directory containing pre-downloaded wheels, which avoids reaching out to PyPI from the cluster:
t_env.set_python_requirements(
    requirements_file_path="/path/to/requirements.txt",
    requirements_cache_dir="/path/to/cached_wheels/",
)
Download the wheels on the submission host with:
pip download -r requirements.txt -d ./cached_wheels
Then ship the directory alongside your job.
Adding individual Python files
For single-file helper modules or scripts, use add_python_file():
# DataStream API
env.add_python_file("/local/path/to/my_module.py")
# Table API
t_env.add_python_file("/local/path/to/my_module.py")
The file is distributed to every task manager and added to the Python path of the worker process, so you can import it directly inside your UDFs:
from pyflink.table.udf import udf
from pyflink.table import DataTypes
# my_module.py is available because it was added via add_python_file()
@udf(result_type=DataTypes.STRING())
def process(value):
    import my_module  # available on every task manager
    return my_module.transform(value)
Adding a virtual environment (archive)
For full environment isolation—including packages with native extensions—pack a virtual environment into a zip archive and register it with add_python_archive():
Create and pack the virtual environment
# Create the virtualenv
python -m venv pyflink_venv
source pyflink_venv/bin/activate
pip install numpy pandas scikit-learn
deactivate
# Pack it into a zip archive
zip -r pyflink_venv.zip pyflink_venv/
Register the archive with the job
from pyflink.table import TableEnvironment, EnvironmentSettings
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
# The second argument is the directory name the archive will be
# extracted to on the task manager
t_env.add_python_archive("pyflink_venv.zip", "pyflink_venv")
# Tell Flink to use the Python executable inside the archive
t_env.get_config().set(
    "python.executable",
    "pyflink_venv/pyflink_venv/bin/python",
)
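The doubled pyflink_venv/pyflink_venv in the path comes from zipping the directory itself: the archive's single top-level folder is extracted inside the target directory you name. As a sketch, a small helper (standard library only, not part of PyFlink) can confirm the archive layout before submission:

```python
import zipfile

def archive_top_level(zip_path):
    """Return the distinct top-level entry names inside a zip archive."""
    with zipfile.ZipFile(zip_path) as zf:
        return sorted({name.split("/", 1)[0] for name in zf.namelist()})
```

If it returns something other than ['pyflink_venv'], adjust the python.executable path accordingly.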
Use the packaged libraries in your UDFs
from pyflink.table.udf import udf
from pyflink.table import DataTypes
@udf(result_type=DataTypes.DOUBLE())
def predict(value):
    import numpy as np  # comes from the archive
    return float(np.sqrt(value))
The virtual environment must be created for the same OS and CPU architecture as the task managers. If your cluster runs Linux x86-64, create the archive on a Linux x86-64 machine.
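As a minimal pre-flight sketch, you can compare the submission host's platform to the cluster's before packing. The expected cluster values below are assumptions; set them to whatever your task managers actually run:

```python
import platform

def matches_cluster(expected_system="Linux", expected_machine="x86_64"):
    """Return True if this host matches the assumed cluster OS/architecture.

    The defaults are assumptions; adjust them to match your task managers.
    """
    return (platform.system(), platform.machine()) == (expected_system, expected_machine)

if not matches_cluster():
    print("Warning: pack the virtualenv on a machine matching the cluster.")
```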
Using conda-pack
For conda environments, use conda-pack to create a self-contained archive:
conda create -n pyflink_env python=3.10 numpy pandas
conda activate pyflink_env
pip install conda-pack
conda pack -o pyflink_conda_env.tar.gz
Then register the archive:
t_env.add_python_archive("pyflink_conda_env.tar.gz", "pyflink_conda_env")
t_env.get_config().set(
    "python.executable",
    "pyflink_conda_env/bin/python",
)
JAR dependencies
Python jobs often need JAR files—connectors, formats, or custom serializers. There are several ways to add JARs.
Adding JARs to the environment
from pyflink.datastream import StreamExecutionEnvironment
env = StreamExecutionEnvironment.get_execution_environment()
# Add a single JAR
env.add_jars("file:///path/to/flink-sql-connector-kafka-1.20.0.jar")
# Add multiple JARs at once
env.add_jars(
    "file:///path/to/flink-sql-connector-kafka-1.20.0.jar",
    "file:///path/to/flink-connector-jdbc-3.2.0.jar",
)
The path must be a valid URI. Use file:// for local files and hdfs:// or s3:// for distributed storage.
Adding JARs via the classpath
Alternatively, add JARs to the cluster classpath before submission:
# Place the JAR in Flink's lib directory
cp flink-sql-connector-kafka-1.20.0.jar $FLINK_HOME/lib/
# Then submit normally
flink run --python my_job.py
Table API: registering JARs
For the Table API, set the pipeline.jars option on the table environment's configuration:
from pyflink.table import TableEnvironment, EnvironmentSettings
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.get_config().set(
    "pipeline.jars",
    "file:///path/to/flink-sql-connector-kafka-1.20.0.jar",
)
Multiple JARs are separated by semicolons:
t_env.get_config().set(
    "pipeline.jars",
    "file:///path/to/connector-a.jar;file:///path/to/connector-b.jar",
)
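Assembling that semicolon-separated URI string by hand is error-prone. As a sketch using only the standard library (the helper name and paths are hypothetical), local JAR paths can be converted and joined programmatically:

```python
from pathlib import Path

def to_pipeline_jars(paths):
    """Join local JAR paths into the semicolon-separated file:// URI
    list expected by the pipeline.jars option."""
    return ";".join(Path(p).resolve().as_uri() for p in paths)
```

The result can then be passed directly to `t_env.get_config().set("pipeline.jars", ...)`.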
Packaging UDFs for submission
When submitting a job to a remote cluster, all UDF code must be available on the task managers. If your UDFs are defined in separate modules, distribute them either by calling add_python_file() in the job or by passing the --pyFiles CLI flag at submission; both ship the files to every task manager.
# job.py
from pyflink.table import TableEnvironment, EnvironmentSettings
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
# Distribute helper modules
t_env.add_python_file("transformations.py")
t_env.add_python_file("utils.py")
flink run \
    --python job.py \
    --pyFiles transformations.py,utils.py \
    --jarfile flink-sql-connector-kafka-1.20.0.jar
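For illustration, transformations.py can be an ordinary Python module; the normalize helper below is hypothetical, not part of PyFlink:

```python
# transformations.py — hypothetical helper module shipped via --pyFiles
def normalize(value):
    """Trim surrounding whitespace and lowercase a string, tolerating None."""
    if value is None:
        return ""
    return value.strip().lower()
```

Inside a UDF running on a task manager you would then simply `import transformations` and call `transformations.normalize(value)`.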
Configuration reference
| Configuration key | Description |
|---|---|
| python.executable | Path to the Python interpreter used for UDF workers. |
| python.fn-execution.bundle.size | Max records per bundle delivered to the Python worker. |
| python.fn-execution.arrow.batch.size | Max rows per Arrow batch for vectorized UDFs. |
| pipeline.jars | Semicolon-separated list of JAR URIs added to the job classpath. |
| pipeline.classpaths | Additional classpaths loaded by the user code class loader. |
If you set python.executable to a custom interpreter path, make sure that path exists on every task manager, not just the submission host. Use add_python_archive to ship the interpreter when the cluster nodes don’t have it pre-installed.