
Overview

CREATE SOURCE connects Materialize to external data sources and ingests streaming data. Sources provide the foundation for real-time transformations and materialized views.

Syntax

CREATE SOURCE [IF NOT EXISTS] <source_name>
  FROM { POSTGRES | KAFKA | LOAD GENERATOR | WEBHOOK | ... }
  [CONNECTION <connection_name>]
  [(<source_options>)]
  [FORMAT <format_spec>]
  [ENVELOPE <envelope_type>]
  [FOR ALL TABLES | FOR TABLES (...) | FOR SCHEMAS (...)];

Note that the CONNECTION clause applies to source types that connect to external systems (PostgreSQL, Kafka); load generator and webhook sources do not use one.

Supported Source Types

Materialize supports multiple source types for ingesting data:

PostgreSQL Sources

Ingest change data capture (CDC) streams from PostgreSQL databases:
-- Create connection
CREATE SECRET pg_password AS 'password';

CREATE CONNECTION pg_connection TO POSTGRES (
  HOST 'postgres.example.com',
  PORT 5432,
  USER 'materialize',
  PASSWORD SECRET pg_password,
  DATABASE 'mydb',
  SSL MODE 'require'
);

-- Create source
CREATE SOURCE pg_source
  FROM POSTGRES CONNECTION pg_connection 
  (PUBLICATION 'mz_source')
  FOR ALL TABLES;
Key Features:
  • Captures INSERT, UPDATE, and DELETE operations
  • Supports PostgreSQL 11 and later
  • Automatically creates subsources for each table (see the sketch below)
  • Handles schema evolution for compatible changes
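
For example, you can inspect what a PostgreSQL source created. A minimal sketch, assuming the pg_source above and a hypothetical upstream table named users:
-- List subsources (one per upstream table, plus a progress subsource)
SHOW SUBSOURCES ON pg_source;

-- Subsources are named after the upstream tables and can be queried directly
SELECT count(*) FROM users;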

Kafka Sources

Ingest streaming data from Apache Kafka or Redpanda:
-- Create connection
CREATE SECRET kafka_password AS 'password';

CREATE CONNECTION kafka_connection TO KAFKA (
  BROKER 'kafka.example.com:9092',
  SASL MECHANISMS = 'SCRAM-SHA-256',
  SASL USERNAME = 'user',
  SASL PASSWORD = SECRET kafka_password
);

-- Create source with Avro format
CREATE CONNECTION csr_connection TO CONFLUENT SCHEMA REGISTRY (
  URL 'https://schema-registry.example.com'
);

CREATE SOURCE kafka_source
  FROM KAFKA CONNECTION kafka_connection (TOPIC 'orders')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
  ENVELOPE NONE;
Supported Formats:
  • Avro - with Confluent Schema Registry integration
  • JSON - stored as jsonb type
  • Protobuf - with schema registry or inline schemas
  • Text/Bytes - plain text or raw binary data
  • CSV - comma-separated values
Envelopes:
  • NONE (append-only) - Default, treats all records as inserts
  • UPSERT - Supports inserts, updates, and deletes based on key
  • DEBEZIUM - Decodes Debezium CDC format

Load Generator Sources

Generate test data for development and testing:
CREATE SOURCE auction_data 
  FROM LOAD GENERATOR AUCTION 
  FOR ALL TABLES;

CREATE SOURCE counter_data
  FROM LOAD GENERATOR COUNTER;
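
The generated relations can be queried like any other. For example, the AUCTION generator creates subsources such as auctions and bids:
-- Peek at a few generated rows from the bids subsource
SELECT * FROM bids LIMIT 5;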

Webhook Sources

Receive data via HTTP webhooks:
CREATE SOURCE webhook_source 
  FROM WEBHOOK 
  BODY FORMAT JSON;
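
Webhook sources can also validate incoming requests before accepting them. A minimal sketch, assuming a provider that signs the request body with a shared secret and sends the signature in an x-signature header (both names are placeholders for your provider's scheme):
CREATE SECRET webhook_shared_secret AS 'super_secret_value';

CREATE SOURCE validated_webhook
  FROM WEBHOOK
  BODY FORMAT JSON
  CHECK (
    WITH (HEADERS, BODY AS request_body, SECRET webhook_shared_secret)
    -- Reject requests whose signature doesn't match an HMAC of the body
    decode(headers->'x-signature', 'base64') = hmac(request_body, webhook_shared_secret, 'sha256')
  );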

Format Specifications

Avro Format

FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
  • Automatically determines columns and types from schema
  • Supports schema evolution
  • Integrates with Confluent Schema Registry

JSON Format

FORMAT JSON
  • Creates a single jsonb column named data
  • Recommended: create a parsing view for typed access:
CREATE VIEW parsed_json AS
  SELECT
    (data->>'id')::int AS id,
    (data->>'name')::text AS name,
    (data->>'created_at')::timestamp AS created_at
  FROM json_source;

Protobuf Format

-- Using schema registry
FORMAT PROTOBUF USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection

-- Using inline schema
FORMAT PROTOBUF MESSAGE 'billing.Batch' 
  USING SCHEMA '\x0a300a0d62696...'

Text Format

FORMAT TEXT
  • Creates a single text column
  • Parses newline-delimited data as UTF-8

Bytes Format

FORMAT BYTES
  • Creates a single bytea column
  • Stores raw binary data without decoding
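
To work with the payload, decode it in a view. A minimal sketch, assuming UTF-8 encoded data in a hypothetical bytes_source:
-- convert_from decodes bytea into text using the given encoding
CREATE VIEW decoded AS
  SELECT convert_from(data, 'utf8') AS data_text
  FROM bytes_source;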

Envelope Types

Append-Only (ENVELOPE NONE)

Default envelope. Treats all records as inserts:
CREATE SOURCE kafka_append
  FROM KAFKA CONNECTION kafka_connection (TOPIC 'events')
  FORMAT JSON
  ENVELOPE NONE;  -- Optional, this is the default

Upsert Envelope

Supports inserts, updates, and deletes using key-value semantics:
CREATE SOURCE kafka_upsert
  FROM KAFKA CONNECTION kafka_connection (TOPIC 'users')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
  ENVELOPE UPSERT;
  • Records with new keys are inserted
  • Records with existing keys and non-null values are updated
  • Records with existing keys and null values are deleted
  • Required for log-compacted topics
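
When the key and value use different formats, specify each explicitly. A minimal sketch, assuming text keys and JSON values on the users topic:
CREATE SOURCE users_upsert
  FROM KAFKA CONNECTION kafka_connection (TOPIC 'users')
  KEY FORMAT TEXT
  VALUE FORMAT JSON
  ENVELOPE UPSERT;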

Debezium Envelope

Decodes Debezium change events:
CREATE SOURCE debezium_source
  FROM KAFKA CONNECTION kafka_connection (TOPIC 'dbserver.public.users')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
  ENVELOPE DEBEZIUM;
  • Interprets before and after fields
  • Supports INSERT, UPDATE, and DELETE operations
  • Incrementally maintains views based on CDC events

Exposing Metadata

Include message metadata in the source:
CREATE SOURCE kafka_metadata
  FROM KAFKA CONNECTION kafka_connection (TOPIC 'data')
  KEY FORMAT TEXT
  VALUE FORMAT TEXT
  INCLUDE KEY AS message_key,
           PARTITION,
           OFFSET,
           TIMESTAMP AS kafka_timestamp,
           HEADERS
  ENVELOPE NONE;
Each INCLUDE option accepts an optional AS <column_name> to rename the resulting column:

  Option              Column type               Description
  INCLUDE KEY         (matches the key format)  Exposes the message key as a column
  INCLUDE PARTITION   integer                   Exposes the Kafka partition number
  INCLUDE OFFSET      uint8                     Exposes the Kafka offset within the partition
  INCLUDE TIMESTAMP   timestamp                  Exposes the Kafka message timestamp
  INCLUDE HEADERS     list                      Exposes message headers as a list of key-value pairs
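
The metadata columns behave like ordinary columns. For example, against the kafka_metadata source above:
SELECT message_key, partition, "offset", kafka_timestamp
FROM kafka_metadata
WHERE partition = 0;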

Progress Monitoring

Materialize automatically creates a progress subsource:
-- Check Kafka source progress
SELECT partition, "offset"
FROM kafka_source_progress;

-- Check PostgreSQL source progress
SELECT lsn
FROM pg_source_progress;

Examples

Complete PostgreSQL Source

CREATE SECRET pgpass AS 'secret_password';

CREATE CONNECTION pg_conn TO POSTGRES (
  HOST 'postgres.example.com',
  PORT 5432,
  USER 'materialize',
  PASSWORD SECRET pgpass,
  DATABASE 'production',
  SSL MODE 'require'
);

CREATE SOURCE pg_cdc
  FROM POSTGRES CONNECTION pg_conn 
  (PUBLICATION 'mz_publication')
  FOR TABLES (users, orders, products);

Complete Kafka Source with Avro

CREATE SECRET kafka_password AS 'password';
CREATE SECRET csr_password AS 'password';

CREATE CONNECTION kafka_conn TO KAFKA (
  BROKER 'kafka.example.com:9092',
  SASL MECHANISMS = 'PLAIN',
  SASL USERNAME = 'user',
  SASL PASSWORD = SECRET kafka_password
);

CREATE CONNECTION csr_conn TO CONFLUENT SCHEMA REGISTRY (
  URL 'https://schema-registry.example.com',
  USERNAME = 'user',
  PASSWORD = SECRET csr_password
);

CREATE SOURCE orders_source
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'orders')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE UPSERT;

Kafka Source with JSON Parsing

CREATE SOURCE events_raw
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'events')
  FORMAT JSON;

CREATE VIEW events AS
  SELECT
    (data->>'event_id')::uuid AS event_id,
    (data->>'user_id')::bigint AS user_id,
    (data->>'event_type')::text AS event_type,
    (data->>'timestamp')::timestamptz AS event_time,
    (data->'properties')::jsonb AS properties
  FROM events_raw;
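
With a typed view in place, you can maintain results incrementally on top of it. A minimal sketch:
-- Incrementally maintained count of events per type
CREATE MATERIALIZED VIEW event_counts AS
  SELECT event_type, count(*) AS total
  FROM events
  GROUP BY event_type;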

Best Practices

  1. Use Connections: Create reusable connections for credentials and configuration
  2. Monitor Progress: Query progress subsources to track ingestion
  3. Parse JSON: Create views to parse JSON into typed columns
  4. Handle Errors: Use error handling options for Upsert sources
  5. Set Offsets: Use START OFFSET or START TIMESTAMP when needed (see the sketch below)
  6. Consider Envelopes: Choose the right envelope for your data semantics
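
A minimal sketch of item 5, assuming a hypothetical events topic; a negative START TIMESTAMP is interpreted relative to the current time, in milliseconds:
CREATE SOURCE recent_events
  FROM KAFKA CONNECTION kafka_conn (
    TOPIC 'events',
    START TIMESTAMP -3600000  -- start reading about one hour back
  )
  FORMAT JSON;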
