Overview
SUBSCRIBE streams updates from sources, tables, views, or materialized views as they occur. Unlike SELECT, which returns results as of a single point in time, SUBSCRIBE continuously emits changes.
Syntax
SUBSCRIBE [TO] <object_name | (SELECT ...)>
[ENVELOPE { UPSERT (KEY (<column> [, ...])) | DEBEZIUM (KEY (<column> [, ...])) }]
[WITHIN TIMESTAMP ORDER BY <column> [ASC | DESC] [, ...]]
[WITH (SNAPSHOT = {true | false}, PROGRESS = {true | false})]
[AS OF [AT LEAST] <timestamp>]
[UP TO <timestamp>];
Key Concepts
Streaming Updates
SUBSCRIBE emits a continuous stream of updates:
-- Subscribe to a view
SUBSCRIBE TO user_counts;
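Output includes:
- mz_timestamp: Logical timestamp of the update
- mz_diff: Change in the number of copies of the row (positive for inserts, negative for deletes; an update appears as a delete of the old row plus an insert of the new row)
- Data columns from the subscribed object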
Use Cases
- Event Processing: React to every change in real-time
- Data Replication: Replicate complete change history
- Audit Logs: Track all data modifications
- Streaming ETL: Feed changes to downstream systems
- Real-time Dashboards: Update UI components as data changes
Basic SUBSCRIBE
Subscribe to a View
CREATE VIEW order_counts AS
SELECT customer_id, COUNT(*) as order_count
FROM orders
GROUP BY customer_id;
SUBSCRIBE TO order_counts;
Output:
mz_timestamp | mz_diff | customer_id | order_count
-------------|---------|-------------|-------------
1000 | 1 | 123 | 5
1001 | -1 | 123 | 5
1001 | 1 | 123 | 6
1002 | 1 | 456 | 1
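A consumer typically folds these rows into its own copy of the current state. The following is a minimal Python sketch of that idea, not an official client: it treats the state as a multiset of rows and adds each mz_diff to the row's count. The function name apply_updates and the tuple layout are assumptions made for illustration; the sample rows are copied from the output above.

```python
from collections import Counter


def apply_updates(state, updates):
    """Fold (mz_timestamp, mz_diff, row) updates into a multiset of rows."""
    for _mz_timestamp, mz_diff, row in updates:
        state[row] += mz_diff
        if state[row] == 0:
            del state[row]  # the row is no longer present in the result
    return state


# (mz_timestamp, mz_diff, (customer_id, order_count)), as in the sample output
updates = [
    (1000, 1, (123, 5)),
    (1001, -1, (123, 5)),
    (1001, 1, (123, 6)),
    (1002, 1, (456, 1)),
]

current = apply_updates(Counter(), updates)
print(dict(current))
```

After all four rows are applied, customer 123 holds a single row with order_count 6: the retraction and the insertion at timestamp 1001 together express one update.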
Subscribe to a Query
SUBSCRIBE (
SELECT product_id, SUM(quantity) as total_sold
FROM order_items
WHERE order_date >= '2024-01-01'
GROUP BY product_id
);
WITH Options
SNAPSHOT Option
Control initial snapshot emission:
-- With snapshot (default)
SUBSCRIBE TO my_view WITH (SNAPSHOT = true);
-- Without snapshot (only future updates)
SUBSCRIBE TO my_view WITH (SNAPSHOT = false);
SNAPSHOT controls whether SUBSCRIBE emits the current state before streaming updates. Set it to false to see only new changes.
PROGRESS Option
Include progress messages:
SUBSCRIBE TO my_view WITH (PROGRESS = true);
Adds an mz_progressed column:
false - Regular data row
true - Progress message (timestamps have advanced)
PROGRESS controls whether SUBSCRIBE includes progress messages that indicate timestamp advancement.
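Progress messages let a consumer decide when a timestamp is complete. The sketch below buffers data rows and releases a timestamp's batch only after a progress row with a later timestamp arrives. It rests on two assumptions that should be checked against your deployment: that rows arrive as (mz_timestamp, mz_progressed, mz_diff, data), and that a progress row at time t means no further updates will arrive at timestamps below t. The name closed_batches and the sample rows are hypothetical.

```python
from collections import defaultdict


def closed_batches(rows):
    """Yield (timestamp, updates) once a progress row closes that timestamp.

    Assumed row layout: (mz_timestamp, mz_progressed, mz_diff, data).
    """
    pending = defaultdict(list)
    for mz_timestamp, mz_progressed, mz_diff, data in rows:
        if mz_progressed:
            # Assumption: nothing more arrives at timestamps below this one.
            for ts in sorted(t for t in pending if t < mz_timestamp):
                yield ts, pending.pop(ts)
        else:
            pending[mz_timestamp].append((mz_diff, data))


# Hypothetical stream: data rows interleaved with progress rows
rows = [
    (1000, False, 1, ("a",)),
    (1001, False, -1, ("a",)),
    (1001, False, 1, ("b",)),
    (1001, True, None, None),  # closes timestamp 1000
    (1002, True, None, None),  # closes timestamp 1001
]

batches = list(closed_batches(rows))
for ts, updates in batches:
    print(ts, updates)
```

Releasing whole batches avoids acting on a retraction before the matching insertion at the same timestamp has been seen.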
Time Bounds
AS OF
Start subscription from a specific timestamp:
-- Start from specific time
SUBSCRIBE TO orders AS OF 1234567890;
-- Start from at least this time
SUBSCRIBE TO orders AS OF AT LEAST now();
Requires history retention to be configured on the source objects.
UP TO
End subscription at a specific timestamp:
-- Subscribe until timestamp
SUBSCRIBE TO orders UP TO 1234567999;
-- Subscribe for time range
SUBSCRIBE TO orders
AS OF 1234567890
UP TO 1234567999;
Envelopes
Upsert Envelope
Transform updates into upsert format:
SUBSCRIBE TO users
ENVELOPE UPSERT (KEY (user_id));
Output:
mz_timestamp | mz_state | user_id | name | email
-------------|----------|---------|------|------
100 | upsert | 1 | Alice | [email protected]
200 | upsert | 1 | Alice | [email protected]
300 | delete | 1 | NULL | NULL
States:
upsert - Insert or update
delete - Delete (values are NULL)
key_violation - Multiple values for same key
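On the consumer side, the three states map naturally onto a keyed table. The following Python sketch applies upsert-envelope events to a dictionary keyed by user_id. The function name, the event tuple layout, and the email values are hypothetical (they are not taken from the output above), and raising on key_violation is just one possible policy.

```python
def apply_upsert_event(table, event):
    """Apply one (mz_state, key, values) event to a dict keyed by `key`."""
    mz_state, key, values = event
    if mz_state == "upsert":
        table[key] = values          # insert or overwrite
    elif mz_state == "delete":
        table.pop(key, None)         # values are NULL, only the key matters
    elif mz_state == "key_violation":
        # Illustrative policy: surface the problem instead of guessing.
        raise ValueError(f"multiple values for key {key!r}")
    else:
        raise ValueError(f"unknown mz_state {mz_state!r}")
    return table


# Hypothetical events mirroring the shape of the sample output
events = [
    ("upsert", 1, {"name": "Alice", "email": "alice@old.example"}),
    ("upsert", 1, {"name": "Alice", "email": "alice@new.example"}),
    ("delete", 1, None),
]

users = {}
snapshots = []
for event in events:
    apply_upsert_event(users, event)
    snapshots.append(dict(users))
print(snapshots)
```

Because each upsert carries the full new row, the consumer never needs to pair a retraction with an insertion, which is the main convenience of this envelope.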
Debezium Envelope
Provide before/after states:
SUBSCRIBE TO orders
ENVELOPE DEBEZIUM (KEY (order_id));
Output:
mz_timestamp | mz_state | order_id | before_amount | after_amount
-------------|----------|----------|---------------|-------------
100 | insert | 1 | NULL | 100
200 | upsert | 1 | 100 | 150
300 | delete | 1 | 150 | NULL
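Having both the before and after values makes it possible to maintain a derived aggregate without keeping a copy of every row. As an illustration only, this Python sketch maintains a running total of amount from the sample rows above; the function name amount_delta and the tuple layout are assumptions.

```python
def amount_delta(before_amount, after_amount):
    """Change in the total implied by one Debezium-style event (NULL counts as 0)."""
    return (after_amount or 0) - (before_amount or 0)


# (mz_timestamp, mz_state, order_id, before_amount, after_amount), from the sample output
events = [
    (100, "insert", 1, None, 100),
    (200, "upsert", 1, 100, 150),
    (300, "delete", 1, 150, None),
]

total = 0
totals = []
for _ts, _state, _order_id, before, after in events:
    total += amount_delta(before, after)
    totals.append(total)
print(totals)
```

The total rises to 100, then 150, and returns to 0 once the order is deleted.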
Usage Patterns
Using DECLARE and FETCH
Recommended approach for consuming subscriptions:
BEGIN;
DECLARE c CURSOR FOR SUBSCRIBE TO my_view;
-- Fetch rows as they arrive
FETCH 100 c;
FETCH 100 c;
-- Fetch with timeout
FETCH ALL c WITH (timeout = '1s');
-- Fetch without waiting
FETCH ALL c WITH (timeout = '0s');
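From application code, the same statements can be issued in a polling loop. The sketch below assumes a DB-API style cursor (anything with execute and fetchall, such as one from a PostgreSQL driver); poll_subscription, handle_rows, and max_polls are hypothetical names. StubCursor is only a stand-in so the example runs without a database, and view_name and timeout are interpolated directly, so only trusted values should be passed.

```python
def poll_subscription(cursor, view_name, handle_rows, max_polls=3, timeout="1s"):
    """Declare a SUBSCRIBE cursor and poll it with FETCH a fixed number of times."""
    cursor.execute("BEGIN")
    cursor.execute(f"DECLARE c CURSOR FOR SUBSCRIBE TO {view_name}")
    try:
        for _ in range(max_polls):
            cursor.execute(f"FETCH ALL c WITH (timeout = '{timeout}')")
            rows = cursor.fetchall()
            if rows:
                handle_rows(rows)
    finally:
        cursor.execute("COMMIT")  # end the transaction that owns the cursor


class StubCursor:
    """Stand-in for a real cursor; returns canned batches for each FETCH."""

    def __init__(self, batches):
        self.statements = []
        self._batches = list(batches)
        self._current = []

    def execute(self, sql):
        self.statements.append(sql)
        if sql.startswith("FETCH"):
            self._current = self._batches.pop(0) if self._batches else []
        else:
            self._current = []

    def fetchall(self):
        return self._current


received = []
stub = StubCursor([[(1000, 1, 123, 5)], [], [(1001, -1, 123, 5), (1001, 1, 123, 6)]])
poll_subscription(stub, "my_view", received.extend, max_polls=3)
print(received)
```

A real consumer would usually loop until shutdown rather than for a fixed number of polls; the bound here just keeps the example finite.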
Using COPY (for psql)
COPY (SUBSCRIBE TO my_view) TO STDOUT;
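When the COPY output is piped into a script, each line has to be split back into columns. This Python sketch assumes the default COPY text format (tab-separated fields, \N for NULL), a subscription without PROGRESS or ENVELOPE options, and values that contain no escaped tabs or backslashes. The name parse_copy_line and the sample lines are hypothetical.

```python
def parse_copy_line(line):
    """Split one COPY text-format line into (mz_timestamp, mz_diff, columns)."""
    fields = line.rstrip("\n").split("\t")
    values = [None if field == r"\N" else field for field in fields]
    mz_timestamp, mz_diff, *columns = values
    return int(mz_timestamp), int(mz_diff), columns


# Hypothetical lines as they might arrive on stdin
sample_lines = ["1000\t1\t123\t5\n", "1001\t-1\t\\N\t7\n"]
parsed = [parse_copy_line(line) for line in sample_lines]
print(parsed)
```

Data columns are left as strings (or None) because their types depend on the subscribed object.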
Examples
Real-Time Metrics
CREATE MATERIALIZED VIEW minute_metrics AS
SELECT
date_trunc('minute', event_time) as minute,
event_type,
COUNT(*) as event_count
FROM events
GROUP BY minute, event_type;
-- Stream updates
SUBSCRIBE TO minute_metrics;
Change Data Capture
-- Subscribe to table changes
SUBSCRIBE TO user_profiles
ENVELOPE DEBEZIUM (KEY (user_id))
WITH (PROGRESS = true);
Top Products
SUBSCRIBE (
SELECT
product_id,
product_name,
SUM(quantity) as total_sold
FROM order_items
JOIN products USING (product_id)
WHERE order_date >= CURRENT_DATE - INTERVAL '7 days'
GROUP BY product_id, product_name
ORDER BY total_sold DESC
LIMIT 10
);
Filtered Updates
-- Only subscribe to high-value orders
SUBSCRIBE (
SELECT *
FROM orders
WHERE amount > 1000
) WITH (SNAPSHOT = false);
Time-Bounded Subscription
-- Subscribe to updates for one hour
SUBSCRIBE TO metrics
AS OF now()
UP TO now() + INTERVAL '1 hour';
Durable Subscriptions
Resume subscriptions after disconnection:
-- Configure history retention
ALTER MATERIALIZED VIEW my_view
SET (RETAIN HISTORY FOR '24 hours');
-- Subscribe with AS OF to resume
SUBSCRIBE TO my_view
AS OF <last_seen_timestamp>;
Ordering Within Timestamps
SUBSCRIBE TO orders
WITHIN TIMESTAMP ORDER BY customer_id, order_id;
Rows within each timestamp are ordered by the specified columns.
Best Practices
- Use DECLARE/FETCH
BEGIN;
DECLARE c CURSOR FOR SUBSCRIBE TO my_view;
FETCH ALL c WITH (timeout = '5s');
- Set Timeouts
FETCH ALL c WITH (timeout = '1s');
- Enable PROGRESS for Long-Running Subscriptions
SUBSCRIBE TO my_view WITH (PROGRESS = true);
- Configure History Retention
ALTER MATERIALIZED VIEW my_view
SET (RETAIN HISTORY FOR '1 day');
- Use Envelopes for Stateful Consumers
SUBSCRIBE TO users
ENVELOPE UPSERT (KEY (user_id));
- Handle Progress Messages
-- Filter out progress rows in the consumer:
-- keep only rows where mz_progressed = false
Monitoring
Check subscription progress:
-- View active subscriptions
SELECT * FROM mz_internal.mz_subscriptions;
-- Check source progress
SELECT * FROM my_source_progress;