Change Data Capture (CDC) in YugabyteDB captures row-level changes from database tables and streams them to external systems using PostgreSQL-compatible logical replication. CDC enables real-time data pipelines, microservice architectures, data warehousing, and event-driven applications.

Architecture

YugabyteDB CDC is built on PostgreSQL logical replication and uses a publish-subscribe model:
  • Publications define which tables to replicate
  • Replication Slots track the streaming position and checkpoint progress
  • Output Plugins (yboutput or pgoutput) format change events
  • Debezium Connector converts the replication stream to Kafka messages

How CDC Works

YugabyteDB automatically shards tables into tablets, each with its own Write-Ahead Log (WAL). The CDC process:
  1. Snapshot Phase: On first connection, the connector takes a consistent snapshot of all configured tables
  2. Streaming Phase: After snapshot completion, the connector continuously streams changes from the WAL
  3. Change Events: Each INSERT, UPDATE, and DELETE operation produces a change event record
  4. Kafka Topics: Events are published to separate Kafka topics per table
WAL segments are periodically purged, so the connector maintains a replication slot to ensure required WAL segments are retained.
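The snapshot-then-stream flow above can be sketched in a few lines of Python. This is an illustrative simulation only, not connector code; the table data and event shapes are invented, loosely following Debezium's event format:

```python
# Minimal simulation of the two CDC phases described above.
# Phase 1 emits read ('r') events for existing rows; phase 2 emits
# one change event per WAL entry. All names here are illustrative.

def snapshot(tables):
    """Phase 1: emit a read ('r') event for every existing row."""
    for table, rows in tables.items():
        for row in rows:
            yield {"table": table, "op": "r", "after": row}

def stream(wal):
    """Phase 2: emit one change event per WAL entry."""
    for entry in wal:
        yield {"table": entry["table"], "op": entry["op"], "after": entry.get("after")}

tables = {"users": [{"id": 1, "username": "alice"}]}
wal = [{"table": "users", "op": "c", "after": {"id": 2, "username": "bob"}}]

events = list(snapshot(tables)) + list(stream(wal))
print([e["op"] for e in events])  # snapshot reads first, then streamed changes
```

The ordering matters: consumers see a consistent baseline from the snapshot before any incremental changes arrive.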

Publications and Replication Slots

Creating a Publication

Publications define the tables to stream:
-- Publish specific tables
CREATE PUBLICATION mypublication FOR TABLE users, departments;

-- Publish all tables in the database
CREATE PUBLICATION alltables FOR ALL TABLES;

Managing Publications

-- Add tables to a publication
ALTER PUBLICATION mypublication ADD TABLE users, departments;

-- Remove tables from a publication
ALTER PUBLICATION mypublication DROP TABLE departments;

-- Replace the table list
ALTER PUBLICATION mypublication SET TABLE users, employees;

-- Drop a publication
DROP PUBLICATION mypublication;

Creating Replication Slots

Replication slots checkpoint the streaming position:
-- Create a replication slot with yboutput plugin
SELECT pg_create_logical_replication_slot(
  'test_replication_slot',
  'yboutput'
);

-- Create with specific record type
SELECT pg_create_logical_replication_slot(
  'test_replication_slot',
  'yboutput',
  'FULL'
);
Record Types:
  • FULL - Include all column values before and after changes
  • CHANGE - Only changed columns plus primary key (default)
  • CHANGE_OLD_NEW - Changed columns with before/after values
  • DEFAULT - PostgreSQL-compatible default behavior
  • NOTHING - Minimal information (INSERT only)

Using the Streaming Protocol

Alternatively, create slots via streaming protocol:
CREATE_REPLICATION_SLOT test_replication_slot LOGICAL yboutput USE_SNAPSHOT

-- With record type
CREATE_REPLICATION_SLOT test_replication_slot LOGICAL yboutput 
  WITH RECORD_TYPE FULL

Dropping Replication Slots

-- Drop an inactive slot
DROP_REPLICATION_SLOT inactive_replication_slot;

-- Wait for active slot to become inactive
DROP_REPLICATION_SLOT active_replication_slot WAIT;
A slot is considered “active” if consumed within the timeframe defined by ysql_cdc_active_replication_slot_window_ms (default: 5 minutes).

Debezium Connector Setup

The YugabyteDB Debezium Connector streams changes to Kafka using the replication protocol.

Connector Configuration

Basic connector properties:
{
  "name": "yugabytedb-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.YugabyteDBConnector",
    "database.hostname": "localhost",
    "database.port": "5433",
    "database.user": "yugabyte",
    "database.password": "yugabyte",
    "database.dbname": "yugabyte",
    "database.server.name": "dbserver1",
    "slot.name": "test_replication_slot",
    "publication.name": "mypublication",
    "plugin.name": "yboutput",
    "table.include.list": "public.users,public.departments",
    "snapshot.mode": "initial"
  }
}

Key Configuration Properties

Property | Description | Default
slot.name | Replication slot name to consume from | Required
slot.drop.on.stop | Drop slot when connector stops | false
publication.name | Publication defining tables to stream | Optional
publication.autocreate.mode | Auto-create publication | filtered
plugin.name | Output plugin (yboutput or pgoutput) | yboutput
snapshot.mode | Initial snapshot behavior (initial, never, initial_only) | initial

Snapshot Modes

  • initial (default): Perform snapshot if no offset exists, then stream changes
  • never: Skip snapshot; start streaming from slot creation or stored offset
  • initial_only: Perform snapshot and stop before streaming

Complete Setup Example

-- Step 1: Create tables
CREATE TABLE users (
  id SERIAL PRIMARY KEY,
  username VARCHAR(255),
  email VARCHAR(255)
);

CREATE TABLE departments (
  id SERIAL PRIMARY KEY,
  name VARCHAR(255)
);

-- Step 2: Create publication
CREATE PUBLICATION pub FOR TABLE users, departments;

-- Step 3: Create replication slot
CREATE_REPLICATION_SLOT my_slot LOGICAL yboutput USE_SNAPSHOT;

-- Step 4: Configure Debezium connector
-- Use connector configuration shown above with:
--   slot.name = 'my_slot'
--   publication.name = 'pub'
--   publication.autocreate.mode = 'disabled'

Output Plugins

yboutput Plugin

Native YugabyteDB output plugin with enhanced features:
{
  "before": null,
  "after": {
    "employee_id": {"value": 1001, "set": true},
    "employee_name": {"value": "Alice", "set": true},
    "employee_dept": {"value": "Packaging", "set": true}
  },
  "op": "c"
}
The set field indicates whether a column value was explicitly set.

pgoutput Plugin

Standard PostgreSQL plugin for compatibility:
{
  "before": null,
  "after": {
    "employee_id": 1001,
    "employee_name": "Alice",
    "employee_dept": "Packaging"
  },
  "op": "c"
}

Replica Identity

Controls what information is included in UPDATE and DELETE events:
-- FULL: Include all column values before and after
ALTER TABLE users REPLICA IDENTITY FULL;

-- DEFAULT: Only primary key in before-image for DELETEs
ALTER TABLE users REPLICA IDENTITY DEFAULT;

-- CHANGE: Only changed columns plus primary key (yboutput only)
ALTER TABLE users REPLICA IDENTITY CHANGE;

-- NOTHING: No before-image (INSERT only)
ALTER TABLE users REPLICA IDENTITY NOTHING;

Impact on Change Events

Identity | INSERT | UPDATE Before | UPDATE After | DELETE Before
FULL | All columns | All columns | All columns | All columns
DEFAULT | All columns | None | All columns | PK only
CHANGE | All columns | None | Changed + PK | PK only
NOTHING | All columns | N/A | N/A | N/A

Streaming to Kafka

Topic Naming

By default, changes stream to topics named {topic.prefix}.{schema}.{table}.
Example topics for the dbserver1 prefix:
  • dbserver1.public.users
  • dbserver1.public.departments
  • dbserver1.public.orders

Change Event Structure

{
  "schema": { /* Kafka Connect schema */ },
  "payload": {
    "before": null,
    "after": {
      "id": 1,
      "username": "alice",
      "email": "[email protected]"
    },
    "source": {
      "version": "2.5.2.Final",
      "connector": "yugabytedb",
      "name": "dbserver1",
      "ts_ms": 1559033904863,
      "snapshot": false,
      "db": "yugabyte",
      "schema": "public",
      "table": "users",
      "txId": 555,
      "lsn": 24023128
    },
    "op": "c",
    "ts_ms": 1559033904863
  }
}

Operation Types

  • c - Create (INSERT)
  • r - Read (snapshot)
  • u - Update
  • d - Delete

CDC with Kafka Connect

Kafka Connector Setup

  1. Download the YugabyteDB Connector:
wget https://github.com/yugabyte/debezium-connector-yugabytedb/releases/download/v1.9.5.y.19/debezium-connector-yugabytedb-1.9.5.y.19.jar
  2. Configure Kafka Connect:
# connect-standalone.properties
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
plugin.path=/path/to/connector/jar
  3. Start the Connector:
connect-standalone connect-standalone.properties yugabytedb-connector.properties

Monitoring and Observability

Catalog Views

-- View all publications
SELECT * FROM pg_publication;

-- View publication-table mappings
SELECT * FROM pg_publication_tables;

-- View replication slots
SELECT slot_name, plugin, database, active, yb_stream_id 
FROM pg_replication_slots;

Monitoring Replication Lag

Track the confirmed_flush_lsn to monitor consumer progress:
SELECT slot_name, 
       pg_size_pretty(pg_current_wal_lsn() - confirmed_flush_lsn) as lag_bytes
FROM pg_replication_slots
WHERE active = true;

Use Cases

Microservice Event Streaming

Stream table changes to Kafka topics consumed by microservices:
CREATE PUBLICATION orders_events FOR TABLE orders;
CREATE_REPLICATION_SLOT orders_slot LOGICAL yboutput;

Data Warehouse Integration

Replicate operational data to analytics systems:
CREATE PUBLICATION warehouse_replication FOR ALL TABLES;

Cache Invalidation

Invalidate application caches based on database changes:
CREATE PUBLICATION cache_invalidation 
  FOR TABLE products, inventory;

Audit and Compliance

Capture all changes for audit trails:
ALTER TABLE sensitive_data REPLICA IDENTITY FULL;
CREATE PUBLICATION audit_log FOR TABLE sensitive_data;

Best Practices

  1. Choose Appropriate Replica Identity: Use CHANGE for efficiency, FULL when complete history is needed
  2. Monitor Slot Lag: Regularly check replication slot lag to prevent WAL accumulation
  3. Set Retention Policies: Configure ysql_cdc_active_replication_slot_window_ms appropriately
  4. Use Snapshots Wisely: For large tables, consider snapshot.mode=never after initial load
  5. Handle Schema Changes: Plan for DDL changes; some require recreating replication slots
  6. Secure Credentials: Use dedicated replication users with minimal privileges
  7. Partition Publications: Create separate publications for different use cases

Troubleshooting

Slot Not Advancing

Check whether the connector is actually consuming; slots with no active consumer appear here:
SELECT * FROM pg_replication_slots WHERE active = false;

Missing Changes

Verify publication includes the table:
SELECT * FROM pg_publication_tables WHERE tablename = 'your_table';

WAL Growing

Drop inactive slots:
SELECT pg_drop_replication_slot('slot_name');
