PyFlink is the Python API for Apache Flink, giving you access to Flink's powerful DataStream and Table APIs from Python. You can build scalable batch and streaming workloads, such as real-time data pipelines, large-scale exploratory analysis, machine learning pipelines, and ETL processes, without leaving Python.

Under the hood, PyFlink uses Py4J to bridge Python and the Flink JVM runtime, so your Python code drives a full Flink cluster with all of its fault tolerance, state management, and connector ecosystem.
Table API
A relational, SQL-like API for working with structured data. Ideal if you're familiar with pandas or SQL and want to express transformations declaratively.
DataStream API
A lower-level API giving you direct control over streams, state, and time. Use this for complex event processing, custom windowing, or anything that requires fine-grained control.
You can mix both APIs in a single program by converting between Table and DataStream objects.
The examples below show the same word count job written with the Table API and the DataStream API, followed by a third example that mixes the two, so you can compare the styles.
```python
# Word count with the Table API (batch mode)
from pyflink.table import EnvironmentSettings, TableEnvironment, DataTypes
from pyflink.table.expressions import col
from pyflink.table.udf import udf

# Create a batch TableEnvironment
t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())

# Inline UDF to upper-case a word
@udf(result_type=DataTypes.STRING())
def upper(s):
    return s.upper()

# Create an in-memory table from a list of tuples
words = ["flink", "python", "streaming", "state", "checkpoint",
         "flink", "python", "flink"]
table = t_env.from_elements([(w,) for w in words], schema=["word"])

# Count occurrences per word
result = (
    table
    .select(upper(col("word")).alias("word"))
    .group_by(col("word"))
    .select(col("word"), col("word").count.alias("cnt"))
)
result.execute().print()
```
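To sanity-check what the aggregation above should produce, the same upper-case-and-count logic can be reproduced in plain Python with `collections.Counter` (a standalone sketch, not PyFlink code):

```python
from collections import Counter

# Same input as the Table API example above
words = ["flink", "python", "streaming", "state", "checkpoint",
         "flink", "python", "flink"]

# Upper-case each word, then count occurrences per distinct word
counts = Counter(w.upper() for w in words)

print(counts["FLINK"])   # 3
print(counts["PYTHON"])  # 2
```

The Table API job computes the same per-word counts, distributed across the cluster.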
```python
# Word count with the DataStream API
from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()

words = ["flink", "python", "streaming", "state", "checkpoint"]
ds = env.from_collection(
    [(w, 1) for w in words * 10],
    type_info=Types.TUPLE([Types.STRING(), Types.INT()]),
)

# Key by word, then sum the counts
result = (
    ds
    .key_by(lambda x: x[0])
    .reduce(lambda a, b: (a[0], a[1] + b[1]))
)
result.print()
env.execute("Word Count")
```
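Note that on a keyed stream, `reduce` emits a running result for each incoming element rather than a single final value per key. A plain-Python sketch of that per-key rolling behavior (illustrative only, no Flink involved):

```python
# Rolling per-key reduce, mimicking keyed DataStream.reduce semantics:
# every incoming (word, count) record produces an updated running sum.
def rolling_reduce(records):
    state = {}  # keyed state: word -> running count
    out = []
    for word, count in records:
        state[word] = state.get(word, 0) + count
        out.append((word, state[word]))  # one output per input record
    return out

records = [(w, 1) for w in ["flink", "python", "flink"]]
print(rolling_reduce(records))  # [('flink', 1), ('python', 1), ('flink', 2)]
```

This is why the DataStream example prints intermediate counts as records arrive, with the last output per key holding the final total for this bounded input.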
```python
# Mixed example: Table API and DataStream API in one program
from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes, Schema
from pyflink.table.udf import udf
from pyflink.table.expressions import col

# Use StreamTableEnvironment when mixing both APIs
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

@udf(result_type=DataTypes.BIGINT())
def str_len(s):
    return len(s)

# Start in the Table API
table = t_env.from_elements(
    [(1, "hello"), (2, "world"), (3, "flink")],
    schema=["id", "word"],
)
table = table.select(col("id"), str_len(col("word")).alias("length"))

# Convert to a DataStream for lower-level operations
ds = t_env.to_data_stream(table)
ds = ds.map(lambda r: r[0] + r[1], output_type=Types.LONG())

# Convert back to a Table
result_table = t_env.from_data_stream(
    ds,
    Schema.new_builder().column("f0", DataTypes.BIGINT()).build(),
)
result_table.execute().print()
```
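The mixed job computes `len(word)` in the Table API step and `id + length` in the DataStream `map`. The equivalent arithmetic in plain Python, useful for checking what the pipeline should produce (not PyFlink code):

```python
# Same input rows as the mixed example above
rows = [(1, "hello"), (2, "world"), (3, "flink")]

# Table API step: (id, len(word)); DataStream step: id + length
results = [row_id + len(word) for row_id, word in rows]
print(results)  # [6, 7, 8]
```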
TableEnvironment is the entry point for pure Table API programs. Use StreamTableEnvironment when you need to convert between tables and data streams.
```python
from pyflink.table import TableEnvironment, EnvironmentSettings

# Pure Table API (batch or streaming)
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Or, to also access the DataStream API
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
```
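Either environment can be configured before the job is defined. As one example, parallelism and checkpointing can be set on the `StreamExecutionEnvironment`; the values below are illustrative, and this fragment only takes effect once a job is actually executed:

```python
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(2)            # default parallelism for all operators
env.enable_checkpointing(10_000)  # checkpoint every 10 seconds (interval in ms)
```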
To block until a job finishes, call execute_insert(...).wait() on the returned TableResult (or env.execute() for DataStream jobs). On a remote cluster, submission returns immediately; if you omit .wait(), the local process may exit before the job produces any results.