Flink’s Table API and SQL programs connect to external systems by defining table sources and table sinks. A table source provides read access to data stored in an external system; a table sink emits table rows to an external storage system. Connectors are registered using SQL CREATE TABLE statements. Once a table is created, you can reference it in any Table API or SQL query.

Defining a table with DDL

You define a connector’s configuration entirely in the WITH clause of CREATE TABLE. The connector key identifies which connector factory to use; the remaining keys are connector-specific.
CREATE TABLE MyUserTable (
  `user`    BIGINT,
  `message` STRING,
  `rowtime` TIMESTAMP(3) METADATA FROM 'timestamp',
  `proctime` AS PROCTIME(),
  WATERMARK FOR `rowtime` AS `rowtime` - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'topic_name',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json'
)
Flink loads connector and format factories via Java’s Service Provider Interface (SPI). Exactly one factory matching the connector key must be available on the classpath; if none or more than one matches, Flink throws an exception.

Schema mapping

The CREATE TABLE body declares physical columns, constraints, and watermarks. Flink holds no data itself—the schema only describes how to map between the external system’s representation and Flink’s row model. Mapping behavior depends on the connector:
  • MySQL (JDBC): maps by field name (case-insensitive)
  • CSV (Filesystem): maps by field position (field names can be arbitrary)
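For example, a CSV-backed filesystem table maps fields purely by position, so the column names in the DDL need not match anything in the files. This is a minimal sketch; the path and column names are hypothetical:
```sql
-- Columns map to CSV fields by position: the first CSV field feeds `id`,
-- the second feeds `name`, regardless of what the columns are called.
CREATE TABLE CsvUsers (
  id   BIGINT,
  name STRING
) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///tmp/users',   -- hypothetical path
  'format' = 'csv'
)
```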

Metadata columns

Some connectors expose metadata fields alongside the payload. Declare them with the METADATA keyword:
CREATE TABLE kafka_table (
  `value`     STRING,
  `timestamp` TIMESTAMP(3) METADATA,   -- Kafka record timestamp
  `partition` INT METADATA VIRTUAL     -- virtual: not written back to Kafka
) WITH (
  'connector' = 'kafka',
  ...
)

Primary keys

Primary key constraints are used by sinks for upsert operations:
CREATE TABLE upsert_table (
  id    INT,
  value STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (...)
Flink does not enforce primary key uniqueness at runtime, so the constraint must be declared NOT ENFORCED. The sink implementation uses the declared key to perform upserts.
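As a sketch of how a sink uses the key, the upsert-kafka connector writes the primary key columns as the Kafka record key and treats each row as an update (or, for deletions, a tombstone) of that key. Topic and server names below are placeholders:
```sql
CREATE TABLE PageviewsPerUser (
  user_id BIGINT,
  cnt     BIGINT,
  PRIMARY KEY (user_id) NOT ENFORCED   -- becomes the Kafka record key
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'pageviews_per_user',                    -- placeholder topic
  'properties.bootstrap.servers' = 'localhost:9092', -- placeholder servers
  'key.format' = 'json',
  'value.format' = 'json'
)
```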

Time attributes

Declare a proctime attribute with PROCTIME():
CREATE TABLE MyTable (
  event_time TIMESTAMP(3),
  proc_time AS PROCTIME(),
  WATERMARK FOR event_time AS event_time - INTERVAL '2' SECOND
) WITH (...)
For rowtime attributes, use a WATERMARK clause that references an existing TIMESTAMP(3) column or a computed expression:
-- From a string field
CREATE TABLE logs (
  log_ts   STRING,
  event_ts AS TO_TIMESTAMP(log_ts),
  WATERMARK FOR event_ts AS event_ts - INTERVAL '5' SECOND
) WITH (...)
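Epoch timestamps work the same way. A sketch using TO_TIMESTAMP_LTZ to derive the rowtime from a BIGINT column of epoch milliseconds (column names are hypothetical):
```sql
-- From epoch milliseconds
CREATE TABLE events (
  ts_millis BIGINT,
  event_ts  AS TO_TIMESTAMP_LTZ(ts_millis, 3),  -- TIMESTAMP_LTZ(3)
  WATERMARK FOR event_ts AS event_ts - INTERVAL '5' SECOND
) WITH (...)
```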

Supported connectors

Connector                     | Source                               | Sink
Filesystem                    | Bounded Scan, Unbounded Scan         | Streaming Sink, Batch Sink
Apache Kafka                  | Unbounded Scan                       | Streaming Sink, Batch Sink
JDBC                          | Bounded Scan, Lookup                 | Streaming Sink, Batch Sink
Elasticsearch 6.x / 7.x       | Not supported                        | Streaming Sink, Batch Sink
Opensearch 1.x / 2.x          | Not supported                        | Streaming Sink, Batch Sink
Amazon DynamoDB               | Not supported                        | Streaming Sink, Batch Sink
Amazon Kinesis Data Streams   | Unbounded Scan                       | Streaming Sink
Amazon Kinesis Data Firehose  | Not supported                        | Streaming Sink
Apache HBase 1.4.x / 2.2.x    | Bounded Scan, Lookup                 | Streaming Sink, Batch Sink
Apache Hive                   | Unbounded Scan, Bounded Scan, Lookup | Streaming Sink, Batch Sink
MongoDB 3.6.x–7.0.x           | Bounded Scan, Lookup                 | Streaming Sink, Batch Sink

Filesystem

Read and write partitioned files. Supports CSV, JSON, Avro, Parquet, ORC, and CDC formats.
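A sketch of a partitioned filesystem table; the path and column names are placeholders:
```sql
CREATE TABLE fs_table (
  user_id      STRING,
  order_amount DOUBLE,
  dt           STRING
) PARTITIONED BY (dt) WITH (          -- one directory per dt value
  'connector' = 'filesystem',
  'path' = 's3://my-bucket/orders',   -- placeholder path
  'format' = 'parquet'
)
```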

Apache Kafka

Unbounded source and streaming sink. Supports JSON, Avro, CSV, Protobuf, and CDC formats.

JDBC

Read from and write to any JDBC-compatible database. Supports lookup joins.
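Lookup joins use the FOR SYSTEM_TIME AS OF syntax against a processing-time attribute, fetching the current row from the database at join time. A minimal sketch with hypothetical table and column names:
```sql
-- Enrich each order with the latest matching customer row from JDBC
SELECT o.order_id, c.country
FROM Orders AS o
JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
  ON o.customer_id = c.id
```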

Resolving SPI conflicts in uber-JARs

When you build an uber-JAR containing multiple connectors or formats, their META-INF/services/org.apache.flink.table.factories.Factory files may overwrite each other. Use the Maven Shade plugin’s ServicesResourceTransformer to merge them:
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <transformers combine.children="append">
          <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>
Without this transformer, only one factory file survives the merge and other connectors or formats will fail to load at runtime.
