Debugging distributed Python code requires different techniques than debugging a standalone script. This page covers the tools and approaches for finding and fixing issues in PyFlink jobs.

Debugging locally

The easiest way to debug a PyFlink job is to run it locally, where Flink starts an embedded mini-cluster in the same process.

Running with Python directly

python my_job.py
Flink’s mini-cluster mode runs everything in a single JVM process (plus Python worker processes). Exceptions from Python UDFs propagate back to the driver with a full traceback.

Debugging UDF logic in isolation

Before running your UDF inside Flink, test it as a plain Python function:
# test_udf.py — no Flink required
def my_transform(value: str) -> str:
    """The same logic as your UDF."""
    return value.strip().lower()

# Unit test
assert my_transform("  Hello  ") == "hello"
assert my_transform("FLINK") == "flink"
print("All assertions passed")
This is the fastest feedback loop; only bring Flink in once the logic is correct.

Stepping through with pdb

The environment variable PYFLINK_GATEWAY_DISABLED prevents PyFlink from starting the JVM gateway. This is useful when you want to step through code that imports pyflink modules without launching a full Flink environment:
export PYFLINK_GATEWAY_DISABLED=true
python -m pdb my_job.py
With PYFLINK_GATEWAY_DISABLED=true, any code that actually calls into the JVM (e.g., creating a StreamExecutionEnvironment) will raise an error. This mode is only useful for inspecting Python-side logic that doesn’t require the JVM.
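One way to take advantage of this mode is to keep all JVM-touching code behind a function boundary, so the module itself can be imported and stepped through under PYFLINK_GATEWAY_DISABLED. A minimal sketch (module and function names are illustrative, not part of any PyFlink API):

```python
# my_job.py — keep pure logic importable without the JVM
def normalize(value: str) -> str:
    """Pure Python logic: steppable in pdb with no Flink environment."""
    return value.strip().lower()

def build_and_run():
    # Anything that touches the JVM stays inside this function, so a
    # plain `import my_job` works even with PYFLINK_GATEWAY_DISABLED=true.
    from pyflink.table import EnvironmentSettings, TableEnvironment
    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
    # ... build and execute the pipeline using normalize() as a UDF ...
```

With this layout, `python -m pdb my_job.py` can inspect normalize() freely, and only calling build_and_run() requires a working JVM.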

Attaching a debugger to the Python UDF worker

To step through UDF code while it runs inside Flink, use debugpy (the VS Code debug adapter):
1. Install debugpy

pip install debugpy
2. Add a breakpoint in your UDF

from pyflink.table.udf import ScalarFunction

class DebuggableUDF(ScalarFunction):
    def open(self, function_context):
        import debugpy
        # listen() may only be called once per process, so set it up in
        # open() rather than eval(); block until the debugger attaches
        # on port 5678.
        debugpy.listen(("0.0.0.0", 5678))
        debugpy.wait_for_client()

    def eval(self, value):
        import debugpy
        debugpy.breakpoint()  # pause here on every call
        return value.upper()
3. Attach from VS Code or PyCharm

In VS Code, add a launch configuration:
{
    "type": "debugpy",
    "request": "attach",
    "name": "Attach to PyFlink UDF",
    "connect": {
        "host": "localhost",
        "port": 5678
    }
}
Run your Flink job, then attach the debugger from the VS Code Run panel.
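An unconditional wait_for_client() will hang any run where no debugger ever attaches. One way to make attachment opt-in is to gate it behind an environment variable; this sketch uses a hypothetical PYFLINK_DEBUG flag of our own, not a real Flink or debugpy setting:

```python
import os

def maybe_attach_debugger(port: int = 5678) -> bool:
    """Start debugpy and block for a client only when PYFLINK_DEBUG=1.

    PYFLINK_DEBUG is our own convention here, not a Flink variable.
    Returns True if a debugger session was set up, False otherwise.
    """
    if os.environ.get("PYFLINK_DEBUG") != "1":
        return False
    import debugpy  # imported lazily so production workers never need it
    debugpy.listen(("0.0.0.0", port))
    debugpy.wait_for_client()
    return True
```

Call maybe_attach_debugger() once from your UDF's open() method; runs without PYFLINK_DEBUG=1 skip the debugger entirely.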

Logging

Logging from Python UDFs

Use the standard logging module inside UDFs. The output goes to the task manager’s log file.
import logging
from pyflink.table.udf import ScalarFunction

logger = logging.getLogger(__name__)

class LoggingUDF(ScalarFunction):
    def eval(self, value):
        logger.info("Processing value: %s", value)
        try:
            result = self._transform(value)
            logger.debug("Result: %s", result)
            return result
        except Exception as e:
            logger.error("Failed to process value '%s': %s", value, e, exc_info=True)
            raise

    def _transform(self, value):
        return value.strip().lower()

Configuring log level

Set the log level in log4j2.properties (for Java-side logs) and via the Python logging module (for Python-side logs):
import logging
import sys

logging.basicConfig(
    stream=sys.stdout,
    level=logging.DEBUG,
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
)
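Setting the root level to DEBUG also surfaces debug output from every third-party library. You can keep the parent logger quieter and raise verbosity only for your own modules; a small self-contained demonstration (logger names are illustrative):

```python
import io
import logging

buf = io.StringIO()
handler = logging.StreamHandler(buf)
handler.setFormatter(logging.Formatter("%(name)s %(levelname)s %(message)s"))

parent = logging.getLogger("demo")         # stand-in for the root logger
parent.addHandler(handler)
parent.setLevel(logging.INFO)              # default: INFO and above

udf_logger = logging.getLogger("demo.my_udfs")
udf_logger.setLevel(logging.DEBUG)         # verbose only for our UDF module

logging.getLogger("demo.third_party").debug("hidden")  # below INFO: dropped
udf_logger.debug("kept")                               # DEBUG enabled here

print(buf.getvalue())
```

Because levels are resolved per logger and inherited down the dotted-name hierarchy, only the "demo.my_udfs" debug record reaches the handler.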

Viewing task manager logs

On a running cluster, access logs through the Flink Web UI:
  1. Open the Flink Web UI (default: http://localhost:8081).
  2. Click Task Managers in the left sidebar.
  3. Select a task manager and click the Logs tab.
Python UDF logs appear alongside Java task manager logs.

Profiling

Enabling cProfile

Flink can profile Python UDF workers using Python’s built-in cProfile module. Enable it with:
t_env.get_config().set("python.profile.enabled", "true")
Profile output is written to the task manager’s temporary directory. The filename is logged at the INFO level when the Python worker exits.

Reading profile output

import pstats
import io

# Load the .prof file written by the task manager
stats = pstats.Stats("/tmp/pyflink_profile_output.prof")
stats.sort_stats("cumulative")
stats.print_stats(20)  # Show top 20 functions by cumulative time
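The same pstats workflow can be rehearsed without a cluster by profiling any function locally and reading back the .prof file, for example:

```python
import cProfile
import io
import os
import pstats
import tempfile

def hot_path(n):
    # Stand-in for an expensive UDF body
    return sum(i * i for i in range(n))

profiler = cProfile.Profile()
profiler.enable()
hot_path(200_000)
profiler.disable()

# Write a .prof file like the one the task manager produces
prof_path = os.path.join(tempfile.mkdtemp(), "demo.prof")
profiler.dump_stats(prof_path)

out = io.StringIO()
stats = pstats.Stats(prof_path, stream=out)
stats.sort_stats("cumulative").print_stats(5)
print(out.getvalue())
```

hot_path should appear near the top of the cumulative-time listing, exactly as your UDF's hot spots would in a real profile.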

Flame graphs

For a visual view, convert the profile to a flame graph using flameprof:
pip install flameprof
python -m flameprof /tmp/pyflink_profile_output.prof > flame.svg
open flame.svg

Common errors and solutions

Symptom: Your UDF raises ModuleNotFoundError on the cluster even though the library is installed locally.
Cause: The Python environment on the task managers does not have the package installed.
Solution: Use set_python_requirements() or add_python_archive() to ship the dependency:
# Option 1: requirements file
t_env.set_python_requirements("requirements.txt")

# Option 2: pre-built virtual environment
t_env.add_python_archive("pyflink_venv.zip", "pyflink_venv")
t_env.get_config().set(
    "python.executable",
    "pyflink_venv/pyflink_venv/bin/python",
)
Symptom: The job fails with a Java exception stating the Python process exited.
Cause: An unhandled exception in Python caused the worker to crash, or the Python process ran out of memory.
Solution:
  • Check the task manager logs for the Python traceback that preceded the Java exception.
  • Add try/except in your UDF to catch and log exceptions rather than letting them propagate as crashes.
  • Increase the Python worker memory if the crash is OOM-related.
def eval(self, value):
    try:
        return self._process(value)
    except Exception as e:
        import logging
        logging.getLogger(__name__).error("UDF error: %s", e, exc_info=True)
        return None  # or re-raise
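If several UDFs need the same catch-and-log behavior, the pattern can be factored into a decorator. This is a hypothetical helper of our own, not a PyFlink API, and returning a fallback trades a worker crash for silent nulls, so use it deliberately:

```python
import functools
import logging

def safe_udf(default=None):
    """Wrap a UDF body so failures are logged and a fallback value is
    returned instead of crashing the Python worker. (Our own helper.)"""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            try:
                return fn(*args, **kwargs)
            except Exception:
                logging.getLogger(fn.__module__).exception(
                    "UDF %s failed on args=%r", fn.__name__, args
                )
                return default
        return wrapper
    return decorator

@safe_udf(default="")
def normalize(value):
    return value.strip().lower()
```

Here normalize(None) logs the AttributeError with a full traceback and returns "" instead of killing the worker.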
Symptom: py4j.protocol.Py4JError or errors about the JVM gateway failing to start.
Cause: Java is not installed, JAVA_HOME is not set, or the Java version is incompatible.
Solution:
# Check Java version (must be 11 or 17)
java -version

# Set JAVA_HOME if needed
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
export PATH=$JAVA_HOME/bin:$PATH

# Verify PyFlink can find the JVM
python -c "from pyflink.java_gateway import get_gateway; get_gateway()"
Symptom: TypeError or PicklingError when passing custom Python objects between operators.
Cause: PyFlink uses CloudPickle to serialize Python objects. Some objects (e.g., lambda captures with unserializable state, database connections) cannot be pickled.
Solution: Restructure your UDF to avoid storing unserializable objects. Open connections in open() rather than __init__():
class DatabaseUDF(ScalarFunction):
    def open(self, context):
        # Open connection inside open(), not in __init__()
        import sqlite3
        self.conn = sqlite3.connect(":memory:")

    def close(self):
        if self.conn:
            self.conn.close()

    def eval(self, key):
        cursor = self.conn.execute("SELECT value FROM t WHERE key=?", (key,))
        row = cursor.fetchone()
        return row[0] if row else None
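You can check ahead of time whether an object will survive serialization. The standard pickle module is a reasonable local proxy here (CloudPickle handles more cases, such as lambdas, but likewise rejects live connections):

```python
import pickle
import sqlite3

def is_picklable(obj) -> bool:
    """Rough pre-flight check: can this object be serialized?"""
    try:
        pickle.dumps(obj)
        return True
    except Exception:
        return False

# Plain data serializes fine
assert is_picklable({"key": [1, 2, 3]})

# A live connection does not: open it in open() on the worker instead
conn = sqlite3.connect(":memory:")
assert not is_picklable(conn)
conn.close()
```

Running this check on each attribute of a failing UDF instance quickly pinpoints which field is breaking serialization.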
Symptom: The job runs indefinitely without producing results.
Cause: Watermarks are not advancing, or .wait() was not called in a mini-cluster execution.
Solution:
  • For bounded sources, confirm the source has a defined end of input.
  • For streaming jobs with event-time windows, make sure your source emits watermarks.
  • When running in mini-cluster mode with the Table API, call .wait() after execute_insert():
# In mini-cluster mode, wait for the job to finish
table.execute_insert("sink").wait()

# On a remote cluster, omit .wait() — it returns immediately after submission
table.execute_insert("sink")
Symptom: java.lang.ClassNotFoundException mentioning a Kafka, JDBC, or other connector class.
Cause: The connector JAR is not on the classpath.
Solution: Add the JAR to the job:
t_env.get_config().set(
    "pipeline.jars",
    "file:///path/to/flink-sql-connector-kafka-1.20.0.jar",
)
Or add it to Flink’s lib/ directory before starting the cluster.

Environment variables reference

PYFLINK_GATEWAY_DISABLED: Prevent PyFlink from starting the JVM gateway. Useful for testing Python-only code paths.
PYFLINK_GATEWAY_PORT: Connect to an already-running gateway on this port instead of launching a new one.
FLINK_HOME: Path to the Flink installation directory. PyFlink uses this to find JARs and configuration files.
JAVA_HOME: Path to the JDK installation. Required if java is not on PATH.
FLINK_CONF_DIR: Override the directory containing config.yaml.
