SUBSCRIBE streams updates from a source, table, view, or materialized view as they occur. Use it to power event-driven applications and replicate data in real time.

Overview

The SUBSCRIBE statement is a more general form of SELECT. While SELECT computes a relation at a moment in time, SUBSCRIBE computes how a relation changes over time. SUBSCRIBE produces a sequence of updates that describe insertions, deletions, and modifications to a relation. This allows you to:
  • Power event processors that react to every change
  • Replicate the complete history of a relation
  • Build real-time streaming applications

Basic Syntax

SUBSCRIBE [TO] object_name | (SELECT ...)
[ENVELOPE UPSERT (KEY (key1, ...)) | ENVELOPE DEBEZIUM (KEY (key1, ...))]
[WITH (option_name [= option_value], ...)]
[AS OF [AT LEAST] timestamp_expression]
[UP TO timestamp_expression]

Simple Example

Subscribe to a materialized view:
BEGIN;
DECLARE c CURSOR FOR SUBSCRIBE my_view;
FETCH ALL c WITH (timeout='1s');
COMMIT;

Output Format

SUBSCRIBE emits rows with metadata columns prepended:
Column        | Type    | Description
--------------|---------|------------
mz_timestamp  | numeric | Materialize’s internal logical timestamp
mz_diff       | bigint  | Change in frequency: positive for insertions, negative for deletions
mz_progressed | boolean | (When PROGRESS option used) Indicates no more updates at earlier timestamps
Columns 1-N   | Varies  | Data columns from the subscribed relation

Example Output

mz_timestamp | mz_diff | id | name      | total
-------------|---------|----|-----------|---------
1648737001490|    1    | 1  | Product A | 100
1648737001490|    1    | 2  | Product B | 200
1648737065479|   -1    | 1  | Product A | 100
1648737065479|    1    | 1  | Product A | 150
Interpretation:
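  • mz_diff > 0: Row inserted (the value is the number of copies added, 1 in this example)
  • mz_diff < 0: Row deleted (the value is the number of copies removed, 1 in this example)
  • Update = deletion followed by insertion at the same timestamp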
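
The interpretation rules above can be sketched as a fold over the update stream. This is a minimal illustration, not part of any client library: the function name apply_updates is hypothetical, rows are assumed to be tuples of (mz_timestamp, mz_diff, data columns...), and the sample rows are copied from the example output. Keeping a multiset rather than a plain set means diffs larger than 1 and delete/insert pairs arriving in either order are both handled.

```python
from collections import Counter

def apply_updates(state, rows):
    """Fold (mz_timestamp, mz_diff, *columns) rows into a multiset of rows."""
    for _timestamp, diff, *columns in rows:
        key = tuple(columns)
        state[key] += diff
        if state[key] == 0:
            del state[key]  # Row is no longer present in the relation
    return state

# Sample rows copied from the example output above.
rows = [
    (1648737001490, 1, 1, "Product A", 100),
    (1648737001490, 1, 2, "Product B", 200),
    (1648737065479, -1, 1, "Product A", 100),
    (1648737065479, 1, 1, "Product A", 150),
]

state = apply_updates(Counter(), rows)
print(sorted(state))  # [(1, 'Product A', 150), (2, 'Product B', 200)]
```

After the last two rows are applied, Product A appears once with its new total, which is the "update" described above.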

SUBSCRIBE Options

SNAPSHOT

Controls whether to emit the current state before streaming changes:
-- Include snapshot (default)
SUBSCRIBE my_view;

-- Skip snapshot, only stream future changes
SUBSCRIBE my_view WITH (SNAPSHOT = false);

PROGRESS

Include progress messages to detect periods with no updates:
SUBSCRIBE my_view WITH (PROGRESS);
Progress rows have mz_progressed = true and indicate that no updates will arrive for earlier timestamps.
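
One way to use progress rows is to hold updates back until their timestamp is known to be complete. The sketch below assumes each row has already been converted to a dict keyed by column name (for example from the driver's column metadata). The function name complete_batches and the sample rows are hypothetical.

```python
def complete_batches(rows):
    """Yield (timestamp, updates) once a progress row closes that timestamp."""
    pending = {}
    for row in rows:
        if row["mz_progressed"]:
            # No more updates will arrive for timestamps earlier than this one.
            frontier = row["mz_timestamp"]
            for ts in sorted(t for t in pending if t < frontier):
                yield ts, pending.pop(ts)
        else:
            pending.setdefault(row["mz_timestamp"], []).append(row)

# Hypothetical rows: data rows carry a diff, progress rows carry none.
rows = [
    {"mz_timestamp": 100, "mz_progressed": False, "mz_diff": 1, "id": 1},
    {"mz_timestamp": 100, "mz_progressed": False, "mz_diff": 1, "id": 2},
    {"mz_timestamp": 150, "mz_progressed": True, "mz_diff": None, "id": None},
    {"mz_timestamp": 200, "mz_progressed": False, "mz_diff": -1, "id": 1},
    {"mz_timestamp": 201, "mz_progressed": True, "mz_diff": None, "id": None},
]

for ts, updates in complete_batches(rows):
    print(ts, [u["id"] for u in updates])
```

Buffering this way lets a consumer apply all changes for a timestamp together, at the cost of waiting for the next progress row.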

Envelope Options

Envelopes control how changes are formatted in the output.

ENVELOPE UPSERT

Reformat output as insert/update/delete operations:
SUBSCRIBE my_view ENVELOPE UPSERT (KEY (id));
Output includes mz_state column:
mz_timestamp | mz_state | id | value
-------------|----------|----|---------
100          | upsert   | 1  | 10
200          | upsert   | 1  | 20      -- Update
300          | delete   | 1  | NULL    -- Delete
States:
  • upsert: Insert or update operation
  • delete: Delete operation
  • key_violation: Multiple values detected for same key
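
As a rough sketch of how a consumer might apply these states, the function below maintains a local copy keyed by id. The function name apply_upsert_events is hypothetical, rows are assumed to be tuples of (mz_timestamp, mz_state, id, value) as in the sample output, and raising on key_violation is one possible policy rather than required behavior.

```python
def apply_upsert_events(replica, events):
    """Apply ENVELOPE UPSERT rows to a dict that mirrors the relation by key."""
    for _timestamp, mz_state, key, value in events:
        if mz_state == "upsert":
            replica[key] = value  # Insert or overwrite
        elif mz_state == "delete":
            replica.pop(key, None)
        elif mz_state == "key_violation":
            raise ValueError(f"multiple values reported for key {key!r}")
        else:
            raise ValueError(f"unexpected mz_state {mz_state!r}")
    return replica

# Rows copied from the sample output above.
events = [
    (100, "upsert", 1, 10),
    (200, "upsert", 1, 20),
    (300, "delete", 1, None),
]

replica = apply_upsert_events({}, events[:2])
print(replica)  # {1: 20}
replica = apply_upsert_events(replica, events[2:])
print(replica)  # {}
```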

ENVELOPE DEBEZIUM

Include before and after values for changes:
SUBSCRIBE my_view ENVELOPE DEBEZIUM (KEY (id));
Output includes before and after columns:
mz_timestamp | mz_state | id | before_value | after_value
-------------|----------|----|--------------|---------
100          | insert   | 1  | NULL         | 10
200          | upsert   | 1  | 10           | 20
300          | delete   | 1  | 20           | NULL
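
Because each row carries both the old and the new value, a consumer can maintain an aggregate without storing per-key state. The sketch below keeps a running sum. The function name running_total is hypothetical, and rows are assumed to be tuples of (mz_timestamp, mz_state, id, before_value, after_value) as in the sample output.

```python
def running_total(events):
    """Yield the sum of all current values after each Debezium-style event."""
    total = 0
    for _timestamp, _state, _key, before, after in events:
        # A NULL before (insert) or NULL after (delete) contributes nothing.
        total += (after or 0) - (before or 0)
        yield total

# Rows copied from the sample output above.
events = [
    (100, "insert", 1, None, 10),
    (200, "upsert", 1, 10, 20),
    (300, "delete", 1, 20, None),
]

print(list(running_total(events)))  # [10, 20, 0]
```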

Time Bounds

AS OF

Start subscribing from a specific timestamp:
SUBSCRIBE my_view AS OF 1648737000000;
Reading from a past timestamp requires a history retention period to be configured on the subscribed object.

UP TO

Stop subscribing at a specific timestamp:
SUBSCRIBE my_view UP TO 1648737999999;

Combined Time Bounds

SUBSCRIBE my_view 
  AS OF 1648737000000 
  UP TO 1648737999999;
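
The bounds are often easier to derive from wall-clock times than to write by hand. The sketch below assumes that timestamps are milliseconds since the Unix epoch, which matches the sample values in this guide but should be confirmed for your deployment. The helper names to_epoch_ms and bounded_subscribe are hypothetical, and the relation name is interpolated directly, so it must come from trusted code rather than user input.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def to_epoch_ms(moment):
    """Convert a timezone-aware datetime to whole milliseconds since the epoch."""
    if moment.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    return (moment - EPOCH) // timedelta(milliseconds=1)

def bounded_subscribe(relation, start, end):
    """Build a SUBSCRIBE statement bounded by AS OF and UP TO."""
    as_of, up_to = to_epoch_ms(start), to_epoch_ms(end)
    if as_of >= up_to:
        raise ValueError("start must be earlier than end")
    return f"SUBSCRIBE {relation} AS OF {as_of} UP TO {up_to}"

start = datetime(2022, 3, 31, 14, 30, 0, tzinfo=timezone.utc)
end = datetime(2022, 3, 31, 14, 46, 39, 999000, tzinfo=timezone.utc)
print(bounded_subscribe("my_view", start, end))
# SUBSCRIBE my_view AS OF 1648737000000 UP TO 1648737999999
```

Rejecting an empty or inverted window is a design choice of this sketch, made so that mistakes surface before the statement is sent.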

Client Examples

Python (psycopg2)

Stream updates using cursor-based fetching:
import psycopg2

dsn = "user=MATERIALIZE_USERNAME password=MATERIALIZE_PASSWORD host=MATERIALIZE_HOST port=6875 dbname=materialize sslmode=require"
conn = psycopg2.connect(dsn)

with conn.cursor() as cur:
    cur.execute("DECLARE c CURSOR FOR SUBSCRIBE my_view")
    while True:
        cur.execute("FETCH ALL c")
        for row in cur:
            timestamp, diff, *values = row
            if diff == 1:
                print(f"Insert: {values}")
            elif diff == -1:
                print(f"Delete: {values}")

Python (psycopg3)

Psycopg3 supports streaming without buffering:
import psycopg

dsn = "user=MATERIALIZE_USERNAME password=MATERIALIZE_PASSWORD host=MATERIALIZE_HOST port=6875 dbname=materialize sslmode=require"
conn = psycopg.connect(dsn)

with conn.cursor() as cur:
    for row in cur.stream("SUBSCRIBE my_view"):
        print(row)

Node.js

Stream updates using node-postgres:
const { Client } = require('pg');

// Replace the placeholder strings with your own connection details.
const client = new Client({
    user: 'MATERIALIZE_USERNAME',
    password: 'MATERIALIZE_PASSWORD',
    host: 'MATERIALIZE_HOST',
    port: 6875,
    database: 'materialize',
    ssl: true
});

async function main() {
    await client.connect();
    await client.query('BEGIN');
    await client.query('DECLARE c CURSOR FOR SUBSCRIBE my_view WITH (SNAPSHOT = FALSE)');

    while (true) {
        const res = await client.query('FETCH ALL c');
        for (const row of res.rows) {
            const { mz_timestamp, mz_diff, ...data } = row;
            if (mz_diff === '1') {
                console.log('Insert:', data);
            } else if (mz_diff === '-1') {
                console.log('Delete:', data);
            }
        }
    }
}

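// Surface connection and query errors instead of leaving the promise unhandled.
main().catch((err) => console.error(err));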

Java (JDBC)

Stream updates using JDBC:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;

public class App {
    private final String url = "jdbc:postgresql://MATERIALIZE_HOST:6875/materialize";
    private final String user = "MATERIALIZE_USERNAME";
    private final String password = "MATERIALIZE_PASSWORD";

    public Connection connect() throws SQLException {
        Properties props = new Properties();
        props.setProperty("user", user);
        props.setProperty("password", password);
        props.setProperty("ssl", "true");
        return DriverManager.getConnection(url, props);
    }

    public void subscribe() {
        try (Connection conn = connect()) {
            Statement stmt = conn.createStatement();
            stmt.execute("BEGIN");
            stmt.execute("DECLARE c CURSOR FOR SUBSCRIBE my_view");
            
            while (true) {
                ResultSet rs = stmt.executeQuery("FETCH ALL c");
                while (rs.next()) {
                    String timestamp = rs.getString(1);
                    long diff = rs.getLong(2);  // mz_diff is a bigint
                    // Process remaining columns...
                    System.out.println(timestamp + " " + diff);
                }
            }
        } catch (SQLException ex) {
            System.out.println(ex.getMessage());
        }
    }
}

psql with COPY

For interactive sessions, wrap SUBSCRIBE in COPY:
COPY (SUBSCRIBE my_view) TO STDOUT;

Handling Updates

Map rows to their corresponding updates:
# Reuses `conn` from the psycopg2 example above.
# Assumes my_view has exactly two columns (key, value) and that key is unique.
state = {}

with conn.cursor() as cur:
    cur.execute("DECLARE c CURSOR FOR SUBSCRIBE my_view")
    while True:
        cur.execute("FETCH ALL c")
        for row in cur:
            timestamp, diff, key, value = row

            if diff == 1:
                # Insert, or the new half of an update
                state[key] = value
                print(f"Key {key} now has value {value}")
            elif diff == -1 and key in state and state[key] == value:
                # Delete. The value check skips the retraction half of an
                # update in case its insertion was applied first.
                del state[key]
                print(f"Key {key} deleted")

Durable Subscriptions

For production systems, configure history retention to resume subscriptions after disconnections:
-- Configure retention on the materialized view
ALTER MATERIALIZED VIEW my_view SET (RETAIN HISTORY FOR '1d');

-- Resume from a saved timestamp
SUBSCRIBE my_view AS OF 1648737000000;
See Durable Subscriptions for details.
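
To resume after a restart, the client needs to persist the last timestamp it has fully processed. The sketch below stores it in a small JSON file and replaces the file atomically, so a crash mid-write cannot leave a truncated checkpoint. The file layout and the helper names save_checkpoint and load_checkpoint are assumptions for illustration. The stored value would then be used in the AS OF clause shown above.

```python
import json
import os
import tempfile

def save_checkpoint(path, mz_timestamp):
    """Atomically record the last fully processed timestamp."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    with os.fdopen(fd, "w") as f:
        # Stored as a string so numeric driver types serialize cleanly.
        json.dump({"mz_timestamp": str(mz_timestamp)}, f)
    os.replace(tmp_path, path)  # Atomic rename over any previous checkpoint

def load_checkpoint(path):
    """Return the saved timestamp as an int, or None if nothing was saved."""
    try:
        with open(path) as f:
            return int(json.load(f)["mz_timestamp"])
    except FileNotFoundError:
        return None

with tempfile.TemporaryDirectory() as workdir:
    path = os.path.join(workdir, "my_view.checkpoint")
    print(load_checkpoint(path))  # None: nothing saved yet
    save_checkpoint(path, 1648737065479)
    print(load_checkpoint(path))  # 1648737065479
```

A checkpoint is only useful while it falls inside the configured retention period, so the retention setting should cover the longest outage you expect to recover from.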

Performance Tips

Use Materialized Views

Subscribe to materialized views instead of indexed views for better performance:
CREATE MATERIALIZED VIEW my_results AS
SELECT region, SUM(total) as revenue
FROM orders
GROUP BY region;

SUBSCRIBE my_results;

Dedicated Clusters

Run subscriptions on dedicated clusters:
CREATE CLUSTER subscribe_cluster SIZE = 'medium';
SET cluster = subscribe_cluster;

Batch Fetches

Fetch results in batches with timeouts:
FETCH 100 c WITH (timeout='5s');

Common Patterns

Event Processing

React to every change in a relation:
with conn.cursor() as cur:
    cur.execute("DECLARE c CURSOR FOR SUBSCRIBE orders WITH (SNAPSHOT = false)")
    while True:
        cur.execute("FETCH ALL c WITH (timeout='10s')")
        for row in cur:
            timestamp, diff, order_id, status, amount = row
            if diff == 1 and status == 'completed':
                # Process completed order
                send_notification(order_id, amount)

Data Replication

Replicate data to another system:
with conn.cursor() as cur:
    cur.execute("DECLARE c CURSOR FOR SUBSCRIBE my_view")
    while True:
        cur.execute("FETCH 1000 c")
        batch = cur.fetchall()
        if batch:
            write_to_destination(batch)

Change Data Capture

Capture and log all changes:
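import json

with conn.cursor() as cur:
    cur.execute(
        "DECLARE c CURSOR FOR SUBSCRIBE my_view "
        "ENVELOPE DEBEZIUM (KEY (id)) WITH (PROGRESS)"
    )
    while True:
        cur.execute("FETCH ALL c WITH (timeout='1s')")
        # Map columns by name: PROGRESS adds an mz_progressed column,
        # so fixed positional indexes would be unreliable.
        names = [col[0] for col in cur.description]
        for row in cur:
            change_event = dict(zip(names, row))
            if change_event.get('mz_progressed'):
                continue  # Progress row: carries no data change
            # default=str serializes the numeric timestamp
            log_change(json.dumps(change_event, default=str))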

Troubleshooting

Driver Buffering

Many PostgreSQL drivers buffer results until a query completes. Since SUBSCRIBE can run forever, use:
  • FETCH to retrieve results in batches
  • AS OF and UP TO to bound the subscription
  • Driver-specific streaming APIs (e.g., psycopg3’s stream())

Connection Drops

Handle connection failures gracefully:
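import time

import psycopg2

# `dsn` is the connection string from the psycopg2 example above.
last_timestamp = None

while True:
    conn = None
    try:
        conn = psycopg2.connect(dsn)
        with conn.cursor() as cur:
            if last_timestamp is not None:
                # Resume from last seen timestamp. With the default
                # SNAPSHOT option this re-emits the state as of that
                # timestamp, so process_row must tolerate repeats.
                cur.execute(
                    "DECLARE c CURSOR FOR SUBSCRIBE my_view "
                    f"AS OF {last_timestamp}"
                )
            else:
                cur.execute("DECLARE c CURSOR FOR SUBSCRIBE my_view")

            while True:
                cur.execute("FETCH ALL c WITH (timeout='10s')")
                for row in cur:
                    last_timestamp = row[0]
                    process_row(row)
    except psycopg2.Error as e:
        print(f"Connection error: {e}")
        time.sleep(5)  # Wait before reconnecting
    finally:
        if conn is not None:
            conn.close()  # Release the broken connection before retrying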

Next Steps

  • Query Results: Query point-in-time results with SELECT
  • Kafka Sinks: Write changes to Kafka topics
  • SQL Reference: Complete SUBSCRIBE reference
  • Durable Subscriptions: Configure history retention
