Tables backed by external systems are declared with CREATE TABLE statements. Once a table is created, you can reference it in any Table API or SQL query.
Defining a table with DDL
You define a connector’s configuration entirely in the WITH clause of CREATE TABLE. The connector key identifies which connector factory to use; the remaining keys are connector-specific.
A factory matching the connector key must be available on the classpath; if zero factories match, or more than one, Flink throws an exception.
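As a sketch, a Kafka-backed table might be declared like this (the topic, broker address, and column names are placeholders):

```sql
CREATE TABLE Orders (
  order_id   BIGINT,
  price      DECIMAL(10, 2),
  order_time TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',                              -- selects the Kafka connector factory
  'topic' = 'orders',                                 -- connector-specific option (placeholder)
  'properties.bootstrap.servers' = 'localhost:9092',  -- placeholder broker address
  'format' = 'json'                                   -- payload format
);
```

Every key except connector is interpreted by the chosen connector factory, so the valid option set differs per connector.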
Schema mapping
The CREATE TABLE body declares physical columns, constraints, and watermarks. Flink holds no data itself; the schema only describes how to map between the external system’s representation and Flink’s row model.
Mapping behavior depends on the connector:
- MySQL (JDBC): maps by field name (case-insensitive)
- CSV (Filesystem): maps by field position (field names can be arbitrary)
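To illustrate positional mapping, a filesystem table reading CSV might look like this (the path and column names are placeholders):

```sql
-- CSV maps columns by position, so these names exist only inside Flink;
-- the first CSV field becomes tx_id, the second becomes amount.
CREATE TABLE Transactions (
  tx_id  STRING,
  amount DOUBLE
) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///tmp/transactions',  -- placeholder path
  'format' = 'csv'
);
```

A JDBC table, by contrast, would require the declared column names to match the database columns (case-insensitively).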
Metadata columns
Some connectors expose metadata fields alongside the payload. Declare them with the METADATA keyword:
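For example, the Kafka connector exposes the record timestamp and partition as metadata. A sketch (table and column names are placeholders):

```sql
CREATE TABLE KafkaEvents (
  user_id    BIGINT,
  event_time TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',  -- Kafka record timestamp
  part       INT METADATA FROM 'partition' VIRTUAL        -- read-only: excluded when writing
) WITH (
  'connector' = 'kafka',
  'topic' = 'events',                                     -- placeholder
  'properties.bootstrap.servers' = 'localhost:9092',      -- placeholder
  'format' = 'json'
);
```

Columns marked VIRTUAL are readable in queries but are not written back when the table is used as a sink.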
Primary keys
Primary key constraints are used by sinks for upsert operations. Because Flink does not own the data, every constraint must be declared NOT ENFORCED; the sink implementation uses the key to perform upserts rather than appends.
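A minimal sketch of an upsert sink backed by JDBC (the database URL and table name are placeholders):

```sql
CREATE TABLE UserScores (
  user_id BIGINT,
  score   INT,
  PRIMARY KEY (user_id) NOT ENFORCED  -- key the sink uses to upsert
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/mydb',  -- placeholder URL
  'table-name' = 'user_scores'                 -- placeholder table
);
```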
Time attributes
Declare a processing-time attribute with a computed column that calls PROCTIME():
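For example, using the built-in datagen connector so the snippet is self-contained:

```sql
CREATE TABLE Clicks (
  user_id   BIGINT,
  proc_time AS PROCTIME()  -- computed column: processing-time attribute
) WITH (
  'connector' = 'datagen'
);
```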
Declare an event-time attribute with a WATERMARK clause that references an existing TIMESTAMP(3) column or a computed expression:
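For example, tolerating five seconds of out-of-orderness on a ts column (again using datagen so the snippet is self-contained):

```sql
CREATE TABLE Events (
  event_id BIGINT,
  ts       TIMESTAMP(3),
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND  -- 5s out-of-orderness bound
) WITH (
  'connector' = 'datagen'
);
```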
Supported connectors
| Connector | Source | Sink |
|---|---|---|
| Filesystem | Bounded Scan, Unbounded Scan | Streaming Sink, Batch Sink |
| Apache Kafka | Unbounded Scan | Streaming Sink, Batch Sink |
| JDBC | Bounded Scan, Lookup | Streaming Sink, Batch Sink |
| Elasticsearch 6.x / 7.x | Not supported | Streaming Sink, Batch Sink |
| Opensearch 1.x / 2.x | Not supported | Streaming Sink, Batch Sink |
| Amazon DynamoDB | Not supported | Streaming Sink, Batch Sink |
| Amazon Kinesis Data Streams | Unbounded Scan | Streaming Sink |
| Amazon Kinesis Data Firehose | Not supported | Streaming Sink |
| Apache HBase 1.4.x / 2.2.x | Bounded Scan, Lookup | Streaming Sink, Batch Sink |
| Apache Hive | Unbounded Scan, Bounded Scan, Lookup | Streaming Sink, Batch Sink |
| MongoDB 3.6.x–7.0.x | Bounded Scan, Lookup | Streaming Sink, Batch Sink |
Filesystem
Read and write partitioned files. Supports CSV, JSON, Avro, Parquet, ORC, and CDC formats.
Apache Kafka
Unbounded source and streaming sink. Supports JSON, Avro, CSV, Protobuf, and CDC formats.
JDBC
Read from and write to any JDBC-compatible database. Supports lookup joins.
Resolving SPI conflicts in uber-JARs
When you build an uber-JAR containing multiple connectors or formats, their META-INF/services/org.apache.flink.table.factories.Factory files may overwrite each other. Use the Maven Shade plugin’s ServicesResourceTransformer to merge them:
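A sketch of the relevant plugin configuration in pom.xml (version and any other shade settings are up to your build):

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <transformers>
          <!-- Merges META-INF/services entries from all shaded JARs
               instead of letting the last one overwrite the rest. -->
          <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>
```

Without this transformer, only one connector’s factory file survives shading and the others fail at runtime with a missing-factory exception.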

