Overview

Metadb supports reading Kafka messages in the format produced by the Debezium PostgreSQL connector for Kafka Connect.
Configuring Kafka, Kafka Connect, Debezium, and PostgreSQL logical decoding is beyond the scope of this documentation. The notes below cover only the points essential to integrating them with Metadb.

Data Flow

Data flows through the following pipeline:
  1. Source PostgreSQL database
  2. Kafka Connect/Debezium
  3. Kafka
  4. Metadb
  5. Metadb database

Creating a Connector

1. Enable logical decoding

In the source PostgreSQL database, enable logical decoding by setting:
wal_level = logical
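Add this to postgresql.conf for the source database. A change to wal_level takes effect only after the PostgreSQL server is restarted.
Timeout settings like idle_in_transaction_session_timeout can cause the connector to fail if a timeout occurs while it is taking the initial snapshot. Set them high enough for the snapshot to finish, or disable them for the connector's database user.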
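Because wal_level is read at server start, it can help to confirm the running value before creating the connector. The following is a minimal sketch, assuming psql is installed and can connect to the source database. wal_level_is_logical is a hypothetical helper name, not part of Metadb or Debezium.

```shell
# Hypothetical helper: succeeds only if the running server reports
# wal_level = logical. Any arguments are passed through to psql.
wal_level_is_logical() {
    level=$(psql -At "$@" -c 'SHOW wal_level;') || return 1
    [ "$level" = "logical" ]
}

# Example (connection arguments are placeholders):
#   wal_level_is_logical -h example.host.name -d sourcedb -U dbuser \
#       && echo "logical decoding is enabled"
```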

2. Create heartbeat table

In the source database, create a heartbeat table to avoid disk space spikes:
CREATE SCHEMA admin;

CREATE TABLE admin.heartbeat (last_heartbeat TIMESTAMPTZ PRIMARY KEY);

INSERT INTO admin.heartbeat (last_heartbeat) VALUES (NOW());

3. Create connector configuration

Create a connector configuration file for Kafka Connect:
{
    "name": "sensor-1-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.dbname": "sourcedb",
        "database.hostname": "example.host.name",
        "database.password": "eHrkGrZL8mMJOFgToqqL",
        "database.port": "5432",
        "database.server.name": "metadb_sensor_1",
        "database.user": "dbuser",
        "plugin.name": "pgoutput",
        "snapshot.mode": "exported",
        "tasks.max": "1",
        "truncate.handling.mode": "include",
        "publication.autocreate.mode": "filtered",
        "heartbeat.interval.ms": "30000",
        "heartbeat.action.query": "UPDATE admin.heartbeat SET last_heartbeat = NOW();"
    }
}
The 1 in name and database.server.name serves as a version number. Increment it when resynchronizing with a new connector.
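As an illustration of incrementing the version, the sketch below rewrites both names in a copy of the configuration file. It assumes the naming pattern used in the example configuration (sensor-N-connector and metadb_sensor_N). bump_connector_version is a hypothetical helper, not a Metadb or Debezium tool.

```shell
# Hypothetical helper: rewrite the version number in a connector
# configuration read from stdin. Assumes the names follow the pattern
# "sensor-N-connector" and "metadb_sensor_N" used in the example.
bump_connector_version() {
    old="$1"
    new="$2"
    sed -e "s/\"sensor-${old}-connector\"/\"sensor-${new}-connector\"/" \
        -e "s/\"metadb_sensor_${old}\"/\"metadb_sensor_${new}\"/"
}

# Example:
#   bump_connector_version 1 2 < connector.json > connector-2.json
```

The topics and consumer_group options of the Metadb data source contain the same number, so they would likely need a matching change.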

4. Register the connector

Create the connector using the Kafka Connect REST API:
curl -X POST -i -H "Accept: application/json" -H "Content-Type: application/json" \
     -d @connector.json https://kafka.connect.server/connectors
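After registering, it can be useful to confirm that the connector and its task actually started. The sketch below relies on the Kafka Connect REST endpoint GET /connectors/{name}/status, which returns a JSON document containing a state field for the connector and for each task. connector_all_running is a hypothetical helper, and its grep-based parsing is a shortcut rather than a full JSON parser.

```shell
# Hypothetical helper: read a Kafka Connect status document on stdin and
# succeed only if the connector and every task report state RUNNING.
connector_all_running() {
    states=$(grep -o '"state" *: *"[A-Z_]*"' | sed 's/.*"\([A-Z_]*\)"$/\1/')
    # No state fields at all is treated as a failure.
    [ -n "$states" ] || return 1
    for s in $states; do
        [ "$s" = "RUNNING" ] || return 1
    done
    return 0
}

# Example, using the host and connector name from the steps above:
#   curl -s https://kafka.connect.server/connectors/sensor-1-connector/status \
#       | connector_all_running && echo "connector is running"
```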

Heartbeat Configuration

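The heartbeat.interval.ms and heartbeat.action.query settings are recommended to avoid disk space spikes in the source database. When the captured tables change rarely, the periodic heartbeat update gives the connector changes to acknowledge, which lets PostgreSQL release WAL retained by the replication slot. Because the heartbeat table exists only for this purpose, use the schema_stop_filter option in the CREATE DATA SOURCE command to filter out its schema (admin in the example above).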

Primary Key Requirement

Metadb requires every streamed table to have a primary key. Exclude tables without a primary key using schema.exclude.list or table.exclude.list in the Debezium configuration; otherwise Metadb will generate error messages for them.
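One way to find candidates for table.exclude.list is to query the PostgreSQL system catalogs for tables that have no primary key constraint. The following is a minimal sketch, assuming psql can connect to the source database. Both function names are hypothetical, and the query considers only ordinary tables outside the system schemas.

```shell
# Print a query that lists ordinary tables with no primary key constraint,
# one "schema.table" name per row. System schemas are skipped.
no_pk_tables_sql() {
    cat <<'SQL'
SELECT n.nspname || '.' || c.relname
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE c.relkind = 'r'
  AND n.nspname NOT IN ('pg_catalog', 'information_schema')
  AND NOT EXISTS (
      SELECT 1 FROM pg_constraint p
      WHERE p.conrelid = c.oid AND p.contype = 'p'
  )
ORDER BY 1;
SQL
}

# Join the names read from stdin into one comma-separated value.
to_exclude_list() {
    paste -s -d, -
}

# Example (connection arguments are placeholders):
#   no_pk_tables_sql | psql -At -d sourcedb | to_exclude_list
```

Debezium treats each entry of table.exclude.list as a regular expression matched against the fully qualified table name, so the generated value should be reviewed before it is pasted into the connector configuration.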

Monitoring Replication

Critical: The replication slot disk usage must be monitored, as it can grow too large under error conditions and potentially fill the disk.
To show replication slot disk usage in the source database:
SELECT slot_name,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS replication_slot_lag,
       active
FROM pg_replication_slots;
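Allocate generous extra disk space on the source database server, so that WAL retained by the replication slot during such an error does not fill the disk before the problem is noticed.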
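To turn the query above into a simple alert, the lag can be requested in bytes and compared against a limit. The following is a minimal sketch, assuming psql's unaligned, tuples-only output (psql -At), which separates fields with |. slots_over_limit and the 1 GiB limit in the example are hypothetical choices for illustration.

```shell
# Hypothetical helper: read "slot_name|lag_bytes|active" lines on stdin and
# print the name of every slot whose lag exceeds the limit given in bytes.
slots_over_limit() {
    awk -F'|' -v limit="$1" '($2 + 0) > (limit + 0) { print $1 }'
}

# Example (connection arguments are placeholders):
#   psql -At -d sourcedb -c "SELECT slot_name,
#       pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn), active
#       FROM pg_replication_slots" | slots_over_limit 1073741824
```

Any slot name printed by the example has more than 1 GiB of retained WAL and deserves a closer look.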

Creating the Data Source

In Metadb, define a data source using the CREATE DATA SOURCE statement:
CREATE DATA SOURCE sensor TYPE kafka OPTIONS (
    brokers 'kafka:29092',
    topics '^metadb_sensor_1\.',
    consumer_group 'metadb_sensor_1_1',
    add_schema_prefix 'sensor_',
    schema_stop_filter 'admin'
);

Initial Synchronization

When a new data source is first configured, Metadb automatically enters synchronizing mode, which pauses periodic transforms and external SQL.
1. Wait for snapshot completion

Monitor the log for the message:
source snapshot complete (deadline exceeded)

2. Stop the server

metadb stop -D data

3. Run endsync

Complete the synchronization:
metadb endsync -D data --source sensor

4. Start the server

Once endsync has finished, restart the Metadb server.
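The first three steps can be sketched as a small script. It assumes that the Metadb log is written to a file whose path you supply, and that metadb stop returns only after the server has stopped. Neither assumption is confirmed here, and finish_sync is a hypothetical helper name. Restarting the server afterwards is left to whatever procedure you normally use.

```shell
# Hypothetical helper covering steps 1 to 3 for one data source.
#   $1 = Metadb data directory
#   $2 = data source name
#   $3 = path to the Metadb log file (an assumption; adjust to your setup)
finish_sync() {
    datadir="$1"
    source_name="$2"
    logfile="$3"
    # Step 1: refuse to continue until the snapshot message has been logged.
    if ! grep -q 'source snapshot complete' "$logfile"; then
        echo "snapshot not complete yet" >&2
        return 1
    fi
    # Steps 2 and 3: stop the server, then end synchronization.
    metadb stop -D "$datadir" || return 1
    metadb endsync -D "$datadir" --source "$source_name"
}

# Example:
#   finish_sync data sensor metadb.log
```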

Deleting a Connection

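After deleting a connection in Metadb (see Resynchronization), drop the replication slot and publication in the source database. The names below are the Debezium defaults; substitute your own if the connector configuration sets slot.name or publication.name: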
SELECT pg_drop_replication_slot('debezium');

DROP PUBLICATION dbz_publication;
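PostgreSQL refuses to drop a replication slot that is still in use, so it can be worth checking the slot first. The following is a minimal sketch, assuming psql can connect to the source database and that the default names debezium and dbz_publication are in use. drop_debezium_objects is a hypothetical helper name.

```shell
# Hypothetical helper: drop the slot and publication only if the slot is
# not active. Any arguments are passed through to psql.
drop_debezium_objects() {
    active=$(psql -At "$@" -c \
        "SELECT active FROM pg_replication_slots WHERE slot_name = 'debezium';") || return 1
    if [ "$active" = "t" ]; then
        echo "replication slot 'debezium' is still active; stop the connector first" >&2
        return 1
    fi
    # An empty result means the slot does not exist, so only the
    # publication is left to drop.
    if [ "$active" = "f" ]; then
        psql "$@" -c "SELECT pg_drop_replication_slot('debezium');" || return 1
    fi
    psql "$@" -c "DROP PUBLICATION IF EXISTS dbz_publication;"
}

# Example (connection arguments are placeholders):
#   drop_debezium_objects -h example.host.name -d sourcedb -U dbuser
```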
