Overview

CREATE SINK connects Materialize to an external system and streams data out of it. A sink continuously exports updates from a source, table, materialized view, or view to the external destination.

Syntax

CREATE SINK [IF NOT EXISTS] <sink_name>
  [IN CLUSTER <cluster_name>]
  FROM <source | table | materialized_view>
  INTO KAFKA CONNECTION <connection_name> (
    TOPIC '<topic_name>',
    <kafka_options>
  )
  [KEY (<column> [, ...]) [NOT ENFORCED]]
  FORMAT { AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION <csr_connection> | JSON | TEXT | BYTES }
  [ENVELOPE { UPSERT | DEBEZIUM }];

Supported Destinations

Kafka/Redpanda Sinks

Stream data to Kafka or Redpanda topics:
CREATE CONNECTION kafka_conn TO KAFKA (
  BROKER 'kafka.example.com:9092',
  SASL MECHANISMS = 'PLAIN',
  SASL USERNAME = 'user',
  SASL PASSWORD = SECRET kafka_password
);

CREATE SINK orders_sink
  FROM orders_view
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'orders')
  KEY (order_id)
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE UPSERT;

Parameters

sink_name (identifier, required)
  Name of the sink to create.
source (identifier, required)
  Name of the source, table, materialized view, or view to export.
connection_name (identifier, required)
  Name of the Kafka connection to use.
topic_name (string, required)
  Name of the Kafka topic to write to.
cluster_name (identifier, optional)
  Cluster where the sink will run. Defaults to the active cluster.

Formats

Avro Format

Export data in Avro format with schema registry integration:
CREATE SINK avro_sink
  FROM my_view
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'my_topic')
  KEY (id)
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE UPSERT;
Features:
  • Automatic schema registration
  • Column name sanitization
  • Type conversion to Avro types
  • Schema evolution support
Type Mapping:
  • integer → Avro int
  • bigint → Avro long
  • text → Avro string
  • boolean → Avro boolean
  • double precision → Avro double
  • timestamp → Avro long with timestamp-micros
  • jsonb → Avro string with io.debezium.data.Json
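Applying this mapping, the value schema registered for a view with columns id bigint, name text, active boolean might look roughly like the following. This is an illustrative sketch only: the actual schema Materialize registers includes its own record naming, nullability handling, and column name sanitization.

```json
{
  "type": "record",
  "name": "envelope",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "active", "type": "boolean"}
  ]
}
```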

JSON Format

Export data as JSON objects:
CREATE SINK json_sink
  FROM my_view
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'my_topic')
  KEY (id)
  FORMAT JSON
  ENVELOPE UPSERT;
Output Example:
{"id": 123, "name": "Alice", "email": "[email protected]"}

Text/Bytes Format

Export single-column data:
CREATE SINK text_sink
  FROM single_column_view
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'my_topic')
  FORMAT TEXT
  ENVELOPE UPSERT;

Envelopes

Upsert Envelope

Standard key-value semantics for inserts, updates, and deletes:
CREATE SINK upsert_sink
  FROM users
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'users')
  KEY (user_id)
  FORMAT JSON
  ENVELOPE UPSERT;
Behavior:
  • Insert: New key → Emit full row
  • Update: Existing key → Emit new row (old row not included)
  • Delete: Existing key → Emit tombstone (null value)
Requirements:
  • Must specify KEY with unique columns
  • Key must be unique in the source
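For a sink with FORMAT JSON and KEY (user_id), the message stream for one key might look like this (illustrative; field names are hypothetical):

```
// Insert: key {"user_id": 1}, full row as value
{"user_id": 1, "name": "Alice"}

// Update: same key, new full row as value
{"user_id": 1, "name": "Alicia"}

// Delete: same key, null value (tombstone)
null
```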

Debezium Envelope

Debezium-style change events with before/after states:
CREATE SINK debezium_sink
  FROM orders
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'orders')
  FORMAT JSON
  ENVELOPE DEBEZIUM;
Output Format:
// Insert
{"before": null, "after": {"id": 1, "amount": 100}}

// Update  
{"before": {"id": 1, "amount": 100}, "after": {"id": 1, "amount": 150}}

// Delete
{"before": {"id": 1, "amount": 150}, "after": null}

Topic Configuration

Configure Kafka topic settings:
CREATE SINK configured_sink
  FROM my_view
  INTO KAFKA CONNECTION kafka_conn (
    TOPIC 'my_topic',
    TOPIC PARTITION COUNT 4,
    TOPIC REPLICATION FACTOR 3,
    TOPIC CONFIG MAP[
      'cleanup.policy' => 'compact',
      'retention.ms' => '86400000'
    ]
  )
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE UPSERT;
TOPIC PARTITION COUNT (integer)
  Number of partitions for the topic.
TOPIC REPLICATION FACTOR (integer)
  Replication factor for the topic.
TOPIC CONFIG (map)
  Additional Kafka topic configuration.

Custom Partitioning

Control how messages are partitioned:
CREATE SINK partitioned_sink
  FROM orders
  INTO KAFKA CONNECTION kafka_conn (
    TOPIC 'orders',
    PARTITION BY = seahash(customer_id::text)
  )
  KEY (order_id)
  FORMAT JSON
  ENVELOPE UPSERT;
The PARTITION BY expression:
  • Must return a uint8 value
  • Determines the partition: partition = hash % partition_count
  • Can reference columns from the source
Available Hash Functions:
  • seahash() - Default hash function
  • kafka_murmur2() - Compatible with Kafka’s default partitioner
  • crc32() - CRC32 hash
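The hash-to-partition arithmetic can be sketched in Python. This uses zlib.crc32 as a stand-in for the built-in crc32() (assumed here to be a standard CRC32; verify against your deployment before relying on exact partition placement):

```python
import zlib

def partition_for(key: str, partition_count: int) -> int:
    """Sketch of PARTITION BY = crc32(key): partition = hash % partition_count."""
    h = zlib.crc32(key.encode("utf-8"))  # unsigned 32-bit CRC32 of the key
    return h % partition_count

# Messages with the same key always map to the same partition.
p1 = partition_for("customer-123", 4)
p2 = partition_for("customer-123", 4)
assert p1 == p2 and 0 <= p1 < 4
```

Because the partition is the hash modulo the partition count, increasing TOPIC PARTITION COUNT later changes where existing keys land.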

Exactly-Once Semantics

Materialize guarantees exactly-once delivery:
-- Automatically uses progress topic for exactly-once
CREATE SINK exactly_once_sink
  FROM my_view
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'my_topic')
  KEY (id)
  FORMAT JSON
  ENVELOPE UPSERT;
Implementation:
  • Uses Kafka transactions
  • Maintains progress in a dedicated topic
  • Recovers from failures without duplicates
Progress Topic:
  • Named: _materialize-progress-{REGION_ID}-{CONNECTION_ID}
  • Stores sink checkpoint information
  • Required for exactly-once guarantees

Examples

Basic Kafka Sink

CREATE SINK orders_kafka
  FROM orders_materialized
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'orders')
  KEY (order_id)
  FORMAT JSON
  ENVELOPE UPSERT;

Avro Sink with Schema Registry

CREATE SINK users_avro
  FROM users_view
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'users')
  KEY (user_id)
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn (
    KEY COMPATIBILITY LEVEL 'BACKWARD',
    VALUE COMPATIBILITY LEVEL 'BACKWARD_TRANSITIVE'
  )
  ENVELOPE UPSERT;

Sink with Custom Partitioning

CREATE SINK customer_orders
  FROM order_details
  INTO KAFKA CONNECTION kafka_conn (
    TOPIC 'customer-orders',
    PARTITION BY = kafka_murmur2(customer_id::text)
  )
  KEY (customer_id, order_id)
  FORMAT JSON
  ENVELOPE UPSERT;

Debezium Sink

CREATE SINK inventory_changes
  FROM inventory
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'inventory-cdc')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE DEBEZIUM;

Sink with Topic Configuration

CREATE SINK compacted_sink
  IN CLUSTER io_cluster
  FROM user_profiles
  INTO KAFKA CONNECTION kafka_conn (
    TOPIC 'user-profiles',
    TOPIC PARTITION COUNT 8,
    TOPIC REPLICATION FACTOR 3,
    TOPIC CONFIG MAP[
      'cleanup.policy' => 'compact',
      'segment.ms' => '3600000',
      'min.compaction.lag.ms' => '60000'
    ]
  )
  KEY (user_id)
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE UPSERT;

Key Validation

For UPSERT sinks, Materialize validates key uniqueness:
-- This will succeed if user_id is unique
CREATE SINK valid_sink
  FROM users
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'users')
  KEY (user_id)
  FORMAT JSON
  ENVELOPE UPSERT;

-- If key is not provably unique, use NOT ENFORCED
CREATE SINK unvalidated_sink
  FROM possibly_duplicate_keys
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'data')
  KEY (id) NOT ENFORCED
  FORMAT JSON
  ENVELOPE UPSERT;
If you use NOT ENFORCED and the key is not actually unique, downstream consumers may see incorrect results and Kafka compaction may delete important records.

Best Practices

  1. Choose the Right Envelope:
    • Use UPSERT for maintaining latest state per key
    • Use DEBEZIUM when consumers need before/after states
  2. Configure Topic Compaction:
    TOPIC CONFIG MAP['cleanup.policy' => 'compact']
    
  3. Use Appropriate Cluster Size:
    • Sinks consume cluster resources
    • Size clusters based on sink throughput needs
  4. Monitor Sink Progress:
    SELECT * FROM mz_sinks WHERE name = 'my_sink';
    
  5. Validate Keys: Ensure keys are truly unique before using UPSERT
  6. Consider Partitioning: Use custom partitioning for specific ordering requirements

Troubleshooting

Sink Errors

Check sink status:
SELECT * FROM mz_internal.mz_sink_statuses 
WHERE name = 'my_sink';

Key Uniqueness Errors

If Materialize can’t validate key uniqueness:
  1. Verify the key is actually unique
  2. Create a materialized view with DISTINCT ON
  3. Use NOT ENFORCED as last resort
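For step 2, a deduplicating materialized view can make the key provably unique before sinking. A hedged sketch, assuming hypothetical columns id, payload, and updated_at:

```sql
-- Keep exactly one row per id (the most recent by updated_at).
CREATE MATERIALIZED VIEW deduped_keys AS
SELECT DISTINCT ON (id) id, payload, updated_at
FROM possibly_duplicate_keys
ORDER BY id, updated_at DESC;

-- The sink can now use KEY (id) without NOT ENFORCED.
CREATE SINK deduped_sink
  FROM deduped_keys
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'data')
  KEY (id)
  FORMAT JSON
  ENVELOPE UPSERT;
```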

Performance Issues

  • Increase cluster size
  • Reduce sink data volume
  • Check Kafka broker performance
