Flink connectors let your PyFlink jobs read from and write to external systems like Kafka, object stores, databases, and message queues. Connectors are defined in the Java ecosystem and are accessed from Python through the Table API’s TableDescriptor or via SQL DDL strings.
How connectors work in PyFlink
Connectors in the Flink Table API are identified by a connector name (e.g., "kafka", "filesystem", "datagen"). The connector implementation lives in a JAR file that must be on the classpath. You configure the connector with key-value options and attach a format (e.g., JSON, CSV, Avro) where applicable.
Built-in connectors like datagen, print, and blackhole are bundled with Flink and require no extra JARs. All other connectors—Kafka, JDBC, Elasticsearch, etc.—require their connector JAR to be added to the job.
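As a quick sanity check, the bundled connectors can be wired together without downloading anything. A minimal sketch (table names, rates, and row counts are arbitrary):

from pyflink.table import TableEnvironment, EnvironmentSettings

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# datagen produces random rows; print writes them to stdout. Neither needs a JAR.
t_env.execute_sql("""
    CREATE TABLE demo_source (
        id BIGINT,
        name STRING
    ) WITH (
        'connector' = 'datagen',
        'rows-per-second' = '5',
        'number-of-rows' = '10'
    )
""")

t_env.execute_sql("""
    CREATE TABLE demo_sink (
        id BIGINT,
        name STRING
    ) WITH ('connector' = 'print')
""")

# number-of-rows makes the source bounded, so wait() returns once all rows are printed
t_env.execute_sql("INSERT INTO demo_sink SELECT id, name FROM demo_source").wait()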
Adding connector JARs
Before using an external connector, download the matching JAR and add it to your environment:
from pyflink.table import TableEnvironment, EnvironmentSettings
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
# Add the Kafka SQL connector JAR
t_env.get_config().set(
"pipeline.jars",
"file:///path/to/flink-sql-connector-kafka-1.20.0.jar",
)
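pipeline.jars accepts a semicolon-separated list of URLs, so several connector JARs can be registered in one call. A sketch with placeholder paths and versions:

# Multiple JARs are separated by semicolons
t_env.get_config().set(
    "pipeline.jars",
    "file:///path/to/flink-sql-connector-kafka-1.20.0.jar;"
    "file:///path/to/flink-connector-jdbc-<version>.jar",
)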
Alternatively, add the JAR at the StreamExecutionEnvironment level:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars("file:///path/to/flink-sql-connector-kafka-1.20.0.jar")
t_env = StreamTableEnvironment.create(env)
Kafka connector
Reading from Kafka
from pyflink.table import (
TableEnvironment, EnvironmentSettings, DataTypes, Schema, TableDescriptor, FormatDescriptor
)
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.get_config().set(
"pipeline.jars",
"file:///path/to/flink-sql-connector-kafka-1.20.0.jar",
)
t_env.create_temporary_table(
"kafka_source",
TableDescriptor.for_connector("kafka")
.schema(
Schema.new_builder()
.column("user_id", DataTypes.BIGINT())
.column("event", DataTypes.STRING())
.column("ts", DataTypes.TIMESTAMP(3))
.watermark("ts", "ts - INTERVAL '5' SECOND")
.build()
)
.option("topic", "user-events")
.option("properties.bootstrap.servers", "localhost:9092")
.option("properties.group.id", "pyflink-consumer")
.option("scan.startup.mode", "latest-offset")
.format(
FormatDescriptor.for_format("json")
.option("ignore-parse-errors", "true")
.build()
)
.build(),
)
result = t_env.from_path("kafka_source")
result.execute().print()
Writing to Kafka
t_env.create_temporary_table(
"kafka_sink",
TableDescriptor.for_connector("kafka")
.schema(
Schema.new_builder()
.column("user_id", DataTypes.BIGINT())
.column("event", DataTypes.STRING())
.build()
)
.option("topic", "processed-events")
.option("properties.bootstrap.servers", "localhost:9092")
.format("json")
.build(),
)
source_table.execute_insert("kafka_sink").wait()
Kafka with Upsert mode
Use the upsert-kafka connector to write a changelog stream where rows with the same key update or delete existing records:
t_env.create_temporary_table(
"upsert_sink",
TableDescriptor.for_connector("upsert-kafka")
.schema(
Schema.new_builder()
.column("user_id", DataTypes.BIGINT())
.column("total_events", DataTypes.BIGINT())
.primary_key("user_id")
.build()
)
.option("topic", "user-totals")
.option("properties.bootstrap.servers", "localhost:9092")
.format("json")
.build(),
)
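A typical use is materializing a keyed aggregation, where each new count per user_id overwrites the previous record in the topic. A sketch, assuming the kafka_source table defined earlier:

# A continuous GROUP BY produces an updating (changelog) stream,
# which is exactly what upsert-kafka expects.
t_env.execute_sql("""
    INSERT INTO upsert_sink
    SELECT user_id, COUNT(*) AS total_events
    FROM kafka_source
    GROUP BY user_id
""").wait()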
Filesystem connector
The filesystem connector reads and writes files on local disk, HDFS, S3, or any Flink-supported file system.
Reading from files
from pyflink.table import (
TableEnvironment, EnvironmentSettings, DataTypes, Schema, TableDescriptor, FormatDescriptor
)
t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())
t_env.create_temporary_table(
"csv_source",
TableDescriptor.for_connector("filesystem")
.schema(
Schema.new_builder()
.column("id", DataTypes.INT())
.column("name", DataTypes.STRING())
.column("score", DataTypes.DOUBLE())
.build()
)
.option("path", "/data/input/")
.format(
FormatDescriptor.for_format("csv")
.option("field-delimiter", ",")
.build()
)
.build(),
)
t_env.from_path("csv_source").execute().print()
Writing to files
t_env.create_temporary_table(
"json_sink",
TableDescriptor.for_connector("filesystem")
.schema(
Schema.new_builder()
.column("id", DataTypes.INT())
.column("name", DataTypes.STRING())
.build()
)
.option("path", "/data/output/")
.option("auto-compaction", "true")
.format("json")
.build(),
)
source_table.execute_insert("json_sink").wait()
Using SQL DDL for connectors
Instead of TableDescriptor, you can define tables with SQL CREATE TABLE statements, which is often more concise:
t_env.execute_sql("""
CREATE TABLE kafka_source (
user_id BIGINT,
event STRING,
ts TIMESTAMP(3),
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'user-events',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'pyflink-consumer',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
)
""")
t_env.execute_sql("""
CREATE TABLE print_sink (
user_id BIGINT,
event STRING
) WITH (
'connector' = 'print'
)
""")
t_env.execute_sql("""
INSERT INTO print_sink
SELECT user_id, event FROM kafka_source
""")
DataStream connectors from Python
The DataStream API has its own connector classes. These are lower-level than the Table API connectors but give you more control.
Kafka source and sink (DataStream)
from pyflink.common import Types, WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import (
KafkaSource,
KafkaSink,
KafkaRecordSerializationSchema,
KafkaOffsetsInitializer,
)
from pyflink.datastream.formats.json import (
JsonRowDeserializationSchema,
JsonRowSerializationSchema,
)
env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars("file:///path/to/flink-sql-connector-kafka-1.20.0.jar")
# Field names must match the JSON keys, so use ROW_NAMED rather than ROW
row_type = Types.ROW_NAMED(["user_id", "event"], [Types.LONG(), Types.STRING()])
# Source
kafka_source = (
KafkaSource.builder()
.set_topics("user-events")
.set_value_only_deserializer(
JsonRowDeserializationSchema.Builder()
.type_info(row_type)
.build()
)
.set_properties({"bootstrap.servers": "localhost:9092", "group.id": "my-group"})
.set_starting_offsets(KafkaOffsetsInitializer.earliest())
.build()
)
ds = env.from_source(
kafka_source,
watermark_strategy=WatermarkStrategy.no_watermarks(),
source_name="Kafka Source",
)
# Sink
kafka_sink = (
KafkaSink.builder()
.set_record_serializer(
KafkaRecordSerializationSchema.builder()
.set_topic("processed-events")
.set_value_serialization_schema(
JsonRowSerializationSchema.Builder()
.with_type_info(row_type)
.build()
)
.build()
)
.set_bootstrap_servers("localhost:9092")
.build()
)
# Pass output_type so the stream keeps the Row type info that the JSON serializer needs
ds.map(lambda r: r, output_type=row_type).sink_to(kafka_sink)
env.execute("Kafka Job")
Available built-in connectors
| Connector | Requires JAR | Supports batch | Supports streaming |
|---|---|---|---|
| datagen | No | Yes | Yes |
| print | No | Yes | Yes |
| blackhole | No | Yes | Yes |
| filesystem | No | Yes | Yes |
| kafka | Yes | No | Yes |
| upsert-kafka | Yes | No | Yes |
| jdbc | Yes | Yes | Yes |
| elasticsearch-7 | Yes | No | Yes |
| hbase-2.2 | Yes | Yes | Yes |
Always match the connector JAR version to your Flink version. Download connectors from the Flink downloads page or Maven Central.
Formats are used alongside connectors to serialize and deserialize data. Common formats available in the Table API:
| Format | Key |
|---|---|
| JSON | json |
| CSV | csv |
| Avro | avro |
| Parquet | parquet |
| ORC | orc |
| Debezium JSON | debezium-json |
| Canal JSON | canal-json |
| Raw | raw |
# Example: Avro with the Confluent Schema Registry (avro-confluent format)
TableDescriptor.for_connector("kafka")
.schema(...)
.option("topic", "events")
.option("properties.bootstrap.servers", "localhost:9092")
.format(
FormatDescriptor.for_format("avro")
.option("avro-confluent.url", "http://localhost:8081")
.build()
)
.build()
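Changelog formats such as debezium-json interpret records as inserts, updates, and deletes instead of plain appends, which turns a CDC topic into an updating table. A sketch with an illustrative topic and schema:

# Example: consuming a Debezium CDC topic as a changelog table
t_env.execute_sql("""
    CREATE TABLE products_cdc (
        id BIGINT,
        name STRING,
        price DOUBLE
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'dbserver.inventory.products',
        'properties.bootstrap.servers' = 'localhost:9092',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'debezium-json'
    )
""")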