Materialize can consume data from Kafka and Redpanda topics, supporting multiple message formats including Avro, Protobuf, JSON, CSV, and plain text. This guide covers everything you need to know about ingesting Kafka data into Materialize.

Prerequisites

Before creating a Kafka source, you need:
  1. Kafka cluster access (Kafka 0.10+ or Redpanda)
  2. Connection details (bootstrap servers, authentication credentials)
  3. Topic permissions (READ access to topics and consumer groups)
  4. (Optional) Schema Registry access for Avro/Protobuf formats
The same syntax and features work for both Kafka and Redpanda. Simply use your Redpanda broker addresses instead of Kafka brokers.

Step 1: Create a Connection

Connections encapsulate authentication details and are reusable across multiple sources.
CREATE SECRET kafka_password AS '<BROKER_PASSWORD>';

CREATE CONNECTION kafka_connection TO KAFKA (
    BROKER 'pkc-1234.us-east-1.aws.confluent.cloud:9092',
    SASL MECHANISMS = 'SCRAM-SHA-256',
    SASL USERNAME = 'your-username',
    SASL PASSWORD = SECRET kafka_password
);
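If your cluster exposes more than one bootstrap server, you can list several brokers in the connection. A sketch, assuming hypothetical broker addresses (any reachable bootstrap server works):
CREATE CONNECTION kafka_multi TO KAFKA (
    BROKERS ('broker1.example.com:9092', 'broker2.example.com:9092'),
    SASL MECHANISMS = 'SCRAM-SHA-256',
    SASL USERNAME = 'your-username',
    SASL PASSWORD = SECRET kafka_password
);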

Schema Registry Connection

For Avro or Protobuf formats with schema registry:
CREATE SECRET csr_username AS '<CSR_USERNAME>';
CREATE SECRET csr_password AS '<CSR_PASSWORD>';

CREATE CONNECTION csr_connection TO CONFLUENT SCHEMA REGISTRY (
    URL 'https://schema-registry.example.com:8081',
    USERNAME = SECRET csr_username,
    PASSWORD = SECRET csr_password
);

Step 2: Create a Source

Kafka sources support multiple data formats and envelopes.

Format: Avro

Avro messages with Confluent Schema Registry:
CREATE SOURCE kafka_avro
    FROM KAFKA CONNECTION kafka_connection (TOPIC 'orders')
    FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
    ENVELOPE NONE;
Key features:
  • Schema is automatically retrieved from the registry
  • The latest schema version is used at source creation time
  • Schema evolution is supported as long as changes are compatible
  • All Avro types are supported except recursive types and union types in arrays
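Because the schema is known up front, the source exposes each Avro record field as a typed column, so no parsing view is needed. A sketch, assuming a hypothetical orders schema with id and amount fields:
-- Hypothetical field names; your columns come from the Avro schema.
SELECT id, amount
FROM kafka_avro
WHERE amount > 100;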

Format: JSON

JSON messages (stored as jsonb):
CREATE SOURCE kafka_json
    FROM KAFKA CONNECTION kafka_connection (TOPIC 'events')
    FORMAT JSON;
The source will have a single data column of type jsonb. Create a parsing view to extract fields:
CREATE VIEW events_parsed AS
SELECT
    (data->>'id')::bigint AS id,
    (data->>'user_id')::bigint AS user_id,
    (data->>'event_type')::text AS event_type,
    (data->>'timestamp')::timestamp AS timestamp
FROM kafka_json;
JSON Schema Registry integration is not yet supported. Messages must be plain JSON, not the JSON_SR serialization format.

Format: Protobuf

Using Schema Registry:
CREATE SOURCE kafka_proto
    FROM KAFKA CONNECTION kafka_connection (TOPIC 'metrics')
    FORMAT PROTOBUF USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection;
Using Inline Schema:
-- First, compile your .proto file to a descriptor
-- protoc --include_imports --descriptor_set_out=schema.pb schema.proto
-- Then encode it: printf '\\x' && xxd -p schema.pb | tr -d '\n'

CREATE SOURCE kafka_proto
    FROM KAFKA CONNECTION kafka_connection (TOPIC 'metrics')
    FORMAT PROTOBUF MESSAGE 'billing.Batch' USING SCHEMA '\x0a300a0d62696...';
The message name must be fully qualified with the package name: package.MessageName

Format: CSV

CSV-formatted messages:
CREATE SOURCE kafka_csv (col1, col2, col3)
    FROM KAFKA CONNECTION kafka_connection (TOPIC 'data')
    FORMAT CSV WITH 3 COLUMNS;

Format: Text/Bytes

Plain text:
CREATE SOURCE kafka_text
    FROM KAFKA CONNECTION kafka_connection (TOPIC 'logs')
    FORMAT TEXT;
Raw bytes:
CREATE SOURCE kafka_bytes
    FROM KAFKA CONNECTION kafka_connection (TOPIC 'binary_data')
    FORMAT BYTES;

Key and Value Formats

You can specify different formats for keys and values:
CREATE SOURCE kafka_key_value
    FROM KAFKA CONNECTION kafka_connection (TOPIC 'orders')
    KEY FORMAT TEXT
    VALUE FORMAT JSON
    INCLUDE KEY AS order_id;

Envelopes

Append-Only (ENVELOPE NONE)

All messages are treated as inserts. This is the default envelope.
CREATE SOURCE kafka_append
    FROM KAFKA CONNECTION kafka_connection (TOPIC 'clicks')
    FORMAT JSON
    ENVELOPE NONE;  -- or omit ENVELOPE clause

Upsert (ENVELOPE UPSERT)

Supports inserts, updates, and deletes using key-value convention:
CREATE SOURCE kafka_upsert
    FROM KAFKA CONNECTION kafka_connection (TOPIC 'users')
    FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
    ENVELOPE UPSERT;
How it works:
  • Non-null key + non-null value: Insert or update
  • Non-null key + null value: Delete
  • Null key: Error (source enters error state)
Upsert sources can lead to high memory usage. Use standard-sized clusters that automatically spill to disk, not legacy-sized clusters.
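The rules above can be illustrated with a hypothetical sequence of messages for a single key:
-- Hypothetical message sequence for key 'user-1':
--   ('user-1', '{"name": "Ana"}')   -> row inserted
--   ('user-1', '{"name": "Anna"}')  -> row updated in place
--   ('user-1', NULL)                -> row deleted
-- Querying the source always returns the latest value per key:
SELECT * FROM kafka_upsert;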
Handling value decoding errors:
CREATE SOURCE kafka_upsert
    FROM KAFKA CONNECTION kafka_connection (TOPIC 'users')
    KEY FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
    VALUE FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
    ENVELOPE UPSERT (VALUE DECODING ERRORS = INLINE);
This adds an error column that contains decoding error messages, allowing the source to continue ingesting.
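With inline errors, downstream views can separate clean rows from failed ones. A sketch, assuming the default inline error column name error:
-- Rows that decoded cleanly have a NULL error column.
CREATE VIEW users_ok AS
SELECT * FROM kafka_upsert WHERE error IS NULL;

-- Keep failed rows visible for debugging.
CREATE VIEW users_failed AS
SELECT * FROM kafka_upsert WHERE error IS NOT NULL;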

Debezium (ENVELOPE DEBEZIUM)

For Debezium-formatted change events:
CREATE SOURCE kafka_debezium
    FROM KAFKA CONNECTION kafka_connection (TOPIC 'postgres.public.orders')
    FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
    ENVELOPE DEBEZIUM;
Supported operations:
  • Insert: before is null
  • Update: Both before and after are non-null
  • Delete: after is null
Debezium envelope can lead to high memory usage. Use standard-sized clusters for automatic spilling to disk.

Advanced Features

Exposing Metadata

Message key:
CREATE SOURCE kafka_with_key
    FROM KAFKA CONNECTION kafka_connection (TOPIC 'data')
    KEY FORMAT TEXT
    VALUE FORMAT JSON
    INCLUDE KEY AS message_key;
Headers: Include all headers:
CREATE SOURCE kafka_with_headers
    FROM KAFKA CONNECTION kafka_connection (TOPIC 'data')
    FORMAT JSON
    INCLUDE HEADERS;
Include specific headers:
CREATE SOURCE kafka_with_headers
    FROM KAFKA CONNECTION kafka_connection (TOPIC 'data')
    FORMAT JSON
    INCLUDE HEADER 'client_id' AS client,
            HEADER 'trace_id' AS trace BYTES;
Query headers as a map:
SELECT
    data->>'order_id' AS order_id,
    convert_from(map_build(headers)->'client_id', 'utf-8') AS client_id
FROM kafka_with_headers;
Partition, offset, timestamp:
CREATE SOURCE kafka_with_metadata
    FROM KAFKA CONNECTION kafka_connection (TOPIC 'data')
    FORMAT JSON
    INCLUDE PARTITION, OFFSET, TIMESTAMP AS ts;

Setting Start Offsets

Offset-based:
CREATE SOURCE kafka_offset
    FROM KAFKA CONNECTION kafka_connection (
        TOPIC 'data',
        START OFFSET (0, 100, 200)  -- partition 0 at offset 0, partition 1 at 100, etc.
    )
    FORMAT JSON;
Time-based:
CREATE SOURCE kafka_timestamp
    FROM KAFKA CONNECTION kafka_connection (
        TOPIC 'data',
        START TIMESTAMP 1640995200000  -- Unix timestamp in milliseconds
    )
    FORMAT JSON;
START TIMESTAMP is evaluated once at source creation time and remains fixed. To filter data after source creation, use temporal filters in views instead.
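For example, a temporal filter in a view keeps only recent data no matter when the source was created. A sketch, assuming a hypothetical ts field in the JSON payload:
-- Keep only the last 7 days of events, sliding forward over time.
CREATE VIEW recent_events AS
SELECT *
FROM kafka_timestamp
WHERE mz_now() <= (data->>'ts')::timestamp + INTERVAL '7 days';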

Multiple Topics and Partitions

Materialize automatically handles:
  • Multiple partitions: Data from all partitions is consumed in parallel
  • Partition rebalancing: Automatically managed by Materialize
  • Consumer groups: Materialize manages consumer group membership

Monitoring

Check Source Status

SELECT *
FROM mz_internal.mz_source_statuses
WHERE name = 'kafka_source';
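To see why a source entered a stalled or failed state, past status transitions are recorded as well. A sketch using the unstable mz_internal schema, which may change between releases:
SELECT occurred_at, status, error
FROM mz_internal.mz_source_status_history h
JOIN mz_sources s ON s.id = h.source_id
WHERE s.name = 'kafka_source'
ORDER BY occurred_at DESC;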

Monitor Ingestion Progress

SELECT
    partition,
    "offset"
FROM (
    SELECT
        upper(partition)::uint8 AS partition,
        "offset"
    FROM kafka_source_progress
)
WHERE partition IS NOT NULL;

Check Consumer Lag

Materialize commits offsets to Kafka for monitoring compatibility:
-- Find the consumer group ID prefix
SELECT group_id_prefix
FROM mz_internal.mz_kafka_sources ks
JOIN mz_sources s ON s.id = ks.id
WHERE s.name = 'kafka_source';
Then use Kafka monitoring tools (kafka-consumer-groups, etc.) to check lag.
Materialize’s consumer groups may show “no active members” in monitoring tools. This is expected and not a cause for concern.

Troubleshooting

Source Not Progressing

Check Kafka permissions:
-- Required ACLs:
-- Read on Topic: your_topic
-- Read on Group: your_group_id_prefix*
Verify connectivity:
VALIDATE CONNECTION kafka_connection;

Authentication Errors

  • Verify SASL credentials are correct
  • Check that SSL certificates are valid and not expired
  • Ensure the user has necessary Kafka ACLs

Schema Registry Issues

Connection problems:
VALIDATE CONNECTION csr_connection;
Schema not found:
  • Verify the topic name matches the schema subject
  • Check that schemas exist in the registry
  • Ensure credentials have read access to schemas

High Memory Usage

For upsert or Debezium sources:
  1. Use standard-sized clusters (automatic disk spilling)
  2. Increase cluster size if memory is exhausted
  3. Consider separating high-cardinality sources
-- Check cluster memory usage
SELECT
    cluster_name,
    replica_name,
    memory_percent
FROM mz_internal.mz_cluster_replica_metrics;

Consumer Group Errors

If you see consumer group errors:
  1. Check GROUP ID PREFIX uniqueness
  2. Verify ACLs allow reading from the consumer group
  3. Reset consumer group if corrupted (requires dropping and recreating source)

Offset Out of Range

This usually means messages have been deleted due to retention:
-- Drop and recreate the source to start from current offsets
DROP SOURCE kafka_source CASCADE;

CREATE SOURCE kafka_source
    FROM KAFKA CONNECTION kafka_connection (TOPIC 'data')
    FORMAT JSON;

Best Practices

Resource Management

Dedicated cluster for Kafka sources:
CREATE CLUSTER kafka_ingest (SIZE = '100cc');

SET CLUSTER = kafka_ingest;

CREATE SOURCE kafka_source FROM ...
Right-size for workload:
  • Start with 100cc for most workloads
  • Use 200cc+ for high-throughput topics
  • Use 400cc+ for large upsert sources
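If a source outgrows its cluster, the cluster can be resized in place; the size here is an example:
ALTER CLUSTER kafka_ingest SET (SIZE = '200cc');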

Topic Configuration

For upsert sources, configure the topic so Kafka keeps at least the latest value per key, for example via log compaction or adequate retention:
# In Kafka (localhost:9092 is a placeholder bootstrap server)
kafka-configs --bootstrap-server localhost:9092 --alter \
  --entity-type topics --entity-name users \
  --add-config retention.ms=604800000  # 7 days

Performance Optimization

  1. Use appropriate formats: Avro/Protobuf are more efficient than JSON
  2. Limit metadata columns: Only include metadata you need (keys, headers, etc.)
  3. Separate high-volume topics: Use different sources for different traffic patterns
  4. Monitor progress: Track consumer lag and adjust resources

Security

  • Store credentials in secrets, never in plain text
  • Use SSL/TLS for production deployments
  • Rotate credentials regularly
  • Use AWS PrivateLink or SSH tunnels for private networks
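For brokers on a private network, an SSH tunnel connection can front the Kafka connection. A sketch; the bastion host, user, and broker address are placeholders:
CREATE CONNECTION ssh_tunnel TO SSH TUNNEL (
    HOST 'bastion.example.com',
    USER 'materialize',
    PORT 22
);

CREATE CONNECTION kafka_private TO KAFKA (
    BROKER 'broker1.internal:9092' USING SSH TUNNEL ssh_tunnel
);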

Required Kafka ACLs

Materialize requires the following Kafka ACLs:
Operation | Resource Type | Resource Name
----------|---------------|------------------------------------------
READ      | Topic         | Your topic name(s)
READ      | Group         | All groups matching your GROUP ID PREFIX
Example using Kafka CLI:
kafka-acls --bootstrap-server localhost:9092 --add \
  --allow-principal User:materialize \
  --operation Read \
  --topic your_topic

kafka-acls --bootstrap-server localhost:9092 --add \
  --allow-principal User:materialize \
  --operation Read \
  --group materialize- \
  --resource-pattern-type prefixed

Supported Kafka Distributions

Materialize works with:
  • Apache Kafka 0.10+
  • Confluent Platform
  • Confluent Cloud
  • Amazon MSK
  • Redpanda
  • WarpStream
  • Azure Event Hubs (Kafka-compatible mode)

Next Steps

  • Transform data: create materialized views on your Kafka data
  • Webhooks: learn about webhook sources for HTTP ingestion
