The JDBC connector enables Flink to read from and write to relational databases using the standard JDBC interface. It supports bounded scan sources, lookup joins for enrichment, and streaming and batch sinks. The connector is maintained in a separate repository:
Repository: apache/flink-connector-jdbc
The JDBC connector is externalized from the main Flink repository. Add the flink-connector-jdbc dependency plus a JDBC driver for your database (for example, the PostgreSQL, MySQL, or Oracle driver) to your project.
Dependency
<!-- JDBC connector -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>${flink-connector-jdbc.version}</version>
</dependency>
<!-- Example: PostgreSQL driver -->
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.6.0</version>
</dependency>
Source table
The JDBC source performs a bounded scan of a table or custom query:
CREATE TABLE products (
product_id INT,
product_name STRING,
price DECIMAL(10, 2),
category STRING
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://localhost:5432/mydb',
'table-name' = 'products',
'driver' = 'org.postgresql.Driver',
'username' = 'flink_user',
'password' = 'secret'
)
Instead of scanning a whole table, you can read the result of a custom SQL statement by setting query; for parallel reads of large tables, use the scan.partition.* options described below:
CREATE TABLE active_products (
product_id INT,
product_name STRING,
price DECIMAL(10, 2)
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://localhost:5432/mydb',
'query' = 'SELECT product_id, product_name, price FROM products WHERE active = true',
'username' = 'flink_user',
'password' = 'secret'
)
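The choice between table-name and query determines the scan statement the source sends to the database. A minimal sketch of that choice (illustrative only, not the connector's exact SQL generation):

```python
def build_scan_statement(columns, table_name=None, query=None):
    """A custom query is used verbatim; otherwise a projection over
    table-name is generated. Sketch of the table-name vs. query choice,
    not the connector's internal code."""
    if query is not None:
        return query
    return f"SELECT {', '.join(columns)} FROM {table_name}"

stmt = build_scan_statement(["product_id", "product_name", "price"],
                            table_name="products")
```
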
Parallel scan with partitioning
For large tables, enable parallel reads by partitioning the scan:
CREATE TABLE large_orders (
order_id BIGINT,
customer STRING,
amount DECIMAL(12, 2)
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/shop',
'table-name' = 'orders',
'username' = 'flink_user',
'password' = 'secret',
'scan.partition.column' = 'order_id',
'scan.partition.num' = '10',
'scan.partition.lower-bound' = '1',
'scan.partition.upper-bound' = '10000000'
)
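Conceptually, the partitioned scan splits the [lower-bound, upper-bound] key range into scan.partition.num contiguous ranges, and each parallel subtask reads one range with a BETWEEN predicate. A simplified sketch of that splitting (not Flink's internal code):

```python
def partition_ranges(lower, upper, num_partitions):
    """Split the inclusive key range [lower, upper] into contiguous,
    non-overlapping ranges, one per partition. Mirrors how a partitioned
    JDBC scan divides the key space; a simplification."""
    total = upper - lower + 1
    step = total // num_partitions
    remainder = total % num_partitions
    ranges = []
    start = lower
    for i in range(num_partitions):
        size = step + (1 if i < remainder else 0)
        end = start + size - 1
        ranges.append((start, end))
        start = end + 1
    return ranges

# Each range becomes a predicate of the form:
#   SELECT ... FROM orders WHERE order_id BETWEEN <start> AND <end>
for start, end in partition_ranges(1, 10_000_000, 10):
    print(f"order_id BETWEEN {start} AND {end}")
```

Choose bounds that bracket the actual key range; rows outside [lower-bound, upper-bound] are not read.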
Sink table
The JDBC sink appends or upserts rows into a database table:
CREATE TABLE order_summary (
order_id BIGINT,
total DECIMAL(12, 2),
updated_at TIMESTAMP(3),
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://localhost:5432/reports',
'table-name' = 'order_summary',
'username' = 'flink_user',
'password' = 'secret',
'sink.buffer-flush.max-rows' = '1000',
'sink.buffer-flush.interval' = '2 s'
)
When a primary key is defined and the connector supports it, the sink performs upserts (INSERT … ON CONFLICT UPDATE or equivalent). Without a primary key, rows are always appended.
INSERT INTO order_summary
SELECT order_id, SUM(amount) AS total, MAX(event_ts) AS updated_at
FROM orders_stream
GROUP BY order_id;
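The upsert behavior can be pictured as keyed overwrite: the sink keeps one row per primary key, and a later write for the same key replaces the earlier one. A minimal sketch, with the target table modeled as a dict keyed by the primary key (illustrative only):

```python
def apply_upsert(table, row, key_field="order_id"):
    """Upsert semantics: insert the row if its key is absent, otherwise
    overwrite the existing row (INSERT ... ON CONFLICT UPDATE or equivalent)."""
    table[row[key_field]] = row
    return table

table = {}
apply_upsert(table, {"order_id": 1, "total": 10.0})
apply_upsert(table, {"order_id": 1, "total": 25.0})  # same key: overwrite
apply_upsert(table, {"order_id": 2, "total": 5.0})
# table now holds one row per key, with the latest total for order 1
```

This is why the continuously updating GROUP BY result above converges to one current row per order_id in the database.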
Lookup join
The JDBC connector supports lookup joins for enriching streaming data with database lookups:
CREATE TABLE products_dim (
product_id INT,
product_name STRING,
category STRING
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://localhost:5432/mydb',
'table-name' = 'products',
'username' = 'flink_user',
'password' = 'secret',
'lookup.cache' = 'PARTIAL',
'lookup.partial-cache.max-rows' = '10000',
'lookup.partial-cache.expire-after-write' = '10 min'
)
Reference the lookup table in a join:
SELECT
o.order_id,
o.user_id,
p.product_name,
p.category,
o.amount
FROM orders_stream AS o
JOIN products_dim FOR SYSTEM_TIME AS OF o.proctime AS p
ON o.product_id = p.product_id;
For lookup joins, the stream table must declare a processing-time attribute (for example, proctime AS PROCTIME()). The FOR SYSTEM_TIME AS OF syntax triggers a lookup per record: each lookup queries the database's current contents at processing time, served from the cache when one is configured.
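The lookup.partial-cache.* options above bound the cache by row count and by time since write. A rough sketch of that behavior, including caching of missing keys (illustrative, not the connector's implementation):

```python
import time

class PartialCache:
    """Expire-after-write lookup cache with a row limit, roughly how the
    lookup.partial-cache.* options behave. A sketch, not Flink's code."""
    def __init__(self, max_rows, ttl_seconds, clock=time.monotonic):
        self.max_rows = max_rows
        self.ttl = ttl_seconds
        self.clock = clock
        self.entries = {}  # key -> (rows, write_time)

    def get(self, key):
        hit = self.entries.get(key)
        if hit is None:
            return None  # miss: the caller queries the database
        rows, written = hit
        if self.clock() - written >= self.ttl:
            del self.entries[key]  # expired after write
            return None
        return rows  # may be [] if an empty result was cached

    def put(self, key, rows):
        # With lookup.partial-cache.cache-missing-key = true, an empty
        # result list is cached too, avoiding repeated queries for
        # keys that do not exist in the database.
        if len(self.entries) >= self.max_rows:
            self.entries.pop(next(iter(self.entries)))  # evict oldest entry
        self.entries[key] = (rows, self.clock())
```

A stale entry survives until its TTL elapses, so updates to the dimension table become visible to the join only after at most expire-after-write.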
Key connector options
| Option | Required | Default | Description |
|---|---|---|---|
| connector | Yes | — | Must be 'jdbc'. |
| url | Yes | — | JDBC connection URL, e.g. jdbc:postgresql://host:5432/db. |
| table-name | Yes (if query not set) | — | Name of the table to read from or write to. |
| driver | No | auto-detected | JDBC driver class name. |
| username | No | — | Database username. |
| password | No | — | Database password. |
| query | No | — | Custom SQL query for the source scan. Cannot be used together with table-name for sources. |
| connection.max-retry-timeout | No | 60 s | Maximum timeout for retrying failed connections. |
| scan.partition.column | No | — | Column used to partition the scan. Must be a numeric, date, or timestamp type. |
| scan.partition.num | No | — | Number of partitions to split the scan into. |
| scan.partition.lower-bound | No | — | Lower bound of the partition column. |
| scan.partition.upper-bound | No | — | Upper bound of the partition column. |
| scan.fetch-size | No | 0 (unlimited) | Number of rows to fetch per round trip to the database. |
| scan.auto-commit | No | true | Whether the JDBC connection uses auto-commit. |
| lookup.cache | No | NONE | Caching strategy: NONE, PARTIAL, or FULL. |
| lookup.partial-cache.max-rows | No | — | Maximum number of rows in the partial cache. |
| lookup.partial-cache.expire-after-write | No | — | TTL for cache entries. |
| lookup.partial-cache.cache-missing-key | No | true | Whether to cache lookups that returned no rows. |
| lookup.max-retries | No | 3 | Maximum number of retries for failed lookups. |
| sink.buffer-flush.max-rows | No | 100 | Maximum number of rows to buffer before flushing to the database. Set to 0 to disable buffering. |
| sink.buffer-flush.interval | No | 1 s | Maximum time to buffer rows before flushing. Set to 0 to flush only when the buffer is full. |
| sink.max-retries | No | 3 | Maximum number of retries for failed sink writes. |
| sink.parallelism | No | — | Parallelism of the JDBC sink operator. |
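The two sink.buffer-flush.* options interact: a batch is written when either the row limit is reached or the interval elapses, whichever comes first. A minimal sketch of that decision, assuming a caller that passes in the current time (illustrative, not the sink's implementation):

```python
class FlushDecider:
    """Decides when a buffering JDBC sink should flush: the buffer reached
    max_rows, or interval_s has elapsed since the last flush. A
    simplification of sink.buffer-flush.max-rows / sink.buffer-flush.interval."""
    def __init__(self, max_rows, interval_s):
        self.max_rows = max_rows
        self.interval_s = interval_s
        self.buffered = 0
        self.last_flush = 0.0

    def add_row(self, now):
        self.buffered += 1
        if self.buffered >= self.max_rows or now - self.last_flush >= self.interval_s:
            self.buffered = 0      # write the batch to the database
            self.last_flush = now
            return True
        return False
```

Larger batches reduce round trips but increase latency and the amount of data replayed on failure; the interval bounds how long a row can sit unflushed under low traffic.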
MySQL example
CREATE TABLE mysql_users (
id BIGINT,
username STRING,
email STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://mysql-host:3306/app',
'table-name' = 'users',
'driver' = 'com.mysql.cj.jdbc.Driver',
'username' = 'flink',
'password' = 'password123'
);
-- Read all users
SELECT * FROM mysql_users WHERE email LIKE '%@example.com';
-- Write updated rows back (the primary key on id makes this an upsert)
INSERT INTO mysql_users
SELECT id, username, new_email FROM updated_users;