Overview
CREATE SOURCE connects Materialize to external data sources and ingests streaming data. Sources provide the foundation for real-time transformations and materialized views.
Syntax
CREATE SOURCE [IF NOT EXISTS] <source_name>
FROM { POSTGRES | KAFKA | LOAD GENERATOR | WEBHOOK | ... }
CONNECTION <connection_name>
(<source_options>)
[FORMAT <format_spec>]
[ENVELOPE <envelope_type>]
[FOR ALL TABLES | FOR TABLES (...) | FOR SCHEMAS (...)];
Supported Source Types
Materialize supports multiple source types for ingesting data:
PostgreSQL Sources
Ingest change data capture (CDC) streams from PostgreSQL databases:
-- Create connection
CREATE SECRET pg_password AS 'password';
CREATE CONNECTION pg_connection TO POSTGRES (
HOST 'postgres.example.com',
PORT 5432,
USER 'materialize',
PASSWORD SECRET pg_password,
DATABASE 'mydb',
SSL MODE 'require'
);
-- Create source
CREATE SOURCE pg_source
FROM POSTGRES CONNECTION pg_connection
(PUBLICATION 'mz_source')
FOR ALL TABLES;
Key Features:
- Captures INSERT, UPDATE, and DELETE operations
- Supports PostgreSQL 11 and later
- Automatically creates a subsource for each table in the publication (see the example below)
- Handles schema evolution for compatible changes
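For illustration, a minimal sketch of inspecting what the source created, assuming the publication includes a users table (the table name is hypothetical):
-- List sources, including the subsources created by pg_source
SHOW SOURCES;
-- Each published table is queryable as a subsource of the same name
SELECT * FROM users LIMIT 5;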
Kafka Sources
Ingest streaming data from Apache Kafka or Redpanda:
-- Create connection
CREATE SECRET kafka_password AS 'password';
CREATE CONNECTION kafka_connection TO KAFKA (
BROKER 'kafka.example.com:9092',
SASL MECHANISMS = 'SCRAM-SHA-256',
SASL USERNAME = 'user',
SASL PASSWORD = SECRET kafka_password
);
-- Create source with Avro format
CREATE CONNECTION csr_connection TO CONFLUENT SCHEMA REGISTRY (
URL 'https://schema-registry.example.com'
);
CREATE SOURCE kafka_source
FROM KAFKA CONNECTION kafka_connection (TOPIC 'orders')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
ENVELOPE NONE;
Supported Formats:
- Avro - with Confluent Schema Registry integration
- JSON - stored as the jsonb type
- Protobuf - with schema registry or inline schemas
- Text/Bytes - plain text or raw binary data
- CSV - comma-separated values
Envelopes:
- NONE (append-only) - Default; treats all records as inserts
- UPSERT - Supports inserts, updates, and deletes based on the message key
- DEBEZIUM - Decodes the Debezium CDC format
Load Generator Sources
Generate test data for development and testing:
CREATE SOURCE auction_data
FROM LOAD GENERATOR AUCTION
FOR ALL TABLES;
CREATE SOURCE counter_data
FROM LOAD GENERATOR COUNTER;
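To verify that data is flowing, the generated relations can be queried directly. A minimal sketch, assuming the AUCTION generator's bids subsource and the COUNTER generator's counter column (names may vary by version):
-- Sample a few generated bids
SELECT * FROM bids LIMIT 5;
-- The counter source emits a single incrementing counter column
SELECT max(counter) FROM counter_data;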
Webhook Sources
Receive data via HTTP webhooks:
CREATE SOURCE webhook_source
FROM WEBHOOK
BODY FORMAT JSON;
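Webhook sources can also validate incoming requests. A minimal sketch, assuming a shared secret and a client that sends it in an x-api-key header (both the secret name and the header are illustrative assumptions):
-- Secret used to authenticate webhook requests (hypothetical)
CREATE SECRET webhook_shared_secret AS 'some-shared-secret';
CREATE SOURCE validated_webhook
FROM WEBHOOK
BODY FORMAT JSON
CHECK (
WITH (HEADERS, SECRET webhook_shared_secret)
constant_time_eq(headers->'x-api-key', webhook_shared_secret)
);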
Formats
Avro Format
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
- Automatically determines columns and types from the schema
- Supports schema evolution
- Integrates with Confluent Schema Registry
JSON Format
- Creates a single jsonb column named data
- Recommended to create a parsing view for typed access:
CREATE VIEW parsed_json AS
SELECT
(data->>'id')::int AS id,
(data->>'name')::text AS name,
(data->>'created_at')::timestamp AS created_at
FROM json_source;
Protobuf Format
-- Using schema registry
FORMAT PROTOBUF USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
-- Using inline schema
FORMAT PROTOBUF MESSAGE 'billing.Batch'
USING SCHEMA '\x0a300a0d62696...'
Text Format
- Creates a single text column
- Parses newline-delimited data as UTF-8
Bytes Format
- Creates a single bytea column
- Stores raw binary data without decoding (see the example below)
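For illustration, a raw-bytes Kafka source might look like this (the topic name is hypothetical):
CREATE SOURCE raw_events
FROM KAFKA CONNECTION kafka_connection (TOPIC 'raw-events')
FORMAT BYTES
ENVELOPE NONE;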
Envelope Types
Append-Only (ENVELOPE NONE)
Default envelope. Treats all records as inserts:
CREATE SOURCE kafka_append
FROM KAFKA CONNECTION kafka_connection (TOPIC 'events')
FORMAT JSON
ENVELOPE NONE; -- Optional, this is the default
Upsert Envelope
Supports inserts, updates, and deletes using key-value semantics:
CREATE SOURCE kafka_upsert
FROM KAFKA CONNECTION kafka_connection (TOPIC 'users')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
ENVELOPE UPSERT;
- Records with new keys are inserted
- Records with existing keys and non-null values are updated
- Records with existing keys and null values are deleted
- Required for log-compacted topics
Debezium Envelope
Decodes Debezium change events:
CREATE SOURCE debezium_source
FROM KAFKA CONNECTION kafka_connection (TOPIC 'dbserver.public.users')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
ENVELOPE DEBEZIUM;
- Interprets the before and after fields of each change event
- Supports INSERT, UPDATE, and DELETE operations
- Incrementally maintains views based on CDC events
Including Kafka Metadata
Include message metadata in the source:
CREATE SOURCE kafka_metadata
FROM KAFKA CONNECTION kafka_connection (TOPIC 'data')
KEY FORMAT TEXT
VALUE FORMAT TEXT
INCLUDE KEY AS message_key,
PARTITION,
OFFSET,
TIMESTAMP AS kafka_timestamp,
HEADERS
ENVELOPE NONE;
- KEY - Exposes the message key as a column
- PARTITION - Exposes the Kafka partition number
- OFFSET - Exposes the Kafka offset within the partition
- TIMESTAMP - Exposes the Kafka message timestamp
- HEADERS - Exposes the message headers as a list of key-value pairs
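A quick way to inspect these columns in the kafka_metadata source defined above (partition and offset are quoted to avoid keyword conflicts):
SELECT message_key, "partition", "offset", kafka_timestamp
FROM kafka_metadata
LIMIT 10;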
Progress Monitoring
Materialize automatically creates a progress subsource:
-- Check Kafka source progress
SELECT partition, "offset"
FROM kafka_source_progress;
-- Check PostgreSQL source progress
SELECT lsn
FROM pg_source_progress;
Examples
Complete PostgreSQL Source
CREATE SECRET pgpass AS 'secret_password';
CREATE CONNECTION pg_conn TO POSTGRES (
HOST 'postgres.example.com',
PORT 5432,
USER 'materialize',
PASSWORD SECRET pgpass,
DATABASE 'production',
SSL MODE 'require'
);
CREATE SOURCE pg_cdc
FROM POSTGRES CONNECTION pg_conn
(PUBLICATION 'mz_publication')
FOR TABLES (users, orders, products);
Complete Kafka Source with Avro
CREATE SECRET kafka_password AS 'password';
CREATE SECRET csr_password AS 'password';
CREATE CONNECTION kafka_conn TO KAFKA (
BROKER 'kafka.example.com:9092',
SASL MECHANISMS = 'PLAIN',
SASL USERNAME = 'user',
SASL PASSWORD = SECRET kafka_password
);
CREATE CONNECTION csr_conn TO CONFLUENT SCHEMA REGISTRY (
URL 'https://schema-registry.example.com',
USERNAME = 'user',
PASSWORD = SECRET csr_password
);
CREATE SOURCE orders_source
FROM KAFKA CONNECTION kafka_conn (TOPIC 'orders')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
ENVELOPE UPSERT;
Kafka Source with JSON Parsing
CREATE SOURCE events_raw
FROM KAFKA CONNECTION kafka_conn (TOPIC 'events')
FORMAT JSON;
CREATE VIEW events AS
SELECT
(data->>'event_id')::uuid AS event_id,
(data->>'user_id')::bigint AS user_id,
(data->>'event_type')::text AS event_type,
(data->>'timestamp')::timestamptz AS event_time,
(data->'properties')::jsonb AS properties
FROM events_raw;
Best Practices
- Use Connections: Create reusable connections for credentials and configuration
- Monitor Progress: Query progress subsources to track ingestion
- Parse JSON: Create views to parse JSON into typed columns
- Handle Errors: Use error handling options for Upsert sources
- Set Offsets: Use START OFFSET or START TIMESTAMP when needed (see the sketch after this list)
- Consider Envelopes: Choose the right envelope for your data semantics
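A minimal sketch of setting an explicit start point on a Kafka source; the option spellings here (START OFFSET with one value per partition, START TIMESTAMP in milliseconds since the Unix epoch) are assumptions to confirm against the Kafka source reference:
-- Start each of three partitions at a given offset
CREATE SOURCE events_from_offset
FROM KAFKA CONNECTION kafka_conn (TOPIC 'events', START OFFSET (0, 0, 0))
FORMAT JSON
ENVELOPE NONE;
-- Or start from a point in time
CREATE SOURCE events_from_time
FROM KAFKA CONNECTION kafka_conn (TOPIC 'events', START TIMESTAMP 1672531200000)
FORMAT JSON
ENVELOPE NONE;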
Related Pages