Overview
Sinks are the inverse of sources: they describe external systems where Materialize writes data. Sinks continuously stream changes from materialized views, sources, or tables to external destinations as soon as those changes occur.
-- Create a materialized view
CREATE MATERIALIZED VIEW customer_summary AS
SELECT
  customer_id,
  COUNT(*) as order_count,
  SUM(total) as lifetime_value
FROM orders
GROUP BY customer_id;

-- Stream changes to Kafka
CREATE SINK customer_sink
FROM customer_summary
INTO KAFKA CONNECTION kafka_conn (TOPIC 'customer-summary')
FORMAT JSON;
Sinks enable push-based architectures where downstream systems receive updates immediately without polling.
Kafka Sinks
Materialize supports streaming data to Kafka and Kafka-compatible systems (like Redpanda).
Creating a Kafka Sink
-- Create a Kafka connection
CREATE SECRET kafka_password AS '...';

CREATE CONNECTION kafka_conn TO KAFKA (
  BROKER 'broker.kafka.example.com:9092',
  SASL MECHANISMS = 'PLAIN',
  SASL USERNAME = 'user',
  SASL PASSWORD = SECRET kafka_password
);
-- Create a sink from a materialized view
CREATE SINK order_updates_sink
FROM order_updates
INTO KAFKA CONNECTION kafka_conn (TOPIC 'order-updates')
FORMAT JSON
ENVELOPE UPSERT;
JSON Format
CREATE SINK json_sink
FROM my_view
INTO KAFKA CONNECTION kafka_conn (TOPIC 'events')
FORMAT JSON
ENVELOPE UPSERT;
Output example:
{
  "customer_id": "CUST-123",
  "order_count": 42,
  "lifetime_value": 5432.10
}
Avro Format
CREATE CONNECTION csr_conn TO CONFLUENT SCHEMA REGISTRY (
  URL 'https://schema-registry.example.com'
);

CREATE SINK avro_sink
FROM my_view
INTO KAFKA CONNECTION kafka_conn (TOPIC 'events')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
ENVELOPE UPSERT;
Materialize automatically registers schemas with the schema registry.
Text Format
-- Requires a single text or bytea column
CREATE SINK text_sink
FROM text_data
INTO KAFKA CONNECTION kafka_conn (TOPIC 'logs')
FORMAT TEXT;
Envelopes
Upsert Envelope (Recommended)
Maintains the latest state for each key:
CREATE SINK upsert_sink
FROM customer_summary
INTO KAFKA CONNECTION kafka_conn (TOPIC 'customers')
FORMAT JSON
ENVELOPE UPSERT;
Message format:
Key: Primary key columns
Value: All columns (or null for deletions)
Use when: Downstream consumers need the current state
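To make the upsert contract concrete, here is a minimal pure-Python sketch (not tied to any particular Kafka client library) of how a downstream consumer might compact an upsert stream into current state; the keys and values are plain dicts standing in for deserialized message keys and values, and a null value signals a deletion:

```python
# Sketch: compact an ENVELOPE UPSERT stream into the latest state per key.
# A real consumer would deserialize Kafka message keys/values (JSON, Avro,
# etc.) first; here they are already plain Python objects.

def compact(messages):
    """Fold (key, value) upsert messages into latest-state-per-key.
    value=None means the key was deleted upstream."""
    state = {}
    for key, value in messages:
        if value is None:
            state.pop(key, None)   # deletion: drop the key
        else:
            state[key] = value     # insert/update: keep the latest value
    return state

stream = [
    ("CUST-123", {"order_count": 41, "lifetime_value": 5332.10}),
    ("CUST-456", {"order_count": 1, "lifetime_value": 99.00}),
    ("CUST-123", {"order_count": 42, "lifetime_value": 5432.10}),
    ("CUST-456", None),  # customer no longer appears in the view
]
current = compact(stream)
# current holds only CUST-123's latest values
```

This mirrors Kafka's own log compaction: replaying the topic from the beginning always converges to the current state.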
Debezium Envelope
Provides full CDC information with before/after images:
CREATE SINK debezium_sink
FROM orders
INTO KAFKA CONNECTION kafka_conn (TOPIC 'order-changes')
FORMAT JSON
ENVELOPE DEBEZIUM;
Message format:
{
  "before": { "id": 1, "status": "pending" },
  "after": { "id": 1, "status": "shipped" },
  "op": "u"
}
Use when: Downstream systems need full change history
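As a sketch of how a consumer applies these events, the following pure-Python snippet (an illustration, not a specific client library) replays Debezium-style before/after events against a local copy, using the common op codes "c" (create), "u" (update), and "d" (delete):

```python
# Sketch: apply Debezium-style change events to a local state copy.
# Row identity is assumed to be the "id" field for illustration.

def apply_change(state, event):
    """Mutate state in place according to one change event."""
    op = event["op"]
    if op in ("c", "u"):
        row = event["after"]
        state[row["id"]] = row          # create/update: take the after image
    elif op == "d":
        state.pop(event["before"]["id"], None)  # delete: remove by old key
    return state

state = {}
apply_change(state, {"before": None,
                     "after": {"id": 1, "status": "pending"}, "op": "c"})
apply_change(state, {"before": {"id": 1, "status": "pending"},
                     "after": {"id": 1, "status": "shipped"}, "op": "u"})
```

The before image is what lets consumers do things upsert cannot, such as auditing exactly which fields changed.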
How Sinks Work
Incremental Updates
Sinks stream only changes, not entire snapshots:
CREATE MATERIALIZED VIEW inventory AS
SELECT
  product_id,
  SUM(quantity) as stock
FROM inventory_events
GROUP BY product_id;

CREATE SINK inventory_sink
FROM inventory
INTO KAFKA CONNECTION kafka_conn (TOPIC 'inventory')
FORMAT JSON
ENVELOPE UPSERT;
When a new inventory event arrives:
INSERT INTO inventory_events VALUES ('PROD-123', 10);
Materialize:
1. Incrementally updates the inventory materialized view
2. Computes the delta: (product_id='PROD-123', stock: +10)
3. Emits one message to Kafka with the new stock value
Only changed rows are sent — not the entire table. This minimizes bandwidth and downstream processing.
Exactly-Once Semantics
Materialize provides exactly-once delivery guarantees:
Uses Kafka transactions to group related updates
Handles network failures and retries automatically
Ensures no duplicate messages in the sink topic
-- Transactional updates in Materialize
BEGIN;
INSERT INTO orders VALUES (1, 'CUST-1', 100);
INSERT INTO orders VALUES (2, 'CUST-1', 50);
COMMIT;
-- Sink emits both changes in a single Kafka transaction
-- Downstream consumers see atomic updates
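The atomicity guarantee can be illustrated with a small pure-Python model (a sketch, not Kafka's actual transaction protocol): a consumer reading at the read_committed isolation level sees either all messages of a transaction or none of them:

```python
# Sketch: read_committed semantics -- only messages from committed
# transactions become visible, and aborted transactions vanish entirely.

def committed_view(log):
    """Return messages from committed transactions only, in log order."""
    visible = []
    for txn in log:
        if txn["committed"]:
            visible.extend(txn["messages"])
    return visible

log = [
    {"committed": True,  "messages": [("order", 1), ("order", 2)]},
    {"committed": False, "messages": [("order", 3)]},  # aborted mid-write
]
seen = committed_view(log)
# seen contains both messages of the first transaction, none of the second
```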
Timestamp Watermarks
Every message includes a materialize-timestamp header indicating logical time:
{
  "headers": {
    "materialize-timestamp": "1704067200000"
  },
  "key": { "customer_id": "CUST-123" },
  "value": { "customer_id": "CUST-123", "order_count": 42 }
}
This enables downstream systems to:
Track data freshness
Implement time-based processing
Join data with consistent timestamps
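As a minimal sketch of the freshness-tracking use case (the helper name and the epoch-milliseconds interpretation of the header are illustrative assumptions), a consumer can compare the header's logical time against its own clock:

```python
# Sketch: derive data freshness from the materialize-timestamp header.
# The header value is treated as epoch milliseconds here.

def freshness_ms(headers, now_ms):
    """Return how far behind wall-clock time this message's logical time is."""
    ts = int(headers["materialize-timestamp"])
    return now_ms - ts

headers = {"materialize-timestamp": "1704067200000"}
lag = freshness_ms(headers, now_ms=1704067205000)
# this message's logical time is 5 seconds behind now_ms
```

A consumer could alert when the lag exceeds a threshold, or buffer messages until all inputs reach the same logical time before joining them.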
Message Keys
By default, sinks use the primary key as the message key:
CREATE MATERIALIZED VIEW customer_summary AS
SELECT
  customer_id, -- Primary key
  COUNT(*) as orders
FROM orders
GROUP BY customer_id;

CREATE SINK customer_sink
FROM customer_summary
INTO KAFKA CONNECTION kafka_conn (TOPIC 'customers')
FORMAT JSON
ENVELOPE UPSERT;
Kafka message:
Key: {"customer_id": "CUST-123"}
Value: {"customer_id": "CUST-123", "orders": 42}
You can also specify different formats for the key and the value:
CREATE SINK mixed_format_sink
FROM customer_summary
INTO KAFKA CONNECTION kafka_conn (TOPIC 'customers')
KEY FORMAT TEXT
VALUE FORMAT JSON
ENVELOPE UPSERT;
This produces:
Key: "CUST-123" (plain text)
Value: {"customer_id": "CUST-123", "orders": 42} (JSON)
Add custom headers to messages:
CREATE VIEW events_with_headers AS
SELECT
  event_id,
  event_type,
  data,
  MAP['source' => 'materialize', 'version' => '1.0'] as headers
FROM events;

CREATE SINK events_sink
FROM events_with_headers
INTO KAFKA CONNECTION kafka_conn (TOPIC 'events')
FORMAT JSON
ENVELOPE UPSERT
HEADERS headers;
Headers starting with materialize- are reserved and will be ignored.
Sink from Different Sources
From Materialized Views
-- Most common: sink a materialized view
CREATE MATERIALIZED VIEW daily_revenue AS
SELECT
  DATE_TRUNC('day', order_time) as day,
  SUM(total) as revenue
FROM orders
GROUP BY day;

CREATE SINK revenue_sink
FROM daily_revenue
INTO KAFKA CONNECTION kafka_conn (TOPIC 'revenue')
FORMAT JSON;
From Sources
-- Pass through source data unchanged
CREATE SOURCE pg_source
FROM POSTGRES CONNECTION pg_conn
(PUBLICATION 'mz_source');

CREATE SINK passthrough_sink
FROM pg_source_orders
INTO KAFKA CONNECTION kafka_conn (TOPIC 'orders-replicated')
FORMAT JSON
ENVELOPE DEBEZIUM;
From Tables
-- Stream table changes
CREATE TABLE config_changes (
  change_id INT,
  config_key TEXT,
  config_value TEXT
);

CREATE SINK config_sink
FROM config_changes
INTO KAFKA CONNECTION kafka_conn (TOPIC 'config-updates')
FORMAT JSON
ENVELOPE UPSERT;
Regular (non-materialized) views cannot be used as the source of a sink because they don't maintain incremental state; sink from materialized views, sources, or tables instead.
Hydration and Initial Load
When a sink is created, it undergoes hydration: it first emits a complete snapshot of the current state before switching to streaming changes:
CREATE SINK customer_sink
FROM customer_summary -- Contains 1M rows
INTO KAFKA CONNECTION kafka_conn (TOPIC 'customers')
FORMAT JSON
ENVELOPE UPSERT;
1. Load snapshot: Materialize loads the entire current state of customer_summary into memory
2. Emit messages: All 1M rows are emitted to the Kafka topic
3. Switch to incremental: The sink begins streaming only changes
During hydration, sinks need memory proportional to the entire snapshot. Ensure sufficient cluster memory for large datasets.
Clusters and Sinks
Sinks require compute resources and must be associated with a cluster:
-- Create a dedicated cluster for sinks
CREATE CLUSTER sink_cluster SIZE = '50cc';

-- Create the sink in that cluster
CREATE SINK my_sink
IN CLUSTER sink_cluster
FROM my_view
INTO KAFKA CONNECTION kafka_conn (TOPIC 'events')
FORMAT JSON;
Best Practice : Use separate clusters for sources and sinks to avoid resource contention. Sinks can use smaller clusters than sources since they only emit changes.
Three-Tier Architecture with Sinks
The standard three-tier layout gains a fourth, dedicated cluster for sinks:
Sources cluster: Ingests data
Transform cluster: Maintains materialized views
Serving cluster: Indexes for queries
Sink cluster: Streams to external systems
Monitoring Sinks
Check Sink Status
-- View all sinks
SELECT name, type, size FROM mz_sinks;

-- Check sink health
SELECT
  s.name,
  ss.status,
  ss.error
FROM mz_sinks s
JOIN mz_internal.mz_sink_statuses ss ON s.id = ss.id;
Monitor Write Progress
-- View sink write progress
SELECT
  s.name,
  sw.offset,
  sw.timestamp
FROM mz_sinks s
JOIN mz_internal.mz_sink_statistics sw ON s.id = sw.id;
Track Message Volume
-- Count messages written by sink
SELECT
  name,
  SUM(messages_written) as total_messages
FROM mz_internal.mz_sink_statistics
JOIN mz_sinks USING (id)
GROUP BY name;
Partition by Key
Kafka topic partitioning is determined by the message key:
-- Customer ID becomes the partition key
CREATE SINK customer_sink
FROM customer_summary -- Key: customer_id
INTO KAFKA CONNECTION kafka_conn (
  TOPIC 'customers',
  PARTITION COUNT = 10
)
FORMAT JSON
ENVELOPE UPSERT;
Messages for the same customer always go to the same partition, maintaining order.
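The ordering guarantee comes from deterministic key-to-partition mapping. The following pure-Python sketch illustrates the property (Kafka's default partitioner actually uses murmur2 on the serialized key bytes; any deterministic hash demonstrates the idea):

```python
# Sketch: a deterministic key -> partition mapping means every message
# for a given key lands on the same partition, preserving per-key order.
import hashlib

def partition_for(key: bytes, num_partitions: int) -> int:
    """Map a message key to a partition deterministically."""
    digest = hashlib.md5(key).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

p1 = partition_for(b"CUST-123", 10)
p2 = partition_for(b"CUST-123", 10)
# p1 == p2: repeated updates for CUST-123 always hit the same partition
```

The flip side is that changing the partition count remaps keys, so per-key ordering only holds within one partitioning of the topic.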
Batch Size Tuning
Materialize batches messages for efficiency:
CREATE SINK optimized_sink
FROM my_view
INTO KAFKA CONNECTION kafka_conn (TOPIC 'events')
FORMAT JSON
WITH (
  batch_size = 1000, -- Messages per batch
  batch_timeout = '100ms' -- Max wait time
);
Larger batches improve throughput but increase latency. Tune based on your requirements.
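The trade-off these two knobs control can be sketched in a few lines of Python (an illustrative model, not Materialize's batching code): a batch flushes when it fills up or when the oldest buffered message has waited too long.

```python
# Sketch: size- and time-bounded batching. A batch is flushed when it
# reaches max_size messages OR when max_wait seconds have elapsed since
# the first buffered message -- throughput vs. latency in two parameters.
import time

class Batcher:
    def __init__(self, max_size, max_wait, flush):
        self.max_size, self.max_wait, self.flush = max_size, max_wait, flush
        self.buf, self.started = [], None

    def add(self, msg, now=None):
        now = time.monotonic() if now is None else now
        if not self.buf:
            self.started = now          # clock starts at first message
        self.buf.append(msg)
        if len(self.buf) >= self.max_size or now - self.started >= self.max_wait:
            self.flush(self.buf)
            self.buf = []

batches = []
b = Batcher(max_size=3, max_wait=0.1, flush=batches.append)
for msg, t in [(1, 0.0), (2, 0.01), (3, 0.02), (4, 0.03)]:
    b.add(msg, now=t)
# the third message fills the batch and triggers a flush; the fourth waits
```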
Example: Real-Time Data Pipeline
Build a complete streaming pipeline:
-- Ingest from PostgreSQL
CREATE SOURCE pg_source
IN CLUSTER source_cluster
FROM POSTGRES CONNECTION pg_conn
(PUBLICATION 'mz_source');

-- Transform: calculate metrics
CREATE MATERIALIZED VIEW product_metrics
IN CLUSTER transform_cluster AS
SELECT
  product_id,
  COUNT(DISTINCT customer_id) as unique_customers,
  COUNT(*) as total_orders,
  SUM(quantity) as total_quantity,
  SUM(amount) as total_revenue,
  AVG(amount) as avg_order_value
FROM pg_source_orders
GROUP BY product_id;
-- Sink: stream to Kafka for downstream analytics
CREATE SINK product_metrics_sink
IN CLUSTER sink_cluster
FROM product_metrics
INTO KAFKA CONNECTION kafka_conn (
  TOPIC 'product-metrics'
)
FORMAT AVRO
USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
ENVELOPE UPSERT;
-- Sink: stream to another Kafka topic for alerting
CREATE MATERIALIZED VIEW high_value_products AS
SELECT * FROM product_metrics
WHERE total_revenue > 100000;

CREATE SINK alerts_sink
IN CLUSTER sink_cluster
FROM high_value_products
INTO KAFKA CONNECTION kafka_conn (
  TOPIC 'high-value-alerts'
)
FORMAT JSON;
Limitations and Considerations
Current limitations:
Kafka only : Materialize currently supports only Kafka sinks (including Redpanda)
Materialized views required : Cannot sink from regular views
Memory requirements : Initial snapshot must fit in cluster memory
No sink pausing : Sinks cannot be paused, only dropped and recreated
Best Practices
Use Dedicated Clusters
Isolate sinks from sources and queries:
CREATE CLUSTER sink_cluster SIZE = '50cc';

CREATE SINK my_sink
IN CLUSTER sink_cluster
FROM my_view
INTO KAFKA CONNECTION kafka_conn (TOPIC 'events')
FORMAT JSON;
Use Upsert for Current State
When downstream needs the current state (not a change stream):
CREATE SINK state_sink
FROM current_state
INTO KAFKA CONNECTION kafka_conn (TOPIC 'state')
FORMAT JSON
ENVELOPE UPSERT; -- Consumers get latest values
Use Debezium for Change Logs
When downstream needs full change history:
CREATE SINK changelog_sink
FROM orders
INTO KAFKA CONNECTION kafka_conn (TOPIC 'order-changes')
FORMAT JSON
ENVELOPE DEBEZIUM; -- Includes before/after
Monitor Sink Lag
Ensure sinks keep up with upstream changes:
SELECT
  name,
  lag
FROM mz_sinks
JOIN mz_internal.mz_wallclock_global_lag USING (id)
WHERE lag > INTERVAL '1 minute';
Next Steps
Configure Clusters: Size clusters for sink workloads
Kafka Integration: Detailed Kafka sink setup
SQL Reference: Complete CREATE SINK syntax
Monitoring Guide: Monitor sink performance and health