Kafka sinks allow you to write data from Materialize to Kafka or Redpanda topics. Materialize automatically streams changes from materialized views, sources, or tables as change data capture (CDC) events.

Overview

A Kafka sink:
  • Streams changes from materialized views, sources, or tables to Kafka topics
  • Provides exactly-once delivery guarantees by default
  • Supports multiple formats (Avro, JSON, TEXT, BYTES)
  • Offers envelope options for different change semantics (Upsert, Debezium)
  • Automatically creates topics if they don’t exist

Prerequisites

Before creating a sink, you need a Kafka connection:
-- Create secrets for authentication
CREATE SECRET kafka_password AS 'your-password';

-- Create Kafka connection
CREATE CONNECTION kafka_connection TO KAFKA (
    BROKER 'broker.kafka.example.com:9092',
    SASL MECHANISMS = 'SCRAM-SHA-256',
    SASL USERNAME = 'your-username',
    SASL PASSWORD = SECRET kafka_password
);

SSL Connection

-- Alternative to SASL: authenticate with an SSL client key and certificate
CREATE SECRET kafka_ssl_key AS 'your-ssl-key';
CREATE SECRET kafka_ssl_crt AS 'your-ssl-certificate';

CREATE CONNECTION kafka_connection TO KAFKA (
    BROKER 'unique-jellyfish-0000.us-east-1.aws.confluent.cloud:9093',
    SSL KEY = SECRET kafka_ssl_key,
    SSL CERTIFICATE = SECRET kafka_ssl_crt
);

Creating Sinks

Basic Sink with JSON Format

Create a sink that writes to a Kafka topic using JSON:
CREATE SINK json_sink
  FROM my_materialized_view
  INTO KAFKA CONNECTION kafka_connection (TOPIC 'sales_data')
  KEY (id)
  FORMAT JSON
  ENVELOPE UPSERT;

Sink with Avro Format

Use Avro format with Confluent Schema Registry:
-- Create Schema Registry connection
CREATE SECRET csr_password AS 'your-csr-password';

CREATE CONNECTION csr_connection TO CONFLUENT SCHEMA REGISTRY (
    URL 'https://schema-registry.example.com',
    USERNAME = 'your-username',
    PASSWORD = SECRET csr_password
);

-- Create sink with Avro
CREATE SINK avro_sink
  FROM my_materialized_view
  INTO KAFKA CONNECTION kafka_connection (TOPIC 'sales_data_avro')
  KEY (id)
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
  ENVELOPE UPSERT;

Sink Formats

JSON Format

JSON format creates human-readable messages:
CREATE SINK json_sink
  FROM orders
  INTO KAFKA CONNECTION kafka_connection (TOPIC 'orders_json')
  KEY (order_id)
  FORMAT JSON
  ENVELOPE UPSERT;
Example output:
// Key
{"order_id": 123}

// Value
{"order_id": 123, "customer_id": 456, "total": 99.50, "status": "completed"}

Avro Format

Avro format provides schema evolution and compact encoding:
CREATE SINK avro_sink
  FROM orders
  INTO KAFKA CONNECTION kafka_connection (TOPIC 'orders_avro')
  KEY (order_id)
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
  ENVELOPE UPSERT;
Avro schemas are automatically registered in the Schema Registry.

Separate Key and Value Formats

Use different formats for keys and values:
CREATE SINK mixed_format_sink
  FROM orders
  INTO KAFKA CONNECTION kafka_connection (TOPIC 'orders_mixed')
  KEY (order_id)
  KEY FORMAT JSON
  VALUE FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
  ENVELOPE UPSERT;

Envelope Types

Upsert Envelope

The upsert envelope sends the current row value and uses tombstones for deletes:
CREATE SINK upsert_sink
  FROM product_inventory
  INTO KAFKA CONNECTION kafka_connection (TOPIC 'inventory')
  KEY (product_id)
  FORMAT JSON
  ENVELOPE UPSERT;
Message patterns:
  • Insert/Update: Key + full row value
  • Delete: Key + null value (tombstone)
Example messages:
// Insert
Key: {"product_id": 101}
Value: {"product_id": 101, "quantity": 50, "price": 29.99}

// Update (quantity changed)
Key: {"product_id": 101}
Value: {"product_id": 101, "quantity": 45, "price": 29.99}

// Delete
Key: {"product_id": 101}
Value: null
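
To make the upsert semantics concrete, here is a minimal consumer-side sketch in Python of how a downstream application might fold these messages into a local copy of the data. It uses plain in-memory records rather than a real Kafka client; the function name `apply_upsert` and the extra `product_id` 102 row are hypothetical and only there for illustration.

```python
import json
from typing import Optional


def apply_upsert(state: dict, key: bytes, value: Optional[bytes]) -> None:
    """Apply one upsert-envelope record to an in-memory copy of the data."""
    # Re-serialize the JSON key with sorted fields so equal keys compare equal.
    canonical_key = json.dumps(json.loads(key), sort_keys=True)
    if value is None:
        # Tombstone: a null value means the row for this key was deleted.
        state.pop(canonical_key, None)
    else:
        # Insert or update: the value always carries the full current row.
        state[canonical_key] = json.loads(value)


# Simulated (key, value) records in the order a consumer would read them.
records = [
    (b'{"product_id": 101}', b'{"product_id": 101, "quantity": 50, "price": 29.99}'),
    (b'{"product_id": 101}', b'{"product_id": 101, "quantity": 45, "price": 29.99}'),
    (b'{"product_id": 102}', b'{"product_id": 102, "quantity": 7, "price": 5.0}'),  # hypothetical row
    (b'{"product_id": 101}', None),  # delete of product 101
]

state: dict = {}
for key, value in records:
    apply_upsert(state, key, value)

print(state)
```

Because each value is the full row, replaying the topic from the beginning (or from a compacted log) converges to the same state.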

Debezium Envelope

The Debezium envelope includes before and after values:
CREATE SINK debezium_sink
  FROM product_inventory
  INTO KAFKA CONNECTION kafka_connection (TOPIC 'inventory_cdc')
  FORMAT JSON
  ENVELOPE DEBEZIUM;
Message patterns:
// Insert
{"before": null, "after": {"product_id": 101, "quantity": 50}}

// Update
{"before": {"product_id": 101, "quantity": 50}, "after": {"product_id": 101, "quantity": 45}}

// Delete
{"before": {"product_id": 101, "quantity": 45}, "after": null}
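
As a rough illustration of how a consumer could interpret these messages, the Python sketch below classifies each event by which of `before` and `after` is null and applies it to an in-memory table. The helper names `classify` and `apply_debezium`, and the choice of `product_id` as the lookup field, are assumptions made for this example.

```python
import json


def classify(event: dict) -> str:
    """Name the kind of change from the before/after pair."""
    if event["before"] is None:
        return "insert"
    if event["after"] is None:
        return "delete"
    return "update"


def apply_debezium(table: dict, message: bytes, key_field: str = "product_id") -> str:
    """Apply one Debezium-envelope message to an in-memory table."""
    event = json.loads(message)
    if event["before"] is not None:
        # Retract the old version of the row.
        table.pop(event["before"][key_field], None)
    if event["after"] is not None:
        # Add the new version of the row.
        table[event["after"][key_field]] = event["after"]
    return classify(event)


messages = [
    b'{"before": null, "after": {"product_id": 101, "quantity": 50}}',
    b'{"before": {"product_id": 101, "quantity": 50}, "after": {"product_id": 101, "quantity": 45}}',
    b'{"before": {"product_id": 101, "quantity": 45}, "after": null}',
]

inventory: dict = {}
kinds = [apply_debezium(inventory, m) for m in messages[:2]]
after_update = dict(inventory)  # snapshot taken before the delete arrives
kinds.append(apply_debezium(inventory, messages[2]))
```

Unlike the upsert envelope, each message here carries the previous row as well, so a consumer can react to what changed without keeping its own copy of the old value.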

Advanced Configuration

Topic Configuration

Customize topic settings:
CREATE SINK configured_sink
  FROM my_view
  INTO KAFKA CONNECTION kafka_connection (
    TOPIC 'my_topic',
    TOPIC PARTITION COUNT 6,
    TOPIC REPLICATION FACTOR 3,
    TOPIC CONFIG MAP[
      'cleanup.policy' => 'compact',
      'retention.ms' => '86400000',
      'compression.type' => 'snappy'
    ]
  )
  KEY (id)
  FORMAT JSON
  ENVELOPE UPSERT;

Custom Partitioning

Control message partitioning:
CREATE SINK partitioned_sink
  FROM orders
  INTO KAFKA CONNECTION kafka_connection (
    TOPIC 'orders',
    PARTITION BY = kafka_murmur2(customer_id::text)
  )
  KEY (order_id)
  FORMAT JSON
  ENVELOPE UPSERT;
Hash functions:
  • kafka_murmur2() - the Murmur2 hash used by Kafka’s default partitioner
  • crc32() - CRC32 hash
  • seahash() - SeaHash algorithm
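
The general idea behind hash-based partitioning can be sketched in a few lines of Python. This is an illustration only: it assumes the hash value is reduced to a partition index by taking it modulo the topic's partition count, and it uses Python's `zlib.crc32`, which may not match the output of Materialize's `crc32()` byte for byte. The function name `choose_partition` is hypothetical.

```python
import zlib


def choose_partition(partition_key: str, partition_count: int) -> int:
    """Map a key to a partition index by hashing it and taking the remainder."""
    if partition_count <= 0:
        raise ValueError("partition_count must be positive")
    # zlib.crc32 returns an unsigned 32-bit integer in Python 3.
    hash_value = zlib.crc32(partition_key.encode("utf-8"))
    return hash_value % partition_count


# Every record for the same customer lands on the same partition.
first = choose_partition("customer-456", 6)
second = choose_partition("customer-456", 6)
print(first == second)
```

The practical consequence is that all records sharing a partitioning expression value stay in order relative to each other, since Kafka only guarantees ordering within a partition.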

Schema Compatibility

Set Avro schema compatibility levels:
CREATE SINK compatibility_sink
  FROM my_view
  INTO KAFKA CONNECTION kafka_connection (TOPIC 'my_topic')
  KEY (id)
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection (
    KEY COMPATIBILITY LEVEL 'BACKWARD',
    VALUE COMPATIBILITY LEVEL 'BACKWARD_TRANSITIVE'
  )
  ENVELOPE UPSERT;

Dedicated Cluster

Run sinks on dedicated clusters for isolation:
CREATE CLUSTER sink_cluster SIZE = 'medium';

CREATE SINK isolated_sink
  IN CLUSTER sink_cluster
  FROM my_view
  INTO KAFKA CONNECTION kafka_connection (TOPIC 'my_topic')
  KEY (id)
  FORMAT JSON
  ENVELOPE UPSERT;

Exactly-Once Processing

Kafka sinks provide exactly-once guarantees by default using a progress topic.

Progress Topic

Materialize stores internal metadata in a progress topic:
CREATE CONNECTION kafka_connection TO KAFKA (
    BROKER 'broker.kafka.example.com:9092',
    SASL MECHANISMS = 'SCRAM-SHA-256',
    SASL USERNAME = 'user',
    SASL PASSWORD = SECRET kafka_password,
    -- Custom progress topic name
    PROGRESS TOPIC 'my_progress_topic'
);
Default progress topic name: _materialize-progress-{REGION_ID}-{CONNECTION_ID}

End-to-End Exactly-Once

For full exactly-once semantics, configure the broker, the consumers, and the application as follows.
Broker configuration:
replication.factor=3
unclean.leader.election.enable=false
Consumer configuration:
isolation.level=read_committed
enable.auto.commit=false
Application logic:
  • Make processing idempotent
  • Only commit offsets after successful processing
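
The two application-logic points can be sketched without a Kafka client. The Python example below simulates a consumer that processes each record before recording its position, and that skips any record it has already handled. The class `IdempotentConsumer`, its fields, and the sample records are hypothetical; a real consumer would commit offsets through its Kafka client library and would need to persist the results and the position together.

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class IdempotentConsumer:
    """Tracks per-partition progress so that redelivered records are skipped."""

    totals: Dict[str, int] = field(default_factory=dict)
    next_offset: Dict[int, int] = field(default_factory=dict)

    def handle(self, partition: int, offset: int, key: str, amount: int) -> bool:
        # Idempotence: ignore a record at an offset that was already processed.
        if offset < self.next_offset.get(partition, 0):
            return False
        # Step 1: process the record.
        self.totals[key] = self.totals.get(key, 0) + amount
        # Step 2: only after successful processing, record the new position.
        self.next_offset[partition] = offset + 1
        return True


consumer = IdempotentConsumer()
deliveries = [
    (0, 0, "customer-456", 10),
    (0, 1, "customer-456", 5),
    (0, 1, "customer-456", 5),  # same record redelivered after a simulated restart
]
results = [consumer.handle(*delivery) for delivery in deliveries]
print(results, consumer.totals)
```

The redelivered record is rejected, so the total reflects each record exactly once even though it was delivered twice.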

Real-World Examples

E-commerce Order Pipeline

Stream order updates to Kafka:
-- Materialized view of order summaries
CREATE MATERIALIZED VIEW order_summaries AS
SELECT 
    o.order_id,
    o.customer_id,
    o.order_date,
    SUM(oi.quantity * oi.price) as total_amount,
    COUNT(*) as item_count,
    o.status
FROM orders o
JOIN order_items oi ON o.order_id = oi.order_id
GROUP BY o.order_id, o.customer_id, o.order_date, o.status;

-- Sink to Kafka
CREATE SINK order_updates
  FROM order_summaries
  INTO KAFKA CONNECTION kafka_connection (TOPIC 'order_updates')
  KEY (order_id)
  FORMAT JSON
  ENVELOPE UPSERT;

Real-Time Analytics Aggregations

Stream aggregated metrics:
-- Materialized view with hourly aggregations per region
CREATE MATERIALIZED VIEW hourly_sales AS
SELECT 
    date_trunc('hour', order_timestamp) as hour,
    region,
    COUNT(*) as order_count,
    SUM(total) as revenue,
    AVG(total) as avg_order_value
FROM orders
GROUP BY hour, region;

-- Sink with custom partitioning by region
CREATE SINK sales_metrics
  FROM hourly_sales
  INTO KAFKA CONNECTION kafka_connection (
    TOPIC 'sales_metrics',
    PARTITION BY = kafka_murmur2(region)
  )
  KEY (hour, region)
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
  ENVELOPE UPSERT;

Change Data Capture

Emit every change with its before and after values:
CREATE SINK customer_cdc
  FROM customers
  INTO KAFKA CONNECTION kafka_connection (TOPIC 'customer_changes')
  KEY (customer_id)
  FORMAT JSON
  ENVELOPE DEBEZIUM;

Monitoring Sinks

Check Sink Status

-- Replication factor is a property of the cluster (see mz_clusters), not of the sink
SELECT
    name,
    type,
    size,
    cluster_id
FROM mz_sinks
WHERE name = 'my_sink';

Monitor Progress

SELECT 
    s.name,
    se.error,
    se.details
FROM mz_sinks s
JOIN mz_internal.mz_sink_status_history se ON s.id = se.sink_id
WHERE se.occurred_at > NOW() - INTERVAL '1 hour'
ORDER BY se.occurred_at DESC;

Required Kafka ACLs

Ensure your Kafka user has these permissions:
| Operation | Resource Type | Resource Name |
|---|---|---|
| Read, Write | Topic | Progress topic |
| Write | Topic | Data topic |
| Write | Transactional ID | {TRANSACTIONAL_ID_PREFIX}* |
| Read | Group | {PROGRESS_GROUP_ID_PREFIX}* |

For automatic topic creation, also grant:

| Operation | Resource Type |
|---|---|
| DescribeConfigs | Cluster |
| Create | Topic |

Troubleshooting

Unique Key Validation

If you see “upsert key could not be validated as unique”, try one of the following options.
Option 1: Use a known unique key
-- Use primary key or unique constraint
CREATE SINK my_sink
  FROM my_view
  INTO KAFKA CONNECTION kafka_connection (TOPIC 'topic')
  KEY (primary_key_col)  -- Use actual unique key
  FORMAT JSON
  ENVELOPE UPSERT;
Option 2: Create a deduplicated view
CREATE MATERIALIZED VIEW deduped AS
SELECT DISTINCT ON (key_col) *
FROM original_view
ORDER BY key_col, updated_at DESC;

CREATE SINK deduped_sink
  FROM deduped
  INTO KAFKA CONNECTION kafka_connection (TOPIC 'topic')
  KEY (key_col)
  FORMAT JSON
  ENVELOPE UPSERT;
Option 3: Disable validation (use with caution)
CREATE SINK my_sink
  FROM my_view
  INTO KAFKA CONNECTION kafka_connection (TOPIC 'topic')
  KEY (key_col) NOT ENFORCED  -- Bypass uniqueness check
  FORMAT JSON
  ENVELOPE UPSERT;

Connection Issues

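-- Validate that Materialize can reach the brokers
VALIDATE CONNECTION kafka_connection;

-- List the Kafka connections known to Materialize
SELECT * FROM mz_kafka_connections;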

-- Check for errors
SELECT * FROM mz_internal.mz_sink_status_history
WHERE error IS NOT NULL
ORDER BY occurred_at DESC
LIMIT 10;

Memory Usage

When a sink is first created, it loads a full snapshot of the upstream object into memory, so large datasets may need a larger cluster:
-- Use a larger cluster
CREATE CLUSTER large_sink_cluster SIZE = 'xlarge';

CREATE SINK large_dataset_sink
  IN CLUSTER large_sink_cluster
  FROM large_view
  INTO KAFKA CONNECTION kafka_connection (TOPIC 'topic')
  KEY (id)
  FORMAT JSON
  ENVELOPE UPSERT;

Best Practices

Cluster Isolation

Separate sink clusters from source and compute clusters:
CREATE CLUSTER source_cluster SIZE = 'medium';
CREATE CLUSTER compute_cluster SIZE = 'large';
CREATE CLUSTER sink_cluster SIZE = 'medium';

-- Sources on source_cluster
-- Views on compute_cluster  
-- Sinks on sink_cluster

Topic Naming

Use consistent naming conventions:
CREATE SINK production_orders_sink
  FROM orders
  INTO KAFKA CONNECTION kafka_connection (
    TOPIC 'prod.materialize.orders.v1'
  )
  KEY (order_id)
  FORMAT JSON
  ENVELOPE UPSERT;

Compaction for Upsert

Enable compaction for upsert sinks:
CREATE SINK compacted_sink
  FROM my_view
  INTO KAFKA CONNECTION kafka_connection (
    TOPIC 'my_topic',
    TOPIC CONFIG MAP['cleanup.policy' => 'compact']
  )
  KEY (id)
  FORMAT JSON
  ENVELOPE UPSERT;

Schema Evolution

Use Avro with Schema Registry for schema evolution:
CREATE SINK evolving_schema_sink
  FROM my_view
  INTO KAFKA CONNECTION kafka_connection (TOPIC 'topic')
  KEY (id)
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection (
    VALUE COMPATIBILITY LEVEL 'BACKWARD'
  )
  ENVELOPE UPSERT;

Redpanda

Redpanda uses identical syntax to Kafka:
CREATE SECRET redpanda_password AS 'your-password';

CREATE CONNECTION redpanda_connection TO KAFKA (
    BROKER 'redpanda.example.com:9092',
    SASL MECHANISMS = 'SCRAM-SHA-256',
    SASL USERNAME = 'user',
    SASL PASSWORD = SECRET redpanda_password
);

CREATE SINK redpanda_sink
  FROM my_view
  INTO KAFKA CONNECTION redpanda_connection (TOPIC 'my_topic')
  KEY (id)
  FORMAT JSON
  ENVELOPE UPSERT;

Next Steps

  • CREATE SINK Reference: Complete CREATE SINK syntax reference
  • Query Results: Query data with SELECT statements
  • Subscribe: Stream changes with SUBSCRIBE
  • CREATE CONNECTION: Create Kafka connections
