The JDBC connector enables Flink to read from and write to relational databases through the standard JDBC interface. It supports bounded scan sources, lookup joins for enrichment, and both streaming and batch sinks. The connector is maintained in a separate repository: apache/flink-connector-jdbc.
Because the connector is externalized from the main Flink repository, you must add the flink-connector-jdbc dependency plus a JDBC driver (for example MySQL, PostgreSQL, or Oracle) to your project.

Dependency

<!-- JDBC connector -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-jdbc</artifactId>
  <version>${flink-connector-jdbc.version}</version>
</dependency>

<!-- Example: PostgreSQL driver -->
<dependency>
  <groupId>org.postgresql</groupId>
  <artifactId>postgresql</artifactId>
  <version>42.6.0</version>
</dependency>
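If you work in the SQL Client rather than a packaged project, you can register the connector and driver jars in the running session instead. The paths below are placeholders; point them at the jars you downloaded:

```sql
-- Load the connector and a driver into the current SQL Client session.
-- Jar paths and file names are examples, not fixed values.
ADD JAR '/path/to/flink-connector-jdbc.jar';
ADD JAR '/path/to/postgresql-42.6.0.jar';
```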

Source table

The JDBC source performs a bounded scan of a table or custom query:
CREATE TABLE products (
  product_id   INT,
  product_name STRING,
  price        DECIMAL(10, 2),
  category     STRING
) WITH (
  'connector'  = 'jdbc',
  'url'        = 'jdbc:postgresql://localhost:5432/mydb',
  'table-name' = 'products',
  'driver'     = 'org.postgresql.Driver',
  'username'   = 'flink_user',
  'password'   = 'secret'
)
You can parallelize the scan with the scan.partition.* options (see below), or read the result of a custom SQL statement by setting query instead of table-name:
CREATE TABLE active_products (
  product_id   INT,
  product_name STRING,
  price        DECIMAL(10, 2)
) WITH (
  'connector' = 'jdbc',
  'url'       = 'jdbc:postgresql://localhost:5432/mydb',
  'query'     = 'SELECT product_id, product_name, price FROM products WHERE active = true',
  'username'  = 'flink_user',
  'password'  = 'secret'
)

Parallel scan with partitioning

For large tables, enable parallel reads by partitioning the scan:
CREATE TABLE large_orders (
  order_id   BIGINT,
  customer   STRING,
  amount     DECIMAL(12, 2)
) WITH (
  'connector'               = 'jdbc',
  'url'                     = 'jdbc:mysql://localhost:3306/shop',
  'table-name'              = 'orders',
  'username'                = 'flink_user',
  'password'                = 'secret',
  'scan.partition.column'   = 'order_id',
  'scan.partition.num'      = '10',
  'scan.partition.lower-bound' = '1',
  'scan.partition.upper-bound' = '10000000'
)
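With the settings above, the connector splits the key range 1 to 10000000 into 10 equal partitions of 1000000 keys and reads them in parallel. The per-partition queries it generates are conceptually equivalent to the following (illustrative only; the actual statements are produced internally):

```sql
-- Partition 1
SELECT order_id, customer, amount FROM orders
WHERE order_id BETWEEN 1 AND 1000000;

-- Partition 2
SELECT order_id, customer, amount FROM orders
WHERE order_id BETWEEN 1000001 AND 2000000;

-- ... and so on, up to the upper bound of 10000000 in partition 10
```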

Sink table

The JDBC sink appends or upserts rows into a database table:
CREATE TABLE order_summary (
  order_id   BIGINT,
  total      DECIMAL(12, 2),
  updated_at TIMESTAMP(3),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector'        = 'jdbc',
  'url'              = 'jdbc:postgresql://localhost:5432/reports',
  'table-name'       = 'order_summary',
  'username'         = 'flink_user',
  'password'         = 'secret',
  'sink.buffer-flush.max-rows' = '1000',
  'sink.buffer-flush.interval' = '2 s'
)
When the DDL defines a primary key and the database dialect supports it, the sink runs in upsert mode and issues a dialect-specific upsert statement (INSERT … ON CONFLICT … DO UPDATE on PostgreSQL, INSERT … ON DUPLICATE KEY UPDATE on MySQL, or equivalent). Without a primary key, the sink runs in append mode and always inserts new rows.
INSERT INTO order_summary
SELECT order_id, SUM(amount) AS total, MAX(event_ts) AS updated_at
FROM orders_stream
GROUP BY order_id;
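For the order_summary table above, the statement the sink issues against PostgreSQL is conceptually equivalent to the following (illustrative; the connector generates the actual statement per buffered row):

```sql
-- Hypothetical upsert matching the order_summary primary key (order_id).
INSERT INTO order_summary (order_id, total, updated_at)
VALUES (?, ?, ?)
ON CONFLICT (order_id)
DO UPDATE SET total = EXCLUDED.total, updated_at = EXCLUDED.updated_at;
```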

Lookup join

The JDBC connector supports lookup joins for enriching streaming data with database lookups:
CREATE TABLE products_dim (
  product_id   INT,
  product_name STRING,
  category     STRING
) WITH (
  'connector'                     = 'jdbc',
  'url'                           = 'jdbc:postgresql://localhost:5432/mydb',
  'table-name'                    = 'products',
  'username'                      = 'flink_user',
  'password'                      = 'secret',
  'lookup.cache'                  = 'PARTIAL',
  'lookup.partial-cache.max-rows' = '10000',
  'lookup.partial-cache.expire-after-write' = '10 min'
)
Reference the lookup table in a join:
SELECT
  o.order_id,
  o.user_id,
  p.product_name,
  p.category,
  o.amount
FROM orders_stream AS o
JOIN products_dim FOR SYSTEM_TIME AS OF o.proctime AS p
  ON o.product_id = p.product_id;
For lookup joins, the stream table must declare a processing-time attribute. The FOR SYSTEM_TIME AS OF syntax means each record is joined against the database contents as of the time the record is processed (subject to any configured lookup caching).
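The processing-time attribute can be declared as a computed column in the stream table's DDL. The schema and connector below are illustrative; only the proctime column is required for the join:

```sql
CREATE TABLE orders_stream (
  order_id   BIGINT,
  user_id    BIGINT,
  product_id INT,
  amount     DECIMAL(12, 2),
  proctime AS PROCTIME()  -- processing-time attribute used by the lookup join
) WITH (
  'connector' = '...'     -- any streaming source, e.g. Kafka
);
```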

Key connector options

| Option | Required | Default | Description |
| --- | --- | --- | --- |
| connector | Yes | (none) | Must be 'jdbc'. |
| url | Yes | (none) | JDBC connection URL, e.g. jdbc:postgresql://host:5432/db. |
| table-name | Yes (if query not set) | (none) | Name of the table to read from or write to. |
| driver | No | auto-detected | JDBC driver class name, derived from the URL if not set. |
| username | No | (none) | Database username. |
| password | No | (none) | Database password. |
| query | No | (none) | Custom SQL query for the source scan. Cannot be used together with table-name for sources. |
| connection.max-retry-timeout | No | 60 s | Maximum timeout for retrying failed connections. |
| scan.partition.column | No | (none) | Column used to partition the scan. Must be a numeric, date, or timestamp type. |
| scan.partition.num | No | (none) | Number of partitions to split the scan into. |
| scan.partition.lower-bound | No | (none) | Lower bound of the partition column. |
| scan.partition.upper-bound | No | (none) | Upper bound of the partition column. |
| scan.fetch-size | No | 0 (unlimited) | Number of rows to fetch per round trip to the database. |
| scan.auto-commit | No | true | Whether the JDBC connection uses auto-commit. |
| lookup.cache | No | NONE | Caching strategy: NONE, PARTIAL, or FULL. |
| lookup.partial-cache.max-rows | No | (none) | Maximum number of rows in the partial cache. |
| lookup.partial-cache.expire-after-write | No | (none) | TTL for cache entries after they are written. |
| lookup.partial-cache.cache-missing-key | No | true | Whether to cache lookups that returned no rows. |
| lookup.max-retries | No | 3 | Maximum number of retries for failed lookups. |
| sink.buffer-flush.max-rows | No | 100 | Maximum number of rows to buffer before flushing to the database. Set to 0 to disable buffering. |
| sink.buffer-flush.interval | No | 1 s | Maximum time to buffer rows before flushing. Set to 0 to flush only when the buffer is full. |
| sink.max-retries | No | 3 | Maximum number of retries for failed sink writes. |
| sink.parallelism | No | (none) | Parallelism of the JDBC sink operator. |

MySQL example

CREATE TABLE mysql_users (
  id       BIGINT,
  username STRING,
  email    STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector'  = 'jdbc',
  'url'        = 'jdbc:mysql://mysql-host:3306/app',
  'table-name' = 'users',
  'driver'     = 'com.mysql.cj.jdbc.Driver',
  'username'   = 'flink',
  'password'   = 'password123'
);

-- Read all users
SELECT * FROM mysql_users WHERE email LIKE '%@example.com';

-- Write aggregated results back
INSERT INTO mysql_users
SELECT id, username, new_email FROM updated_users;
