Skip to main content

Overview

The PostgreSQL Realtime Monitor uses logical replication to stream database changes in real-time. This approach captures row-level changes (INSERT, UPDATE, DELETE) with actual data values, enabling the application to react to database modifications as they happen.

PostgreSQL Configuration

Logical Replication Setup

The database must be configured with specific settings to enable logical replication (docker-compose.yml:14-18):
command: >
  postgres
    -c wal_level=logical
    -c max_wal_senders=10
    -c max_replication_slots=10
    -c max_connections=100

Configuration Parameters

Write-Ahead Log LevelControls the amount of information written to the WAL (Write-Ahead Log):
  • minimal: Basic crash recovery only
  • replica: Physical replication support
  • logical: Full logical decoding support (required for subscriptions)
Setting wal_level=logical enables row-level change tracking with complete data values.
Maximum WAL Sender ProcessesLimits the number of concurrent replication connections. Each subscription uses one WAL sender slot.
  • Default: 10 (sufficient for most applications)
  • Increase if you need more concurrent subscriptions
Maximum Replication SlotsReplication slots ensure WAL files are retained until subscribers consume them, preventing data loss during disconnections.
  • Each subscription creates a replication slot
  • Slots persist across restarts
  • Unused slots should be cleaned up to prevent WAL bloat
Logical replication increases WAL disk usage. Monitor disk space and configure WAL retention policies appropriately.

Publication Setup

Publications define which tables can be subscribed to (init.sql:1):
CREATE PUBLICATION alltables FOR ALL TABLES;
This publication includes all tables in the database. In production, you may want to create targeted publications for specific tables:
CREATE PUBLICATION user_changes FOR TABLE users, profiles;

Publication Options

Publications can filter which operations are replicated:
-- Only track inserts and updates
CREATE PUBLICATION insert_update_only 
  FOR ALL TABLES 
  WITH (publish = 'insert,update');

-- Track specific tables with all operations
CREATE PUBLICATION orders_pub 
  FOR TABLE orders, order_items;

postgres.js Subscription

The application uses the postgres.js library to subscribe to database changes.

Database Connection (db.ts:9-16)

export const sql = postgres({
    host: DB_HOST,
    port: DB_PORT,
    database: DB_NAME,
    username: DB_USER,
    password: DB_PASSWORD,
    publications: 'alltables', // <- key
})
The publications parameter tells postgres.js which publication to subscribe to. This must match the publication name created in PostgreSQL.

Subscription Setup (src/index.ts:87-124)

The server subscribes to all database changes using the sql.subscribe() method:
const { unsubscribe } = await sql.subscribe(
  "*", // pattern: all (all operations and tables)
  (row, { command, relation }) => {
    // Format the relation (can be object or string)
    const tableName =
      typeof relation === "string"
        ? relation
        : relation?.table
          ? `${relation.schema || "public"}.${relation.table}`
          : JSON.stringify(relation);

    // Create an object that combines operation, table and row data
    const rows = Array.isArray(row) ? row : [row];
    const newChanges = rows.map((r) => ({
      operation: command.toUpperCase(),
      table: tableName,
      ...r,
    }));

    // Add new changes to the accumulative array
    allChanges.push(...newChanges);

    // Send to all WebSocket clients
    broadcast({
      type: "change",
      data: newChanges,
      total: allChanges.length,
    });

    // Also log to console
    console.log(
      `📊 Change detected: ${command.toUpperCase()} on ${tableName}`
    );
  },
  () => {
    console.log("✅ Realtime subscription ready (connected or reconnected)");
  }
);

Subscription Parameters

Pattern ("*")

The first parameter is a filter pattern:
  • "*": Subscribe to all operations on all tables
  • "users": Subscribe to changes on the users table only
  • "public.*": Subscribe to all tables in the public schema

Callback Function

The second parameter is the callback invoked for each change:
(row, { command, relation }) => {
  // row: The changed row data
  // command: 'insert', 'update', or 'delete'
  // relation: Table information (schema + table name)
}
Callback Parameters:
  • row: Object or array containing the row data
  • command: The SQL operation type (insert, update, delete)
  • relation: Table metadata
    • Can be a string (simple table name)
    • Can be an object with schema and table properties

Ready Callback

The third parameter is called when the subscription is established:
() => {
  console.log("✅ Realtime subscription ready (connected or reconnected)");
}
This callback fires both on initial connection and after reconnection following a disconnection.

Change Processing

Relation Formatting (src/index.ts:91-96)

The relation object is normalized to a consistent string format:
const tableName =
  typeof relation === "string"
    ? relation
    : relation?.table
      ? `${relation.schema || "public"}.${relation.table}`
      : JSON.stringify(relation);
This produces table names like:
  • "public.users"
  • "inventory.products"
  • "orders" (if schema is unspecified)

Row Array Handling (src/index.ts:99-104)

Changes are converted to a consistent array format:
const rows = Array.isArray(row) ? row : [row];
const newChanges = rows.map((r) => ({
  operation: command.toUpperCase(),
  table: tableName,
  ...r,
}));
While most changes affect a single row, bulk operations may produce multiple rows in a single callback invocation.

Change Event Structure

INSERT Events

When a row is inserted:
INSERT INTO users (id, name, email) VALUES (1, 'Alice', '[email protected]');
The subscription receives:
{
  row: {
    id: 1,
    name: 'Alice',
    email: '[email protected]'
  },
  command: 'insert',
  relation: { schema: 'public', table: 'users' }
}
Formatted change object:
{
  operation: 'INSERT',
  table: 'public.users',
  id: 1,
  name: 'Alice',
  email: '[email protected]'
}

UPDATE Events

When a row is updated:
UPDATE users SET name = 'Alice Smith' WHERE id = 1;
The subscription receives:
{
  row: {
    id: 1,
    name: 'Alice Smith',
    email: '[email protected]'
  },
  command: 'update',
  relation: { schema: 'public', table: 'users' }
}
UPDATE events contain the new values after the update. Old values are not included by default.

DELETE Events

When a row is deleted:
DELETE FROM users WHERE id = 1;
The subscription receives:
{
  row: {
    id: 1
  },
  command: 'delete',
  relation: { schema: 'public', table: 'users' }
}
DELETE events may have limited data depending on the table’s REPLICA IDENTITY setting:
  • DEFAULT: Only primary key columns
  • FULL: All columns
  • USING INDEX: Columns in specified index
  • NOTHING: No data (not recommended for subscriptions)
To receive full row data on DELETE:
ALTER TABLE users REPLICA IDENTITY FULL;

Replica Identity

The REPLICA IDENTITY setting controls what information is included in change events.

Setting Replica Identity

-- Only include primary key (default)
ALTER TABLE users REPLICA IDENTITY DEFAULT;

-- Include all columns
ALTER TABLE users REPLICA IDENTITY FULL;

-- Use a specific unique index
ALTER TABLE users REPLICA IDENTITY USING INDEX users_email_idx;

-- No identifying information (not recommended)
ALTER TABLE users REPLICA IDENTITY NOTHING;

Impact on Events

Primary Key Only
// UPDATE event
{ id: 1, name: 'Alice Smith' }  // ✓ All new values

// DELETE event
{ id: 1 }  // ✗ Only primary key
For real-time monitoring applications, REPLICA IDENTITY FULL is recommended to ensure DELETE events include complete row data.

Connection Management

Automatic Reconnection

The postgres.js library automatically handles connection loss and reconnection:
  • Maintains a replication slot to prevent data loss
  • Resumes from the last processed position
  • Invokes the ready callback on reconnection

Unsubscribing

The sql.subscribe() method returns an unsubscribe function:
const { unsubscribe } = await sql.subscribe(...);

// Later, to stop the subscription:
await unsubscribe();
Always call unsubscribe() when shutting down to properly clean up the replication slot and prevent WAL accumulation.

Performance Considerations

Write Amplification

  • Every database write generates WAL entries
  • Logical replication adds minimal overhead (~5-10%)
  • Impact increases with number of concurrent subscriptions

Network Traffic

  • Changes are streamed in near real-time
  • High-frequency updates generate proportional network traffic
  • Consider batching or throttling for high-volume tables

Memory Usage

  • Replication slots retain WAL files until consumed
  • Disconnected subscriptions can cause WAL bloat
  • Monitor pg_replication_slots for inactive slots

Monitoring Replication Slots

Check active replication slots:
SELECT 
  slot_name,
  active,
  restart_lsn,
  pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) as lag
FROM pg_replication_slots;

Limitations

Logical replication only captures DML operations (INSERT, UPDATE, DELETE). Schema changes (CREATE, ALTER, DROP) are not included in change events.
TRUNCATE operations may or may not be replicated depending on publication settings. Use DELETE for reliable change tracking.
Sequence operations (NEXTVAL, SETVAL) are not replicated. Applications should not rely on sequence values being synchronized.
PostgreSQL large objects (BLOBs) are not supported by logical replication. Store binary data in BYTEA columns instead.

Best Practices

Set Replica Identity

Use REPLICA IDENTITY FULL for tables where you need complete DELETE event data

Monitor WAL Growth

Regularly check replication slot lag and clean up inactive slots

Selective Publications

Create targeted publications instead of subscribing to all tables

Handle Reconnections

Implement proper error handling for subscription reconnections

Debugging

Enable Verbose Logging

Add logging to the subscription callback:
await sql.subscribe(
  "*",
  (row, { command, relation }) => {
    console.log('Change received:', {
      command,
      relation,
      row,
      timestamp: new Date().toISOString()
    });
    // ... rest of callback
  }
);

Check Publication Status

-- List all publications
SELECT * FROM pg_publication;

-- Show tables in a publication
SELECT * FROM pg_publication_tables WHERE pubname = 'alltables';

Verify Replication Connection

-- Show active replication connections
SELECT * FROM pg_stat_replication;

Architecture Overview

Understand how database subscription fits into the system

WebSocket Implementation

Learn how changes are broadcast to clients

Build docs developers (and LLMs) love