Overview
CREATE SINK connects Materialize to external systems so you can stream data out. A sink continuously exports updates from a source, table, or materialized view to an external destination.
Syntax
CREATE SINK [IF NOT EXISTS] <sink_name>
  [IN CLUSTER <cluster_name>]
  FROM <source | table | materialized_view>
  INTO KAFKA CONNECTION <connection_name> (
    TOPIC '<topic_name>'
    [, <kafka_options>]
  )
  [KEY (<column> [, ...]) [NOT ENFORCED]]
  FORMAT { AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION <csr_connection> | JSON | TEXT | BYTES }
  ENVELOPE { UPSERT | DEBEZIUM };
Supported Destinations
Kafka/Redpanda Sinks
Stream data to Kafka or Redpanda topics:
CREATE CONNECTION kafka_conn TO KAFKA (
    BROKER 'kafka.example.com:9092',
    SASL MECHANISMS = 'PLAIN',
    SASL USERNAME = 'user',
    SASL PASSWORD = SECRET kafka_password
);
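-- The Avro examples below also assume an existing Confluent Schema Registry
-- connection named csr_conn, created with CREATE CONNECTION ... TO CONFLUENT SCHEMA REGISTRY.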
CREATE SINK orders_sink
FROM orders_view
INTO KAFKA CONNECTION kafka_conn (TOPIC 'orders')
KEY (order_id)
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
ENVELOPE UPSERT;
Parameters
- <sink_name>: Name of the sink to create.
- FROM: The source, table, or materialized view to export.
- <connection_name>: The Kafka connection to use.
- <topic_name>: The Kafka topic to write to.
- IN CLUSTER <cluster_name>: The cluster the sink runs in. Defaults to the active cluster.
Formats
Avro Format
Export data in Avro format with schema registry integration:
CREATE SINK avro_sink
FROM my_view
INTO KAFKA CONNECTION kafka_conn (TOPIC 'my_topic')
KEY (id)
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
ENVELOPE UPSERT;
Features:
- Automatic schema registration
- Column name sanitization
- Type conversion to Avro types
- Schema evolution support
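As an illustration, a sink over a view with an id bigint key column and a nullable name text column would register a value schema along these lines (the exact record and namespace names are generated by Materialize, so treat this as a sketch):
{
  "type": "record",
  "name": "envelope",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": ["null", "string"]}
  ]
}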
Type Mapping:
integer → Avro int
bigint → Avro long
text → Avro string
boolean → Avro boolean
double precision → Avro double
timestamp → Avro long with timestamp-micros
jsonb → Avro string with io.debezium.data.Json
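For instance, a sink over this hypothetical view (all names invented for illustration) would register an Avro schema using the mappings above:
CREATE MATERIALIZED VIEW typed_example AS
SELECT
    1::integer AS an_int,                       -- Avro int
    1::bigint AS a_long,                        -- Avro long
    'hello'::text AS a_string,                  -- Avro string
    true AS a_bool,                             -- Avro boolean
    2.5::double precision AS a_double,          -- Avro double
    TIMESTAMP '2024-01-01 00:00:00' AS a_ts,    -- Avro long (timestamp-micros)
    '{"k": 1}'::jsonb AS a_json;                -- Avro string (io.debezium.data.Json)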
JSON Format
Export data as JSON objects:
CREATE SINK json_sink
FROM my_view
INTO KAFKA CONNECTION kafka_conn (TOPIC 'my_topic')
KEY (id)
FORMAT JSON
ENVELOPE UPSERT;
Output Example:
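For a hypothetical row with columns id, name, and active, the key and value messages would look like:
Key:   {"id": 1}
Value: {"id": 1, "name": "Alice", "active": true}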
Text/Bytes Format
Export single-column data:
-- Assumes the view's single column is named "value"; ENVELOPE UPSERT requires a KEY
CREATE SINK text_sink
FROM single_column_view
INTO KAFKA CONNECTION kafka_conn (TOPIC 'my_topic')
KEY (value)
FORMAT TEXT
ENVELOPE UPSERT;
Envelopes
Upsert Envelope
Standard key-value semantics for inserts, updates, and deletes:
CREATE SINK upsert_sink
FROM users
INTO KAFKA CONNECTION kafka_conn (TOPIC 'users')
KEY (user_id)
FORMAT JSON
ENVELOPE UPSERT;
Behavior:
- Insert: New key → Emit full row
- Update: Existing key → Emit new row (old row not included)
- Delete: Existing key → Emit tombstone (null value)
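For a hypothetical users relation with columns user_id and name, a JSON upsert sink would emit messages like:
// Insert
Key: {"user_id": 1}, Value: {"user_id": 1, "name": "Alice"}
// Update
Key: {"user_id": 1}, Value: {"user_id": 1, "name": "Alicia"}
// Delete
Key: {"user_id": 1}, Value: null (tombstone)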
Requirements:
- Must specify KEY with unique columns
- The key must be unique in the source
Debezium Envelope
Debezium-style change events with before/after states:
CREATE SINK debezium_sink
FROM orders
INTO KAFKA CONNECTION kafka_conn (TOPIC 'orders')
FORMAT JSON
ENVELOPE DEBEZIUM;
Output Format:
// Insert
{"before": null, "after": {"id": 1, "amount": 100}}
// Update
{"before": {"id": 1, "amount": 100}, "after": {"id": 1, "amount": 150}}
// Delete
{"before": {"id": 1, "amount": 150}, "after": null}
Topic Configuration
Configure Kafka topic settings:
CREATE SINK configured_sink
FROM my_view
INTO KAFKA CONNECTION kafka_conn (
    TOPIC 'my_topic',
    TOPIC PARTITION COUNT 4,
    TOPIC REPLICATION FACTOR 3,
    TOPIC CONFIG MAP[
        'cleanup.policy' => 'compact',
        'retention.ms' => '86400000'
    ]
)
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
ENVELOPE UPSERT;
- TOPIC PARTITION COUNT: Number of partitions for the topic
- TOPIC REPLICATION FACTOR: Replication factor for the topic
- TOPIC CONFIG: Additional Kafka topic configuration
Custom Partitioning
Control how messages are partitioned:
CREATE SINK partitioned_sink
FROM orders
INTO KAFKA CONNECTION kafka_conn (
    TOPIC 'orders',
    PARTITION BY = seahash(customer_id::text)
)
KEY (order_id)
FORMAT JSON
ENVELOPE UPSERT;
The PARTITION BY expression:
- Must return a uint8 value
- Determines the partition as partition = hash % partition_count
- Can reference columns from the source
Available hash functions:
- seahash(): the default hash function
- kafka_murmur2(): compatible with Kafka's default partitioner
- crc32(): CRC32 hash
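To predict where a given row will land, you can evaluate the partitioning expression by hand. This sketch assumes the hash functions are callable in ad-hoc queries and a 4-partition topic:
-- Hypothetical: which of 4 partitions receives customer 'c-42'?
SELECT kafka_murmur2('c-42'::text) % 4 AS target_partition;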
Exactly-Once Semantics
Materialize guarantees exactly-once delivery:
-- Automatically uses progress topic for exactly-once
CREATE SINK exactly_once_sink
FROM my_view
INTO KAFKA CONNECTION kafka_conn (TOPIC 'my_topic')
KEY (id)
FORMAT JSON
ENVELOPE UPSERT;
Implementation:
- Uses Kafka transactions
- Maintains progress in a dedicated topic
- Recovers from failures without duplicates
Progress Topic:
- Named _materialize-progress-{REGION_ID}-{CONNECTION_ID}
- Stores sink checkpoint information
- Required for exactly-once guarantees
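To confirm that a sink is committing progress, you can compare staged and committed message counts. This query assumes the mz_internal.mz_sink_statistics relation, which is unstable and may change between releases:
SELECT s.name, st.messages_staged, st.messages_committed
FROM mz_sinks s
JOIN mz_internal.mz_sink_statistics st ON s.id = st.id
WHERE s.name = 'exactly_once_sink';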
Examples
Basic Kafka Sink
CREATE SINK orders_kafka
FROM orders_materialized
INTO KAFKA CONNECTION kafka_conn (TOPIC 'orders')
KEY (order_id)
FORMAT JSON
ENVELOPE UPSERT;
Avro Sink with Schema Registry
CREATE SINK users_avro
FROM users_view
INTO KAFKA CONNECTION kafka_conn (TOPIC 'users')
KEY (user_id)
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn (
    KEY COMPATIBILITY LEVEL 'BACKWARD',
    VALUE COMPATIBILITY LEVEL 'BACKWARD_TRANSITIVE'
)
ENVELOPE UPSERT;
Sink with Custom Partitioning
CREATE SINK customer_orders
FROM order_details
INTO KAFKA CONNECTION kafka_conn (
    TOPIC 'customer-orders',
    PARTITION BY = kafka_murmur2(customer_id::text)
)
KEY (customer_id, order_id)
FORMAT JSON
ENVELOPE UPSERT;
Debezium Sink
CREATE SINK inventory_changes
FROM inventory
INTO KAFKA CONNECTION kafka_conn (TOPIC 'inventory-cdc')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
ENVELOPE DEBEZIUM;
Sink with Topic Configuration
CREATE SINK compacted_sink
IN CLUSTER io_cluster
FROM user_profiles
INTO KAFKA CONNECTION kafka_conn (
    TOPIC 'user-profiles',
    TOPIC PARTITION COUNT 8,
    TOPIC REPLICATION FACTOR 3,
    TOPIC CONFIG MAP[
        'cleanup.policy' => 'compact',
        'segment.ms' => '3600000',
        'min.compaction.lag.ms' => '60000'
    ]
)
KEY (user_id)
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
ENVELOPE UPSERT;
Key Validation
For UPSERT sinks, Materialize validates key uniqueness:
-- This will succeed if user_id is unique
CREATE SINK valid_sink
FROM users
INTO KAFKA CONNECTION kafka_conn (TOPIC 'users')
KEY (user_id)
FORMAT JSON
ENVELOPE UPSERT;
-- If key is not provably unique, use NOT ENFORCED
CREATE SINK unvalidated_sink
FROM possibly_duplicate_keys
INTO KAFKA CONNECTION kafka_conn (TOPIC 'data')
KEY (id) NOT ENFORCED
FORMAT JSON
ENVELOPE UPSERT;
If you use NOT ENFORCED and the key is not actually unique, downstream consumers may see incorrect results and Kafka compaction may delete important records.
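Rather than reaching for NOT ENFORCED, you can often deduplicate upstream instead. This is a sketch that assumes possibly_duplicate_keys has an updated_at column for picking the latest row per id:
CREATE MATERIALIZED VIEW deduped_keys AS
SELECT DISTINCT ON (id) *
FROM possibly_duplicate_keys
ORDER BY id, updated_at DESC;

-- With the deduplicated view, KEY (id) can typically be validated without NOT ENFORCED:
CREATE SINK deduped_sink
FROM deduped_keys
INTO KAFKA CONNECTION kafka_conn (TOPIC 'data')
KEY (id)
FORMAT JSON
ENVELOPE UPSERT;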
Best Practices
- Choose the Right Envelope:
  - Use UPSERT for maintaining the latest state per key
  - Use DEBEZIUM when consumers need before/after states
- Configure Topic Compaction: TOPIC CONFIG MAP['cleanup.policy' => 'compact']
- Use an Appropriate Cluster Size:
  - Sinks consume cluster resources
  - Size clusters based on sink throughput needs
- Monitor Sink Progress: SELECT * FROM mz_sinks WHERE name = 'my_sink';
- Validate Keys: Ensure keys are truly unique before using UPSERT
- Consider Partitioning: Use custom partitioning for specific ordering requirements
Troubleshooting
Sink Errors
Check sink status:
SELECT * FROM mz_internal.mz_sink_statuses
WHERE name = 'my_sink';
Key Uniqueness Errors
If Materialize can't validate key uniqueness:
- Verify the key is actually unique
- Create a materialized view that deduplicates with DISTINCT ON
- Use NOT ENFORCED as a last resort
Performance Issues
If a sink falls behind:
- Increase cluster size
- Reduce sink data volume
- Check Kafka broker performance