Overview
A Kafka sink:
- Streams changes from materialized views, sources, or tables to Kafka topics
- Provides exactly-once delivery guarantees by default
- Supports multiple formats (Avro, JSON, TEXT, BYTES)
- Offers envelope options for different change semantics (Upsert, Debezium)
- Automatically creates topics if they don’t exist
Prerequisites
Before creating a sink, you need a Kafka connection:
SSL Connection
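A sketch of an SSL-authenticated Kafka connection (the connection name, broker address, and secret names are illustrative; see CREATE CONNECTION for all options):

```sql
-- Store credentials as secrets (values are placeholders).
CREATE SECRET kafka_ssl_key AS '<private key>';
CREATE SECRET kafka_ssl_crt AS '<certificate>';

-- Create a connection that sinks can reuse.
CREATE CONNECTION kafka_connection TO KAFKA (
    BROKER 'broker.example.com:9092',
    SSL KEY = SECRET kafka_ssl_key,
    SSL CERTIFICATE = SECRET kafka_ssl_crt
);
```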
Creating Sinks
Basic Sink with JSON Format
Create a sink that writes to a Kafka topic using JSON:
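For example, a minimal JSON sink might look like this (the view `orders_view`, key column, connection, and topic name are illustrative):

```sql
CREATE SINK orders_json_sink
  FROM orders_view
  INTO KAFKA CONNECTION kafka_connection (TOPIC 'orders-json')
  KEY (order_id)
  FORMAT JSON
  ENVELOPE UPSERT;
```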
Sink with Avro Format
Use Avro format with Confluent Schema Registry:
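A sketch, assuming a schema registry connection named `csr_connection` already exists (other names are illustrative):

```sql
CREATE SINK orders_avro_sink
  FROM orders_view
  INTO KAFKA CONNECTION kafka_connection (TOPIC 'orders-avro')
  KEY (order_id)
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
  ENVELOPE UPSERT;
```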
Sink Formats
JSON Format
JSON format creates human-readable messages:
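For instance, a row from a hypothetical orders view might be emitted as a message value like:

```json
{"order_id": 1001, "status": "shipped", "total": 42.50}
```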
Avro Format
Avro format provides schema evolution and compact encoding:
Separate Key and Value Formats
Use different formats for keys and values:
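A sketch using a text key with an Avro value (assuming the same illustrative names as above; check the CREATE SINK reference for the exact `KEY FORMAT`/`VALUE FORMAT` options your version supports):

```sql
CREATE SINK mixed_format_sink
  FROM orders_view
  INTO KAFKA CONNECTION kafka_connection (TOPIC 'orders-mixed')
  KEY (order_id)
  KEY FORMAT TEXT
  VALUE FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
  ENVELOPE UPSERT;
```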
Envelope Types
Upsert Envelope
The upsert envelope sends the current row value and uses tombstones for deletes:
- Insert/Update: Key + full row value
- Delete: Key + null value (tombstone)
Debezium Envelope
The Debezium envelope includes before and after values:
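A sketch of a Debezium-envelope sink (illustrative names; Debezium envelopes are typically paired with Avro and a schema registry):

```sql
CREATE SINK orders_debezium_sink
  FROM orders_view
  INTO KAFKA CONNECTION kafka_connection (TOPIC 'orders-cdc')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
  ENVELOPE DEBEZIUM;
```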
Advanced Configuration
Topic Configuration
Customize topic settings:
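For example, replication and partition counts can be set when Materialize creates the topic (option names per the CREATE SINK reference; values are illustrative):

```sql
CREATE SINK configured_sink
  FROM orders_view
  INTO KAFKA CONNECTION kafka_connection (
    TOPIC 'orders',
    TOPIC REPLICATION FACTOR 3,
    TOPIC PARTITION COUNT 6
  )
  KEY (order_id)
  FORMAT JSON
  ENVELOPE UPSERT;
```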
Custom Partitioning
Control message partitioning. Available hash functions:
- kafka_murmur2() - Kafka’s default partitioner
- crc32() - CRC32 hash
- seahash() - SeaHash algorithm
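A sketch of partitioning by a hashed key, assuming the `PARTITION BY` connection option from the CREATE SINK reference (the expression and names are illustrative; verify the exact syntax for your version):

```sql
CREATE SINK partitioned_sink
  FROM orders_view
  INTO KAFKA CONNECTION kafka_connection (
    TOPIC 'orders-partitioned',
    PARTITION BY = kafka_murmur2(order_id::text)
  )
  KEY (order_id)
  FORMAT JSON
  ENVELOPE UPSERT;
```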
Schema Compatibility
Set Avro schema compatibility levels:
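A sketch, assuming the key/value compatibility-level options in the schema registry clause of the CREATE SINK reference (option names and levels shown here are assumptions; confirm against your version):

```sql
CREATE SINK compat_sink
  FROM orders_view
  INTO KAFKA CONNECTION kafka_connection (TOPIC 'orders-avro')
  KEY (order_id)
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection (
    KEY COMPATIBILITY LEVEL 'BACKWARD',
    VALUE COMPATIBILITY LEVEL 'BACKWARD_TRANSITIVE'
  )
  ENVELOPE UPSERT;
```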
Dedicated Cluster
Run sinks on dedicated clusters for isolation:
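For example (cluster name and size are illustrative):

```sql
CREATE CLUSTER sinks_cluster (SIZE = '100cc');  -- size name varies by deployment

CREATE SINK isolated_sink
  IN CLUSTER sinks_cluster
  FROM orders_view
  INTO KAFKA CONNECTION kafka_connection (TOPIC 'orders')
  KEY (order_id)
  FORMAT JSON
  ENVELOPE UPSERT;
```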
Exactly-Once Processing
Kafka sinks provide exactly-once guarantees by default using a progress topic.
Progress Topic
Materialize stores internal metadata in a progress topic:
_materialize-progress-{REGION_ID}-{CONNECTION_ID}
End-to-End Exactly-Once
For full exactly-once semantics:
Broker configuration:
- Make processing idempotent
- Only commit offsets after successful processing
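On the consumer side, the standard Kafka way to read only committed transactional messages is `isolation.level=read_committed`; these are general Kafka client options, not Materialize-specific:

```properties
# Kafka consumer settings
isolation.level=read_committed
# Commit offsets manually, only after processing succeeds
enable.auto.commit=false
```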
Real-World Examples
E-commerce Order Pipeline
Stream order updates to Kafka:
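A sketch of such a pipeline; all table, view, and topic names are illustrative:

```sql
-- Maintain the latest status per order.
CREATE MATERIALIZED VIEW order_status AS
SELECT o.order_id, o.customer_id, o.status, o.updated_at
FROM orders o;

-- Stream each update to Kafka, keyed by order.
CREATE SINK order_status_sink
  FROM order_status
  INTO KAFKA CONNECTION kafka_connection (TOPIC 'order-status')
  KEY (order_id)
  FORMAT JSON
  ENVELOPE UPSERT;
```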
Real-Time Analytics Aggregations
Stream aggregated metrics:
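For example, a per-minute revenue rollup (illustrative names and schema):

```sql
-- Incrementally maintained aggregate.
CREATE MATERIALIZED VIEW revenue_per_minute AS
SELECT date_trunc('minute', created_at) AS minute,
       sum(amount) AS revenue
FROM orders
GROUP BY 1;

-- Each minute's running total is upserted as it changes.
CREATE SINK revenue_sink
  FROM revenue_per_minute
  INTO KAFKA CONNECTION kafka_connection (TOPIC 'revenue-per-minute')
  KEY (minute)
  FORMAT JSON
  ENVELOPE UPSERT;
```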
Change Data Capture
Capture all changes with full history:
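The Debezium envelope keeps before/after values for every change, so it fits CDC (illustrative names):

```sql
CREATE SINK orders_history_sink
  FROM orders
  INTO KAFKA CONNECTION kafka_connection (TOPIC 'orders-history')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
  ENVELOPE DEBEZIUM;
```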
Monitoring Sinks
Check Sink Status
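Sink health is exposed in the `mz_internal.mz_sink_statuses` system catalog view; the sink name below is illustrative:

```sql
SELECT name, status, error
FROM mz_internal.mz_sink_statuses
WHERE name = 'order_status_sink';
```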
Monitor Progress
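Throughput counters live in `mz_internal.mz_sink_statistics`, joinable to `mz_sinks` for names (a sketch; column availability may vary by version):

```sql
SELECT s.name, st.messages_staged, st.messages_committed
FROM mz_internal.mz_sink_statistics st
JOIN mz_sinks s ON s.id = st.id;
```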
Required Kafka ACLs
Ensure your Kafka user has these permissions:

| Operation | Resource Type | Resource Name |
|---|---|---|
| Read, Write | Topic | Progress topic |
| Write | Topic | Data topic |
| Write | Transactional ID | {TRANSACTIONAL_ID_PREFIX}* |
| Read | Group | {PROGRESS_GROUP_ID_PREFIX}* |
For automatic topic creation, the user also needs:

| Operation | Resource Type |
|---|---|
| DescribeConfigs | Cluster |
| Create | Topic |
Troubleshooting
Unique Key Validation
If you see “upsert key could not be validated as unique”:
Option 1: Use a known unique key
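When the key genuinely is unique but Materialize cannot prove it, the CREATE SINK reference's `KEY ... NOT ENFORCED` clause bypasses validation; use it with care, since duplicate keys silently break upsert semantics (names below are illustrative):

```sql
CREATE SINK forced_key_sink
  FROM orders_view
  INTO KAFKA CONNECTION kafka_connection (TOPIC 'orders')
  KEY (order_id) NOT ENFORCED
  FORMAT JSON
  ENVELOPE UPSERT;
```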
Connection Issues
Memory Usage
During creation, sinks load an entire snapshot into memory. For large datasets, size the sink’s cluster to hold that snapshot.
Best Practices
Cluster Isolation
Separate sink clusters from source and compute clusters.
Topic Naming
Use consistent naming conventions.
Compaction for Upsert
Enable compaction for upsert sinks:
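A sketch, assuming the `TOPIC CONFIG` option from the CREATE SINK reference for passing broker-side topic settings (names are illustrative):

```sql
CREATE SINK compacted_sink
  FROM orders_view
  INTO KAFKA CONNECTION kafka_connection (
    TOPIC 'orders-compacted',
    TOPIC CONFIG MAP['cleanup.policy' => 'compact']
  )
  KEY (order_id)
  FORMAT JSON
  ENVELOPE UPSERT;
```

With upsert envelopes each key's latest value supersedes earlier ones, so log compaction safely bounds topic growth.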
Schema Evolution
Use Avro with Schema Registry for schema evolution.
Redpanda
Redpanda uses identical syntax to Kafka.
Next Steps
CREATE SINK Reference
Complete CREATE SINK syntax reference
Query Results
Query data with SELECT statements
Subscribe
Stream changes with SUBSCRIBE
CREATE CONNECTION
Create Kafka connections