Ingest streaming data from Kafka and Redpanda topics into Materialize
Materialize can consume data from Kafka and Redpanda topics, supporting multiple message formats including Avro, Protobuf, JSON, CSV, and plain text. This guide covers everything you need to know about ingesting Kafka data into Materialize.
CREATE SECRET kafka_ssl_key AS '<BROKER_SSL_KEY>';
CREATE SECRET kafka_ssl_crt AS '<BROKER_SSL_CERT>';

CREATE CONNECTION kafka_connection TO KAFKA (
    BROKER 'unique-jellyfish-0000.us-east-1.aws.confluent.cloud:9093',
    SSL KEY = SECRET kafka_ssl_key,
    SSL CERTIFICATE = SECRET kafka_ssl_crt
);
-- First create the PrivateLink connection
CREATE CONNECTION privatelink_svc TO AWS PRIVATELINK (
    SERVICE NAME 'com.amazonaws.vpce.us-east-1.vpce-svc-0e123abc123198abc',
    AVAILABILITY ZONES ('use1-az1', 'use1-az4')
);

-- Then create the Kafka connection using PrivateLink
CREATE CONNECTION kafka_connection TO KAFKA (
    BROKERS (
        'broker1:9092' USING AWS PRIVATELINK privatelink_svc,
        'broker2:9092' USING AWS PRIVATELINK privatelink_svc
    )
);
-- Create SSH tunnel connection
CREATE CONNECTION ssh_connection TO SSH TUNNEL (
    HOST 'bastion-host.example.com',
    PORT 22,
    USER 'materialize'
);

-- Create Kafka connection through tunnel
CREATE CONNECTION kafka_connection TO KAFKA (
    BROKERS (
        'broker1:9092' USING SSH TUNNEL ssh_connection,
        'broker2:9092' USING SSH TUNNEL ssh_connection
    )
);
CREATE SOURCE kafka_json FROM KAFKA CONNECTION kafka_connection (TOPIC 'events') FORMAT JSON;
The source will have a single data column of type jsonb. Create a parsing view to extract fields:
CREATE VIEW events_parsed AS
SELECT
    (data->>'id')::bigint AS id,
    (data->>'user_id')::bigint AS user_id,
    (data->>'event_type')::text AS event_type,
    (data->>'timestamp')::timestamp AS timestamp
FROM kafka_json;
JSON Schema Registry integration is not yet supported. Messages must be plain JSON, not the JSON_SR serialization format.
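Avro messages, by contrast, can be decoded via a Confluent Schema Registry connection, and the source's columns are then derived from the Avro schema with no parsing view needed. A sketch, assuming a Confluent Cloud registry; the URL, username, and secret values are placeholders you would replace with your own:

```sql
-- Hypothetical registry endpoint and credentials; substitute your own.
CREATE SECRET csr_password AS '<CSR_PASSWORD>';

CREATE CONNECTION csr_connection TO CONFLUENT SCHEMA REGISTRY (
    URL 'https://unique-jellyfish-0000.us-east-1.aws.confluent.cloud:8081',
    USERNAME = 'materialize',
    PASSWORD = SECRET csr_password
);

CREATE SOURCE kafka_avro
  FROM KAFKA CONNECTION kafka_connection (TOPIC 'events')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection;
```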
CREATE SOURCE kafka_with_key FROM KAFKA CONNECTION kafka_connection (TOPIC 'data') KEY FORMAT TEXT VALUE FORMAT JSON INCLUDE KEY AS message_key;
Headers:

Include all headers:
CREATE SOURCE kafka_with_headers FROM KAFKA CONNECTION kafka_connection (TOPIC 'data') FORMAT JSON INCLUDE HEADERS;
Include specific headers:
CREATE SOURCE kafka_with_headers FROM KAFKA CONNECTION kafka_connection (TOPIC 'data') FORMAT JSON INCLUDE HEADER 'client_id' AS client, INCLUDE HEADER 'trace_id' AS trace BYTES;
Query headers as a map:
SELECT
    data->>'order_id' AS order_id,
    convert_from(map_build(headers)->'client_id', 'utf-8') AS client_id
FROM kafka_with_headers;
Partition, offset, timestamp:
CREATE SOURCE kafka_with_metadata FROM KAFKA CONNECTION kafka_connection (TOPIC 'data') FORMAT JSON INCLUDE PARTITION, OFFSET, TIMESTAMP AS ts;
CREATE SOURCE kafka_offset FROM KAFKA CONNECTION kafka_connection (
    TOPIC 'data',
    -- partition 0 starts at offset 0, partition 1 at 100, partition 2 at 200
    START OFFSET (0, 100, 200)
) FORMAT JSON;
Time-based:
CREATE SOURCE kafka_timestamp FROM KAFKA CONNECTION kafka_connection (
    TOPIC 'data',
    -- Unix timestamp in milliseconds
    START TIMESTAMP 1640995200000
) FORMAT JSON;
START TIMESTAMP is evaluated once at source creation time and remains fixed. To filter data after source creation, use temporal filters in views instead.
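A temporal filter keeps only rows within a time window relative to `mz_now()`, and unlike `START TIMESTAMP` it applies continuously as data arrives. A sketch building on the `events_parsed` view above (the one-hour window is an arbitrary example):

```sql
-- Keep only events from the last hour; rows age out of the view
-- automatically as mz_now() advances.
CREATE VIEW recent_events AS
SELECT *
FROM events_parsed
WHERE mz_now() <= "timestamp" + INTERVAL '1 hour';
```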
Materialize commits offsets to Kafka for monitoring compatibility:
-- Find the consumer group ID prefix
SELECT group_id_prefix
FROM mz_internal.mz_kafka_sources ks
JOIN mz_sources s ON s.id = ks.id
WHERE s.name = 'kafka_source';
Then use Kafka monitoring tools (kafka-consumer-groups, etc.) to check lag.
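Ingestion progress can also be approximated from inside Materialize. A sketch, assuming `mz_internal.mz_source_statistics` exposes `offset_known` and `offset_committed` counters (these are internal, unstable catalog objects whose names and columns may differ across versions):

```sql
-- Approximate lag: offsets known upstream minus offsets committed.
SELECT s.name,
       st.offset_known - st.offset_committed AS approximate_lag
FROM mz_internal.mz_source_statistics st
JOIN mz_sources s ON s.id = st.id
WHERE s.name = 'kafka_source';
```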
Materialize’s consumer groups may show “no active members” in monitoring tools. This is expected and not a cause for concern.
If the source fails because its starting offsets are out of range, the messages at those offsets have usually been deleted by the topic's retention policy:
-- Drop and recreate the source to start from current offsets
DROP SOURCE kafka_source CASCADE;

CREATE SOURCE kafka_source FROM KAFKA CONNECTION kafka_connection (TOPIC 'data') FORMAT JSON;