Skip to main content

Overview

SUBSCRIBE streams updates from sources, tables, views, or materialized views as they occur in real-time. Unlike SELECT which returns results at a single point in time, SUBSCRIBE continuously emits changes.

Syntax

SUBSCRIBE [TO] <object_name | (SELECT ...)>
  [ENVELOPE { UPSERT (KEY (<column> [, ...])) | DEBEZIUM (KEY (<column> [, ...])) }]
  [WITHIN TIMESTAMP ORDER BY <column> [ASC | DESC] [, ...]]
  [WITH (SNAPSHOT = {true | false}, PROGRESS = {true | false})]
  [AS OF [AT LEAST] <timestamp>]
  [UP TO <timestamp>];

Key Concepts

Streaming Updates

SUBSCRIBE emits a continuous stream of updates:
-- Subscribe to a view
SUBSCRIBE TO user_counts;
Output includes:
  • mz_timestamp - Logical timestamp of the update
  • mz_diff - Change type (+1 for insert, -1 for delete)
  • Data columns from the subscribed object

Use Cases

  • Event Processing: React to every change in real-time
  • Data Replication: Replicate complete change history
  • Audit Logs: Track all data modifications
  • Streaming ETL: Feed changes to downstream systems
  • Real-time Dashboards: Update UI components as data changes

Basic SUBSCRIBE

Subscribe to a View

CREATE VIEW order_counts AS
  SELECT customer_id, COUNT(*) as order_count
  FROM orders
  GROUP BY customer_id;

SUBSCRIBE TO order_counts;
Output:
mz_timestamp | mz_diff | customer_id | order_count
-------------|---------|-------------|-------------
 1000        | 1       | 123         | 5
 1001        | -1      | 123         | 5
 1001        | 1       | 123         | 6
 1002        | 1       | 456         | 1

Subscribe to a Query

SUBSCRIBE (
  SELECT product_id, SUM(quantity) as total_sold
  FROM order_items
  WHERE order_date >= '2024-01-01'
  GROUP BY product_id
);

WITH Options

SNAPSHOT Option

Control initial snapshot emission:
-- With snapshot (default)
SUBSCRIBE TO my_view WITH (SNAPSHOT = true);

-- Without snapshot (only future updates)
SUBSCRIBE TO my_view WITH (SNAPSHOT = false);
SNAPSHOT
boolean
default:"true"
Whether to emit current state before streaming updates. Set to false to only see new changes.

PROGRESS Option

Include progress messages:
SUBSCRIBE TO my_view WITH (PROGRESS = true);
Adds mz_progressed column:
  • false - Regular data row
  • true - Progress message (timestamps have advanced)
PROGRESS
boolean
default:"false"
Whether to include progress messages indicating timestamp advancement

Time Bounds

AS OF

Start subscription from a specific timestamp:
-- Start from specific time
SUBSCRIBE TO orders AS OF 1234567890;

-- Start from at least this time
SUBSCRIBE TO orders AS OF AT LEAST now();
Requires history retention to be configured on the source objects.

UP TO

End subscription at a specific timestamp:
-- Subscribe until timestamp
SUBSCRIBE TO orders UP TO 1234567999;

-- Subscribe for time range
SUBSCRIBE TO orders
  AS OF 1234567890
  UP TO 1234567999;

Envelopes

Upsert Envelope

Transform updates into upsert format:
SUBSCRIBE TO users
  ENVELOPE UPSERT (KEY (user_id));
Output:
mz_timestamp | mz_state | user_id | name | email
-------------|----------|---------|------|------
 100         | upsert   | 1       | Alice | [email protected]
 200         | upsert   | 1       | Alice | [email protected]
 300         | delete   | 1       | NULL  | NULL
States:
  • upsert - Insert or update
  • delete - Delete (values are NULL)
  • key_violation - Multiple values for same key

Debezium Envelope

Provide before/after states:
SUBSCRIBE TO orders
  ENVELOPE DEBEZIUM (KEY (order_id));
Output:
mz_timestamp | mz_state | order_id | before_amount | after_amount
-------------|----------|----------|---------------|-------------
 100         | insert   | 1        | NULL          | 100
 200         | upsert   | 1        | 100           | 150
 300         | delete   | 1        | 150           | NULL

Usage Patterns

Using DECLARE and FETCH

Recommended approach for consuming subscriptions:
BEGIN;
DECLARE c CURSOR FOR SUBSCRIBE TO my_view;

-- Fetch rows as they arrive
FETCH 100 c;
FETCH 100 c;

-- Fetch with timeout
FETCH ALL c WITH (timeout = '1s');

-- Fetch without waiting
FETCH ALL c WITH (timeout = '0s');

Using COPY (for psql)

COPY (SUBSCRIBE TO my_view) TO STDOUT;

Examples

Real-Time Metrics

CREATE MATERIALIZED VIEW minute_metrics AS
  SELECT
    date_trunc('minute', event_time) as minute,
    event_type,
    COUNT(*) as event_count
  FROM events
  GROUP BY minute, event_type;

-- Stream updates
SUBSCRIBE TO minute_metrics;

Change Data Capture

-- Subscribe to table changes
SUBSCRIBE TO user_profiles
  ENVELOPE DEBEZIUM (KEY (user_id))
  WITH (PROGRESS = true);

Top Products

SUBSCRIBE (
  SELECT
    product_id,
    product_name,
    SUM(quantity) as total_sold
  FROM order_items
  JOIN products USING (product_id)
  WHERE order_date >= CURRENT_DATE - INTERVAL '7 days'
  GROUP BY product_id, product_name
  ORDER BY total_sold DESC
  LIMIT 10
);

Filtered Updates

-- Only subscribe to high-value orders
SUBSCRIBE (
  SELECT *
  FROM orders
  WHERE amount > 1000
) WITH (SNAPSHOT = false);

Time-Bounded Subscription

-- Subscribe to updates for one hour
SUBSCRIBE TO metrics
  AS OF now()
  UP TO now() + INTERVAL '1 hour';

Durable Subscriptions

Resume subscriptions after disconnection:
-- Configure history retention
ALTER MATERIALIZED VIEW my_view
SET (RETAIN HISTORY FOR '24 hours');

-- Subscribe with AS OF to resume
SUBSCRIBE TO my_view
  AS OF <last_seen_timestamp>;

Ordering Within Timestamps

SUBSCRIBE TO orders
  WITHIN TIMESTAMP ORDER BY customer_id, order_id;
Orders rows within each timestamp by specified columns.

Best Practices

  1. Use DECLARE/FETCH
    BEGIN;
    DECLARE c CURSOR FOR SUBSCRIBE TO my_view;
    FETCH ALL c WITH (timeout = '5s');
    
  2. Set Timeouts
    FETCH ALL c WITH (timeout = '1s');
    
  3. Enable PROGRESS for Long-Running
    SUBSCRIBE TO my_view WITH (PROGRESS = true);
    
  4. Configure History Retention
    ALTER MATERIALIZED VIEW my_view
    SET (RETAIN HISTORY FOR '1 day');
    
  5. Use Envelopes for Stateful Consumers
    SUBSCRIBE TO users
    ENVELOPE UPSERT (KEY (user_id));
    
  6. Handle Progress Messages
    -- Filter out progress rows
    WHERE mz_progressed = false
    

Monitoring

Check subscription progress:
-- View active subscriptions
SELECT * FROM mz_internal.mz_subscriptions;

-- Check source progress
SELECT * FROM my_source_progress;

Build docs developers (and LLMs) love