Skip to main content
Flink connectors let your PyFlink jobs read from and write to external systems like Kafka, object stores, databases, and message queues. Connectors are defined in the Java ecosystem and are accessed from Python through the Table API’s TableDescriptor or via SQL DDL strings. Connectors in the Flink Table API are identified by a connector name (e.g., "kafka", "filesystem", "datagen"). The connector implementation lives in a JAR file that must be on the classpath. You configure the connector with key-value options and attach a format (e.g., JSON, CSV, Avro) where applicable.
Built-in connectors like datagen, print, and blackhole are bundled with Flink and require no extra JARs. All other connectors—Kafka, JDBC, Elasticsearch, etc.—require their connector JAR to be added to the job.

Adding connector JARs

Before using an external connector, download the matching JAR and add it to your environment:
from pyflink.table import TableEnvironment, EnvironmentSettings

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Register the Kafka SQL connector JAR on the job's classpath.
KAFKA_CONNECTOR_JAR = "file:///path/to/flink-sql-connector-kafka-1.20.0.jar"
t_env.get_config().set("pipeline.jars", KAFKA_CONNECTOR_JAR)
Alternatively, add the JAR at the StreamExecutionEnvironment level:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

# Add the JAR at the execution-environment level; the table environment
# created from it picks up the same classpath.
KAFKA_CONNECTOR_JAR = "file:///path/to/flink-sql-connector-kafka-1.20.0.jar"

env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars(KAFKA_CONNECTOR_JAR)
t_env = StreamTableEnvironment.create(env)

Kafka connector

Reading from Kafka

from pyflink.table import (
    TableEnvironment, EnvironmentSettings, DataTypes, Schema, TableDescriptor, FormatDescriptor
)
from pyflink.table.expressions import col

# Streaming environment with the Kafka SQL connector JAR on the classpath.
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.get_config().set(
    "pipeline.jars",
    "file:///path/to/flink-sql-connector-kafka-1.20.0.jar",
)

# Register a Kafka-backed source table. The watermark lets downstream
# event-time operators tolerate up to 5 seconds of out-of-order events.
t_env.create_temporary_table(
    "kafka_source",
    TableDescriptor.for_connector("kafka")
        .schema(
            Schema.new_builder()
                .column("user_id", DataTypes.BIGINT())
                .column("event", DataTypes.STRING())
                .column("ts", DataTypes.TIMESTAMP(3))
                .watermark("ts", "ts - INTERVAL '5' SECOND")
                .build()
        )
        .option("topic", "user-events")
        .option("properties.bootstrap.servers", "localhost:9092")
        .option("properties.group.id", "pyflink-consumer")
        .option("scan.startup.mode", "latest-offset")
        .format(
            FormatDescriptor.for_format("json")
                # Skip malformed records instead of failing the job.
                .option("ignore-parse-errors", "true")
                .build()
        )
        .build(),
)

# Use col("*") rather than the string "*": string-based expressions were
# deprecated in Flink 1.12 and have since been removed from the Table API.
result = t_env.from_path("kafka_source").select(col("*"))
result.execute().print()

Writing to Kafka

# Schema for the sink: only the fields we want to publish downstream.
sink_schema = (
    Schema.new_builder()
    .column("user_id", DataTypes.BIGINT())
    .column("event", DataTypes.STRING())
    .build()
)

# Kafka-backed sink table; records are serialized as JSON.
sink_descriptor = (
    TableDescriptor.for_connector("kafka")
    .schema(sink_schema)
    .option("topic", "processed-events")
    .option("properties.bootstrap.servers", "localhost:9092")
    .format("json")
    .build()
)

t_env.create_temporary_table("kafka_sink", sink_descriptor)

# Submit the insert job and block until it completes.
source_table.execute_insert("kafka_sink").wait()

Kafka with Upsert mode

Use the upsert-kafka connector to write a changelog stream where rows with the same key update or delete existing records:
t_env.create_temporary_table(
    "upsert_sink",
    TableDescriptor.for_connector("upsert-kafka")
        .schema(
            Schema.new_builder()
                .column("user_id", DataTypes.BIGINT())
                .column("total_events", DataTypes.BIGINT())
                # The primary key determines the Kafka message key; rows
                # with the same key update the record, NULL values delete it.
                .primary_key("user_id")
                .build()
        )
        .option("topic", "user-totals")
        .option("properties.bootstrap.servers", "localhost:9092")
        # upsert-kafka does not accept a plain 'format' option: it requires
        # separate formats for the message key and the message value.
        .option("key.format", "json")
        .option("value.format", "json")
        .build(),
)

Filesystem connector

The filesystem connector reads and writes files on local disk, HDFS, S3, or any Flink-supported file system.

Reading from files

from pyflink.table import (
    TableEnvironment, EnvironmentSettings, DataTypes, Schema, TableDescriptor, FormatDescriptor
)

# Batch mode: read the files once and terminate.
t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())

# Columns expected in the CSV input files.
csv_schema = (
    Schema.new_builder()
    .column("id", DataTypes.INT())
    .column("name", DataTypes.STRING())
    .column("score", DataTypes.DOUBLE())
    .build()
)

# Comma-delimited CSV.
csv_format = (
    FormatDescriptor.for_format("csv")
    .option("field-delimiter", ",")
    .build()
)

t_env.create_temporary_table(
    "csv_source",
    TableDescriptor.for_connector("filesystem")
    .schema(csv_schema)
    .option("path", "/data/input/")
    .format(csv_format)
    .build(),
)

t_env.from_path("csv_source").execute().print()

Writing to files

# Filesystem sink writing JSON files under /data/output/.
json_sink_schema = (
    Schema.new_builder()
    .column("id", DataTypes.INT())
    .column("name", DataTypes.STRING())
    .build()
)

t_env.create_temporary_table(
    "json_sink",
    TableDescriptor.for_connector("filesystem")
    .schema(json_sink_schema)
    .option("path", "/data/output/")
    # Automatically compact output files — see the filesystem connector docs.
    .option("auto-compaction", "true")
    .format("json")
    .build(),
)

# Submit the insert job and block until it completes.
source_table.execute_insert("json_sink").wait()

Using SQL DDL for connectors

Instead of TableDescriptor, you can define tables with SQL CREATE TABLE statements, which is often more concise:
# Source table: the same Kafka source as above, expressed as SQL DDL.
t_env.execute_sql("""
    CREATE TABLE kafka_source (
        user_id BIGINT,
        event   STRING,
        ts      TIMESTAMP(3),
        WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic'     = 'user-events',
        'properties.bootstrap.servers' = 'localhost:9092',
        'properties.group.id'          = 'pyflink-consumer',
        'scan.startup.mode'            = 'latest-offset',
        'format'    = 'json'
    )
""")

# The built-in print connector requires no extra JAR.
t_env.execute_sql("""
    CREATE TABLE print_sink (
        user_id BIGINT,
        event   STRING
    ) WITH (
        'connector' = 'print'
    )
""")

# INSERT statements are submitted asynchronously; call wait() so a
# standalone script blocks until the job finishes instead of exiting early.
t_env.execute_sql("""
    INSERT INTO print_sink
    SELECT user_id, event FROM kafka_source
""").wait()

DataStream connectors from Python

The DataStream API has its own connector classes. These are lower-level than the Table API connectors but give you more control.

Kafka source and sink (DataStream)

from pyflink.common import Types, WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import (
    KafkaSource,
    KafkaSink,
    KafkaRecordSerializationSchema,
    KafkaOffsetsInitializer,
)
from pyflink.datastream.formats.json import (
    JsonRowDeserializationSchema,
    JsonRowSerializationSchema,
)

env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars("file:///path/to/flink-sql-connector-kafka-1.20.0.jar")

# Row layout shared by the source deserializer and the sink serializer.
row_type = Types.ROW([Types.LONG(), Types.STRING()])

# Source: consume JSON records from "user-events", starting from the
# earliest available offsets.
kafka_source = (
    KafkaSource.builder()
    .set_topics("user-events")
    .set_value_only_deserializer(
        JsonRowDeserializationSchema.Builder()
        .type_info(row_type)
        .build()
    )
    .set_properties({"bootstrap.servers": "localhost:9092", "group.id": "my-group"})
    .set_starting_offsets(KafkaOffsetsInitializer.earliest())
    .build()
)

ds = env.from_source(
    kafka_source,
    watermark_strategy=WatermarkStrategy.no_watermarks(),
    source_name="Kafka Source",
)

# Sink: serialize each Row back to JSON and publish to "processed-events".
kafka_sink = (
    KafkaSink.builder()
    .set_record_serializer(
        KafkaRecordSerializationSchema.builder()
        .set_topic("processed-events")
        .set_value_serialization_schema(
            JsonRowSerializationSchema.Builder()
            .with_type_info(row_type)
            .build()
        )
        .build()
    )
    .set_bootstrap_servers("localhost:9092")
    .build()
)

# Sink the stream directly. The previous `ds.map(lambda r: r)` was a no-op
# that, because it supplied no output_type, would re-type the stream as
# pickled objects and break the Row-based JSON serialization schema.
ds.sink_to(kafka_sink)
env.execute("Kafka Job")

Available built-in connectors

| Connector       | Requires JAR | Supports batch | Supports streaming |
| --------------- | ------------ | -------------- | ------------------ |
| datagen         | No           | Yes            | Yes                |
| print           | No           | Yes            | Yes                |
| blackhole       | No           | Yes            | Yes                |
| filesystem      | No           | Yes            | Yes                |
| kafka           | Yes          | No             | Yes                |
| upsert-kafka    | Yes          | No             | Yes                |
| jdbc            | Yes          | Yes            | Yes                |
| elasticsearch-7 | Yes          | No             | Yes                |
| hbase-2.2       | Yes          | Yes            | Yes                |
Always match the connector JAR version to your Flink version. Download connectors from the Flink downloads page or Maven Central.

Formats

Formats are used alongside connectors to serialize and deserialize data. Common formats available in the Table API:
| Format        | Key           |
| ------------- | ------------- |
| JSON          | json          |
| CSV           | csv           |
| Avro          | avro          |
| Parquet       | parquet       |
| ORC           | orc           |
| Debezium JSON | debezium-json |
| Canal JSON    | canal-json    |
| Raw           | raw           |
# Example: Avro with Confluent Schema Registry. The schema-registry URL is
# an option of the "avro-confluent" format; the plain "avro" format does not
# talk to a registry and takes no such option. The chained call is wrapped in
# parentheses and assigned so the example is valid standalone Python.
descriptor = (
    TableDescriptor.for_connector("kafka")
    .schema(...)
    .option("topic", "events")
    .option("properties.bootstrap.servers", "localhost:9092")
    .format(
        FormatDescriptor.for_format("avro-confluent")
        .option("avro-confluent.url", "http://localhost:8081")
        .build()
    )
    .build()
)

Build docs developers (and LLMs) love