PyFlink is the Python API for Apache Flink, giving you access to Flink’s powerful DataStream and Table APIs from Python. You can build scalable batch and streaming workloads—real-time data pipelines, large-scale exploratory analysis, machine learning pipelines, and ETL processes—without leaving Python. Under the hood, PyFlink uses Py4J to bridge Python and the Flink JVM runtime, so your Python code drives a full Flink cluster with all of its fault tolerance, state management, and connector ecosystem.

Choosing an API

Table API

A relational, SQL-like API for working with structured data. Ideal if you’re familiar with pandas or SQL and want to express transformations declaratively.

DataStream API

A lower-level API giving you direct control over streams, state, and time. Use this for complex event processing, custom windowing, or anything that requires fine-grained control.
You can mix both APIs in a single program by converting between Table and DataStream objects.

Prerequisites

  • Java 11 or 17 (required to run the Flink JVM runtime)
  • Python 3.9, 3.10, 3.11, or 3.12 (Python 3.8 and earlier are not supported)
  • pip 20.3 or later
PyFlink requires a compatible Java installation. Set JAVA_HOME if Java is not on your PATH. Java 8 is no longer supported.
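For example, on a typical Linux setup (the JDK path below is illustrative; use the location of your own installation):

```shell
# Check which Java version is on the PATH
java -version

# Point PyFlink at a specific JDK if needed
export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64
export PATH="$JAVA_HOME/bin:$PATH"
```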

Installation

1. Install PyFlink

pip install apache-flink
This installs the pyflink package along with its dependencies: Py4J, CloudPickle, python-dateutil, and Apache Beam (used for the portable runner).
2. Verify the installation

python -c "import pyflink; print(pyflink.__version__)"
You should see the installed version printed to stdout.
3. (Optional) Install PyArrow for Arrow-based UDFs

To use pandas UDFs and Arrow-optimized data transfer, install PyArrow:
pip install pyarrow
The following example shows a word count job written with the Table API.
from pyflink.table import EnvironmentSettings, TableEnvironment, DataTypes
from pyflink.table.expressions import col
from pyflink.table.udf import udf

# Create a batch TableEnvironment
t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())

# Inline UDF to upper-case a word
@udf(result_type=DataTypes.STRING())
def upper(s):
    return s.upper()

# Create an in-memory table from a list of tuples
words = ["flink", "python", "streaming", "state", "checkpoint",
         "flink", "python", "flink"]
table = t_env.from_elements([(w,) for w in words], schema=["word"])

# Count occurrences per word
result = (
    table
    .select(upper(col("word")).alias("word"))
    .group_by(col("word"))
    .select(col("word"), col("word").count.alias("cnt"))
)
result.execute().print()

Key concepts

StreamExecutionEnvironment

StreamExecutionEnvironment is the entry point for every DataStream program. It controls parallelism, checkpointing, and job submission.
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(4)
env.enable_checkpointing(10_000)  # checkpoint every 10 seconds

TableEnvironment and StreamTableEnvironment

TableEnvironment is the entry point for pure Table API programs. Use StreamTableEnvironment when you need to convert between tables and data streams.
from pyflink.table import TableEnvironment, EnvironmentSettings

# Pure Table API (batch or streaming)
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Or, to also access DataStream APIs
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

Python UDFs

Both APIs let you register Python functions as UDFs. The Table API supports scalar, table, aggregate, and table-aggregate UDFs:
from pyflink.table import DataTypes
from pyflink.table.udf import udf, ScalarFunction

# Decorator style
@udf(result_type=DataTypes.STRING())
def greet(name):
    return f"Hello, {name}!"

# Class style (gives access to open/close lifecycle)
class Greet(ScalarFunction):
    def eval(self, name):
        return f"Hello, {name}!"

greet_udf = udf(Greet(), result_type=DataTypes.STRING())
Run your script directly with Python. Flink starts an embedded mini-cluster automatically:
python my_job.py
When you submit to a remote cluster instead, submission is asynchronous: `execute_insert(...)` returns a `TableResult` as soon as the job is submitted. Call `.wait()` on it (or use a blocking call such as `env.execute()` for DataStream jobs) if the local process must stay alive until the job finishes; without `.wait()`, the client can exit before any results are produced.

Further reading

Execution mode

Understand in-process vs external process execution and Arrow-based UDFs.

Dependency management

Add Python packages, virtual environments, and JAR connectors to your job.

Connectors

Connect to Kafka, filesystems, and other systems from the Python Table API.

Metrics

Register custom counters, gauges, meters, and distributions from UDFs.
