Schemas provide type safety and enable schema evolution in Apache Pulsar. They define the structure of messages and ensure compatibility between producers and consumers.

Schema Basics

From pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java, schemas handle message serialization and deserialization:
/**
 * Message schema definition.
 */
public interface Schema<T> extends Cloneable {
    /**
     * Encode an object representing the message content into a byte array.
     */
    byte[] encode(T message);
    
    /**
     * Decode a byte array into an object using the schema definition.
     */
    T decode(byte[] bytes);
    
    /**
     * @return schema information
     */
    SchemaInfo getSchemaInfo();
}
Schemas are optional but highly recommended for production use. They provide type safety, automatic serialization, and enable schema evolution.
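The encode/decode contract above can be illustrated without a broker. The following is a minimal sketch, not Pulsar code: `MiniSchema` is a hypothetical stand-in for the two core methods of `Schema<T>`, and `UTF8_STRING` only mimics a UTF-8 string schema.

```java
import java.nio.charset.StandardCharsets;

public class SchemaRoundTrip {
    // Hypothetical stand-in for the encode/decode half of Schema<T>.
    interface MiniSchema<T> {
        byte[] encode(T message);
        T decode(byte[] bytes);
    }

    // A UTF-8 string schema: the object is a String, the wire form is its UTF-8 bytes.
    static final MiniSchema<String> UTF8_STRING = new MiniSchema<String>() {
        @Override
        public byte[] encode(String message) {
            return message.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public String decode(byte[] bytes) {
            return new String(bytes, StandardCharsets.UTF_8);
        }
    };

    public static void main(String[] args) {
        byte[] wire = UTF8_STRING.encode("h\u00e9llo");
        // decode(encode(x)) should give back x for any well-behaved schema.
        System.out.println(UTF8_STRING.decode(wire).equals("h\u00e9llo"));
    }
}
```

The property worth checking for any schema, built-in or custom, is that decoding an encoded value returns an equal value.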

Primitive Schemas

Pulsar provides built-in schemas for primitive types:
// String schema (UTF-8 encoding)
Producer<String> stringProducer = client.newProducer(Schema.STRING)
    .topic("my-topic")
    .create();

// Byte array (no serialization)
Producer<byte[]> bytesProducer = client.newProducer(Schema.BYTES)
    .topic("my-topic")
    .create();

// ByteBuffer
Producer<ByteBuffer> byteBufferProducer = client.newProducer(Schema.BYTEBUFFER)
    .topic("my-topic")
    .create();

Numeric Types

From the Schema interface:
Schema<Byte> INT8 = Schema.INT8;
Schema<Short> INT16 = Schema.INT16;
Schema<Integer> INT32 = Schema.INT32;
Schema<Long> INT64 = Schema.INT64;
Schema<Float> FLOAT = Schema.FLOAT;
Schema<Double> DOUBLE = Schema.DOUBLE;
Schema<Boolean> BOOL = Schema.BOOL;
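To give a feel for what a numeric schema puts on the wire, here is a minimal sketch that assumes a fixed-width, big-endian layout for a 32-bit integer. That layout is an assumption for illustration and should be checked against the client source. The class and method names (`Int32Wire`, `encodeInt32`, `decodeInt32`) are hypothetical.

```java
import java.nio.ByteBuffer;

public class Int32Wire {
    // Encode an int as 4 bytes; ByteBuffer defaults to big-endian byte order.
    static byte[] encodeInt32(int value) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
    }

    // Decode exactly 4 bytes back into an int; reject any other length.
    static int decodeInt32(byte[] bytes) {
        if (bytes.length != Integer.BYTES) {
            throw new IllegalArgumentException("Expected 4 bytes, got " + bytes.length);
        }
        return ByteBuffer.wrap(bytes).getInt();
    }

    public static void main(String[] args) {
        byte[] wire = encodeInt32(258); // 0x00000102
        System.out.println(wire.length);
        System.out.println(decodeInt32(wire));
    }
}
```

Rejecting payloads of the wrong length is the kind of check that lets a typed consumer fail fast on data that was not produced with the matching schema.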

Date and Time Types

Schema<Date> DATE = Schema.DATE;
Schema<Time> TIME = Schema.TIME;
Schema<Timestamp> TIMESTAMP = Schema.TIMESTAMP;
Schema<Instant> INSTANT = Schema.INSTANT;
Schema<LocalDate> LOCAL_DATE = Schema.LOCAL_DATE;
Schema<LocalTime> LOCAL_TIME = Schema.LOCAL_TIME;
Schema<LocalDateTime> LOCAL_DATE_TIME = Schema.LOCAL_DATE_TIME;

Complex Schemas

JSON Schema

Automatic JSON serialization for POJOs:
// Define a POJO
public class User {
    private String name;
    private int age;
    private String email;
    
    // Constructors, getters, setters
}

// Create producer with JSON schema
Producer<User> producer = client.newProducer(Schema.JSON(User.class))
    .topic("user-topic")
    .create();

// Send typed messages
User user = new User("Alice", 30, "alice@example.com");
producer.send(user);

// Consume with same schema
Consumer<User> consumer = client.newConsumer(Schema.JSON(User.class))
    .topic("user-topic")
    .subscriptionName("my-sub")
    .subscribe();

Message<User> msg = consumer.receive();
User receivedUser = msg.getValue();  // Automatically deserialized

Avro Schema

From Schema.java - Avro provides efficient binary serialization:
// Define a POJO (no Avro annotations needed)
public class Order {
    private String orderId;
    private double amount;
    private List<String> items;
    
    // Constructors, getters, setters
}

// Create producer with Avro schema
Producer<Order> producer = client.newProducer(Schema.AVRO(Order.class))
    .topic("orders")
    .create();

// Schema is automatically registered
Order order = new Order("ORDER-123", 99.99, Arrays.asList("item1", "item2"));
producer.send(order);
Avro's binary encoding is more compact than JSON, and Avro defines explicit schema resolution rules (for example, field defaults) that make schema evolution more predictable.

Protobuf Schema

For Google Protocol Buffers:
// From Schema.java
static <T extends com.google.protobuf.Message> Schema<T> PROTOBUF(Class<T> clazz)

// Define protobuf message (in .proto file)
message UserEvent {
    string user_id = 1;
    string event_type = 2;
    int64 timestamp = 3;
}

// Generate Java class using protoc
// Then use in Pulsar:
Producer<UserEvent> producer = client.newProducer(
    Schema.PROTOBUF(UserEvent.class))
    .topic("user-events")
    .create();

KeyValue Schema

For messages with typed keys and values:
// From Schema.java
static <K, V> Schema<KeyValue<K, V>> KeyValue(
    Schema<K> key, Schema<V> value, KeyValueEncodingType encodingType)

// Create KeyValue schema
Schema<KeyValue<String, Order>> kvSchema = Schema.KeyValue(
    Schema.STRING,
    Schema.JSON(Order.class),
    KeyValueEncodingType.SEPARATED  // or INLINE
);

Producer<KeyValue<String, Order>> producer = client.newProducer(kvSchema)
    .topic("orders")
    .create();

// Send with typed key and value
KeyValue<String, Order> kv = new KeyValue<>("user-123", order);
producer.send(kv);
Encoding types:
  • SEPARATED: The key is stored as the message key and the value as the message payload, so key-based routing keeps working
  • INLINE: Key and value are encoded together in the message payload
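As a rough picture of what "encoded together" can mean, the sketch below packs a key and a value into one payload using length prefixes: `[keyLength][key][valueLength][value]`. This layout is an assumption for illustration, and `InlineKeyValue`, `encodeInline`, and `decodeInline` are hypothetical names, not client APIs.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

public class InlineKeyValue {
    // Pack key and value into a single payload, each preceded by its 4-byte length.
    static byte[] encodeInline(byte[] key, byte[] value) {
        return ByteBuffer.allocate(Integer.BYTES + key.length + Integer.BYTES + value.length)
            .putInt(key.length).put(key)
            .putInt(value.length).put(value)
            .array();
    }

    // Split a payload produced by encodeInline back into its key and value bytes.
    static Map.Entry<byte[], byte[]> decodeInline(byte[] payload) {
        ByteBuffer buf = ByteBuffer.wrap(payload);
        byte[] key = new byte[buf.getInt()];
        buf.get(key);
        byte[] value = new byte[buf.getInt()];
        buf.get(value);
        return new SimpleEntry<>(key, value);
    }

    public static void main(String[] args) {
        byte[] payload = encodeInline(
            "user-123".getBytes(StandardCharsets.UTF_8),
            "{}".getBytes(StandardCharsets.UTF_8));
        Map.Entry<byte[], byte[]> kv = decodeInline(payload);
        System.out.println(new String(kv.getKey(), StandardCharsets.UTF_8));
    }
}
```

With an inline layout the broker sees a single opaque payload, which is why the separated form is the one that supports routing on the key.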

Schema Registry

Pulsar automatically manages schemas in a built-in schema registry:
# View schema for a topic
pulsar-admin schemas get persistent://tenant/ns/topic

# Upload schema manually
pulsar-admin schemas upload persistent://tenant/ns/topic \
  --filename schema.json

# Delete schema
pulsar-admin schemas delete persistent://tenant/ns/topic
Schema information includes:
  • Schema type (JSON, Avro, Protobuf, etc.)
  • Schema definition
  • Schema version
  • Compatibility settings

Schema Evolution

Pulsar supports schema evolution with compatibility checks:

Compatibility Modes

# Set schema compatibility at namespace level
pulsar-admin namespaces set-schema-compatibility-strategy \
  tenant/namespace \
  --compatibility BACKWARD
Compatibility strategies:
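  • ALWAYS_COMPATIBLE: Disables compatibility checks; any schema change is accepted
  • BACKWARD: New schema can read data written with the previous schema
  • FORWARD: Previous schema can read data written with the new schema
  • FULL: Both backward and forward compatible with the previous schema (the broker-level default, schemaCompatibilityStrategy=FULL)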
  • BACKWARD_TRANSITIVE: Backward compatible with all previous versions
  • FORWARD_TRANSITIVE: Forward compatible with all previous versions
  • FULL_TRANSITIVE: Both backward and forward compatible with all versions
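The difference between backward and forward compatibility is easier to see with a toy model. The sketch below reduces a schema to a map from field name to "has a default value" and applies one simplified rule: a reader can process a writer's data if every reader field is either present in the writer schema or has a default. This is a deliberately simplified stand-in for Avro-style resolution, not the broker's checker, and all names in it are hypothetical.

```java
import java.util.Map;

public class CompatibilitySketch {
    // Schema model: field name -> true if the field declares a default value.
    static boolean canRead(Map<String, Boolean> reader, Map<String, Boolean> writer) {
        for (Map.Entry<String, Boolean> field : reader.entrySet()) {
            boolean presentInWriter = writer.containsKey(field.getKey());
            boolean hasDefault = field.getValue();
            if (!presentInWriter && !hasDefault) {
                return false; // reader needs a value the writer never wrote
            }
        }
        return true;
    }

    // BACKWARD: the new schema reads data written with the old schema.
    static boolean isBackward(Map<String, Boolean> newSchema, Map<String, Boolean> oldSchema) {
        return canRead(newSchema, oldSchema);
    }

    // FORWARD: the old schema reads data written with the new schema.
    static boolean isForward(Map<String, Boolean> newSchema, Map<String, Boolean> oldSchema) {
        return canRead(oldSchema, newSchema);
    }

    // FULL: both directions hold.
    static boolean isFull(Map<String, Boolean> newSchema, Map<String, Boolean> oldSchema) {
        return isBackward(newSchema, oldSchema) && isForward(newSchema, oldSchema);
    }

    public static void main(String[] args) {
        Map<String, Boolean> v1 = Map.of("name", false, "age", false);
        Map<String, Boolean> v2 = Map.of("name", false, "age", false, "email", true);
        Map<String, Boolean> v3 = Map.of("name", false, "age", false, "phone", false);
        System.out.println("v1 -> v2 full: " + isFull(v2, v1));
        System.out.println("v1 -> v3 backward: " + isBackward(v3, v1));
    }
}
```

In this model, adding a field with a default keeps both directions intact, while adding a required field breaks backward compatibility because older data has no value for it. The transitive strategies apply the same check against every earlier version rather than only the latest one.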

Schema Versioning

From Schema.java, schemas support versioning:
public interface Schema<T> {
    /**
     * Returns whether this schema supports versioning.
     */
    default boolean supportSchemaVersioning() {
        return false;
    }
    
    /**
     * Decode a byte array using a given schema version.
     */
    default T decode(byte[] bytes, byte[] schemaVersion) {
        return decode(bytes);
    }
}
Schemas are versioned automatically:
// V1: Original schema
public class User {
    private String name;
    private int age;
}

// V2: Add optional field (backward compatible)
public class User {
    private String name;
    private int age;
    private String email;  // New field with default null
}

// Consumers can read both versions
Consumer<User> consumer = client.newConsumer(Schema.JSON(User.class))
    .topic("users")
    .subscriptionName("my-sub")  // a subscription name is required
    .subscribe();
When a producer sends a message, Pulsar includes the schema version in the message metadata. Consumers automatically use the correct schema version to deserialize.
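The idea of picking a decoder from the version carried with each message can be sketched in plain Java. The example below is illustrative only: it assumes the version arrives as an 8-byte big-endian long, uses a made-up comma-separated payload instead of JSON or Avro, and every name in it (`VersionedDecode`, `DECODERS`, `versionBytes`) is hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class VersionedDecode {
    static final class User {
        final String name;
        final int age;
        final String email; // null when the writer's schema had no email field

        User(String name, int age, String email) {
            this.name = name;
            this.age = age;
            this.email = email;
        }
    }

    // One decoder per writer schema version (toy "name,age[,email]" payloads).
    static final Map<Long, Function<String, User>> DECODERS = new HashMap<>();
    static {
        DECODERS.put(0L, csv -> {
            String[] p = csv.split(",");
            return new User(p[0], Integer.parseInt(p[1]), null);
        });
        DECODERS.put(1L, csv -> {
            String[] p = csv.split(",");
            return new User(p[0], Integer.parseInt(p[1]), p[2]);
        });
    }

    // Helper for the example: render a version number as 8 big-endian bytes.
    static byte[] versionBytes(long version) {
        return ByteBuffer.allocate(Long.BYTES).putLong(version).array();
    }

    // Choose the decoder that matches the version attached to the message.
    static User decode(String payload, byte[] schemaVersion) {
        long version = ByteBuffer.wrap(schemaVersion).getLong();
        Function<String, User> decoder = DECODERS.get(version);
        if (decoder == null) {
            throw new IllegalStateException("Unknown schema version " + version);
        }
        return decoder.apply(payload);
    }

    public static void main(String[] args) {
        User old = decode("Alice,30", versionBytes(0));
        User current = decode("Bob,41,bob@example.com", versionBytes(1));
        System.out.println(old.email + " / " + current.email);
    }
}
```

A real client resolves the writer schema from the registry rather than from a hard-coded table, but the lookup-by-version step is the same shape.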

Auto Schema

For flexible consumption without knowing the schema in advance:
// From Schema.java
static Schema<GenericRecord> AUTO_CONSUME()

// Consumer that automatically adapts to topic schema
Consumer<GenericRecord> consumer = client.newConsumer(Schema.AUTO_CONSUME())
    .topic("my-topic")
    .subscriptionName("my-sub")
    .subscribe();

Message<GenericRecord> msg = consumer.receive();
GenericRecord record = msg.getValue();

// Access fields dynamically
String name = (String) record.getField("name");
Integer age = (Integer) record.getField("age");

Auto Produce

For producing raw payloads with schema validation:
// From Schema.java
static Schema<byte[]> AUTO_PRODUCE_BYTES()
static Schema<byte[]> AUTO_PRODUCE_BYTES(Schema<?> schema)

// Produce pre-serialized data
Producer<byte[]> producer = client.newProducer(
    Schema.AUTO_PRODUCE_BYTES(Schema.JSON(User.class)))
    .topic("users")
    .create();

// Send pre-serialized JSON (explicit charset avoids platform-dependent encoding)
byte[] jsonBytes = "{\"name\":\"Alice\",\"age\":30}"
    .getBytes(java.nio.charset.StandardCharsets.UTF_8);
producer.send(jsonBytes);

Schema Definition Builder

For advanced schema configuration:
import org.apache.pulsar.client.api.schema.SchemaDefinition;

SchemaDefinition<User> schemaDefinition = SchemaDefinition.<User>builder()
    .withPojo(User.class)
    .withAlwaysAllowNull(false)
    .withJSR310ConversionEnabled(true)
    .withProperties(Map.of(
        "namespace", "com.example.users",
        "version", "1.0"
    ))
    .build();

Producer<User> producer = client.newProducer(
    Schema.JSON(schemaDefinition))
    .topic("users")
    .create();

Schema Validation

From Schema.java:
public interface Schema<T> {
    /**
     * Check if the message is a valid object for this schema.
     */
    default void validate(byte[] message) {
        decode(message);  // Validation by attempting decode
    }
}
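Where validation applies:
  • AUTO_PRODUCE_BYTES producers validate each payload against the topic's schema before sending
  • Typed consumers decode the payload when msg.getValue() is called, so a malformed message surfaces as a decoding error at that point
  • Custom schemas can override validate() to apply stricter checks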

Native Schema

For using existing Avro schemas:
// From Schema.java
static Schema<byte[]> NATIVE_AVRO(Object schema)

// Use existing Avro schema
org.apache.avro.Schema avroSchema = 
    new org.apache.avro.Schema.Parser().parse(schemaJson);

Producer<byte[]> producer = client.newProducer(
    Schema.NATIVE_AVRO(avroSchema))
    .topic("my-topic")
    .create();

Schema Operations

Get Schema Info

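Message<User> msg = consumer.receive();

Optional<Schema<?>> schema = msg.getReaderSchema();
if (schema.isPresent()) {
    SchemaInfo info = schema.get().getSchemaInfo();
    System.out.println("Schema type: " + info.getType());
    System.out.println("Schema name: " + info.getName());
}

// The schema version is raw bytes, not text, and may be null.
// Assumption: broker-assigned versions are an 8-byte big-endian long.
byte[] version = msg.getSchemaVersion();
if (version != null && version.length == Long.BYTES) {
    System.out.println("Schema version: " + ByteBuffer.wrap(version).getLong());
}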

Schema Compatibility Check

# Check if new schema is compatible
pulsar-admin schemas compatibility-check \
  persistent://tenant/ns/topic \
  --filename new-schema.json

Best Practices

  • Use primitive schemas (STRING, BYTES) for simple use cases
  • Use JSON for ease of development and debugging
  • Use Avro for production with schema evolution requirements
  • Use Protobuf when interoperating with gRPC systems
  • Use KeyValue schema when you need typed message keys
  • Set appropriate compatibility mode before production
  • Add optional fields (with defaults) for backward compatibility
  • Avoid removing or renaming fields
  • Test schema changes with compatibility-check command
  • Document schema changes in your version control
  • Enable schema validation in production
  • Use namespace-level compatibility settings
  • Version your POJO classes alongside schema versions
  • Monitor schema registry for unexpected changes
  • Back up schema definitions
  • Avro's binary encoding is generally more compact than JSON, which matters most for large or high-volume messages
  • Protobuf also uses a compact binary wire format
  • Schema validation adds minimal overhead
  • Reuse schema instances across producers/consumers

Common Patterns

Multi-Version Consumers

// Consumer handles multiple schema versions automatically
Consumer<User> consumer = client.newConsumer(Schema.JSON(User.class))
    .topic("users")
    .subscriptionName("my-sub")
    .subscribe();

// Receives messages from both V1 and V2 producers
while (true) {
    Message<User> msg = consumer.receive();
    User user = msg.getValue();
    // Fields from older versions will be null/default
    processUser(user);
    consumer.acknowledge(msg);
}

Schema-less to Typed Migration

// Step 1: Start with schema-less
Producer<byte[]> producer = client.newProducer(Schema.BYTES)
    .topic("my-topic")
    .create();

// Step 2: Migrate to typed schema
Producer<User> typedProducer = client.newProducer(Schema.JSON(User.class))
    .topic("my-topic")
    .create();

// Step 3: Update consumers
Consumer<User> consumer = client.newConsumer(Schema.JSON(User.class))
    .topic("my-topic")
    .subscriptionName("my-sub")
    .subscribe();

Next Steps

Producers & Consumers

Learn about client implementations

Messaging

Understand message structure

Topics

Explore topic management

Schema Registry

Administer schema registry
