
Overview

The Schema interface defines how messages are serialized and deserialized in Pulsar. It provides type-safe message encoding and decoding, enabling producers and consumers to work with structured data types instead of raw bytes.

Core Methods

encode()

Encode an object representing the message content into a byte array.
byte[] encode(T message)
Parameters:
message (T): The message object
Returns (byte[]): A byte array with the serialized content
Throws: SchemaSerializationException if the serialization fails

encode(topic, message)

Encode a message for a specific topic.
default EncodeData encode(String topic, T message)
Parameters:
topic (String): The topic name
message (T): The message object
Returns (EncodeData): Encoded data wrapper

decode()

Decode a byte array into an object using the schema definition and deserializer implementation.
default T decode(byte[] bytes)
Parameters:
bytes (byte[]): The byte array to decode
Returns (T): The deserialized object
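Whatever the format, encode and decode are meant to round-trip: decoding the bytes produced by encode should yield an equal object. The sketch below illustrates that contract using only the JDK. SimpleSchema, Point, PointSchema and SchemaChecks are hypothetical names invented for this illustration and are not part of the Pulsar API. The two-int layout is an arbitrary choice.

```java
import java.nio.ByteBuffer;
import java.util.Objects;

// Hypothetical, stripped-down stand-in for Pulsar's Schema<T>; not the real API.
interface SimpleSchema<T> {
    byte[] encode(T message);
    T decode(byte[] bytes);
}

// Hypothetical value type used only for this sketch.
final class Point {
    final int x;
    final int y;

    Point(int x, int y) {
        this.x = x;
        this.y = y;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof Point)) {
            return false;
        }
        Point p = (Point) o;
        return p.x == x && p.y == y;
    }

    @Override
    public int hashCode() {
        return Objects.hash(x, y);
    }
}

// Encodes a Point as two 4-byte ints in ByteBuffer's default (big-endian) order.
final class PointSchema implements SimpleSchema<Point> {
    @Override
    public byte[] encode(Point message) {
        return ByteBuffer.allocate(2 * Integer.BYTES).putInt(message.x).putInt(message.y).array();
    }

    @Override
    public Point decode(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        int x = buf.getInt();
        int y = buf.getInt();
        return new Point(x, y);
    }
}

final class SchemaChecks {
    // The round-trip property: decode(encode(value)) should equal value.
    static <T> boolean roundTrips(SimpleSchema<T> schema, T value) {
        return Objects.equals(schema.decode(schema.encode(value)), value);
    }
}
```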

decode(bytes, schemaVersion)

Decode a byte array into an object using a given version.
default T decode(byte[] bytes, byte[] schemaVersion)
Parameters:
bytes (byte[]): The byte array to decode
schemaVersion (byte[]): The schema version to use when decoding (null means the latest version)
Returns (T): The deserialized object
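Version-aware decoding can be modeled as one decoder per schema version, with a fallback to the latest version when schemaVersion is null. The following sketch is hypothetical: VersionedDecoder is not a Pulsar class. It also assumes that a version is an 8-byte big-endian long. That assumption is made purely for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

// Hypothetical sketch, not Pulsar's implementation: one decoder per schema version.
final class VersionedDecoder<T> {
    // Sorted by version number, so lastEntry() is the latest registered version.
    private final TreeMap<Long, Function<byte[], T>> decoders = new TreeMap<>();

    VersionedDecoder<T> register(long version, Function<byte[], T> decoder) {
        decoders.put(version, decoder);
        return this;
    }

    T decode(byte[] bytes, byte[] schemaVersion) {
        Function<byte[], T> decoder;
        if (schemaVersion == null) {
            // null means "use the latest version".
            Map.Entry<Long, Function<byte[], T>> latest = decoders.lastEntry();
            if (latest == null) {
                throw new IllegalStateException("no schema versions registered");
            }
            decoder = latest.getValue();
        } else {
            // Assumption for this sketch: a version is an 8-byte big-endian long.
            long version = ByteBuffer.wrap(schemaVersion).getLong();
            decoder = decoders.get(version);
            if (decoder == null) {
                throw new IllegalArgumentException("unknown schema version " + version);
            }
        }
        return decoder.apply(bytes);
    }
}

class VersionedDecoderDemo {
    public static void main(String[] args) {
        VersionedDecoder<String> decoder = new VersionedDecoder<String>()
                .register(0L, b -> "v0:" + new String(b, StandardCharsets.UTF_8))
                .register(1L, b -> "v1:" + new String(b, StandardCharsets.UTF_8));
        byte[] payload = "x".getBytes(StandardCharsets.UTF_8);
        byte[] version0 = ByteBuffer.allocate(Long.BYTES).putLong(0L).array();
        System.out.println(decoder.decode(payload, null));     // prints v1:x
        System.out.println(decoder.decode(payload, version0)); // prints v0:x
    }
}
```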

decode(ByteBuffer)

Decode a ByteBuffer into an object.
default T decode(ByteBuffer data)
Parameters:
data (ByteBuffer): The ByteBuffer to decode
Returns (T): The deserialized object

decode(ByteBuffer, schemaVersion)

Decode a ByteBuffer into an object using a given version.
default T decode(ByteBuffer data, byte[] schemaVersion)
Parameters:
data (ByteBuffer): The ByteBuffer to decode
schemaVersion (byte[]): The schema version to use when decoding
Returns (T): The deserialized object
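One way to think of the ByteBuffer overloads is as conveniences that copy the buffer's remaining bytes and delegate to the byte-array variant. The following hypothetical sketch shows such a default method. BufferDecoding and Utf8Decoder are illustrative names, not Pulsar types. The sketch reads from a duplicate so that the caller's buffer position stays unchanged. That is a design choice of the sketch, not documented Pulsar behavior.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical interface, not Pulsar's Schema: the ByteBuffer overload delegates
// to the byte-array overload.
interface BufferDecoding<T> {
    T decode(byte[] bytes);

    default T decode(ByteBuffer data) {
        if (data == null) {
            return null;
        }
        // Read from a duplicate so the caller's position and limit stay unchanged.
        ByteBuffer view = data.duplicate();
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return decode(bytes);
    }
}

final class Utf8Decoder implements BufferDecoding<String> {
    @Override
    public String decode(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }
}

class BufferDecodingDemo {
    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8));
        buf.position(1); // skip the first byte
        System.out.println(new Utf8Decoder().decode(buf)); // prints ello
        System.out.println(buf.position());                // prints 1
    }
}
```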

validate()

Check if the message is a valid object for this schema.
default void validate(byte[] message)
Parameters:
message (byte[]): The message to verify
Throws: SchemaSerializationException if the message is not valid for this schema
An implementation can choose the most efficient way to validate the message. If an implementation does not provide one, the default attempts decode(byte[]) to check whether this schema can decode the message.
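The sketch below shows the decode-based fallback for validate. InvalidPayloadException stands in for SchemaSerializationException. ValidatingSchema and StrictUtf8Schema are hypothetical names. The example schema decodes strictly, because new String(bytes, UTF_8) silently replaces malformed input with U+FFFD and would therefore never reject a payload.

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for Pulsar's SchemaSerializationException.
class InvalidPayloadException extends RuntimeException {
    InvalidPayloadException(String message, Throwable cause) {
        super(message, cause);
    }
}

// Hypothetical interface: validate() falls back to "can decode() read it?".
interface ValidatingSchema<T> {
    T decode(byte[] bytes);

    default void validate(byte[] message) {
        try {
            decode(message);
        } catch (RuntimeException e) {
            throw new InvalidPayloadException("payload rejected by schema", e);
        }
    }
}

// Decodes UTF-8 strictly: malformed input raises an error instead of being
// replaced with U+FFFD, so validate() can actually reject bad payloads.
final class StrictUtf8Schema implements ValidatingSchema<String> {
    @Override
    public String decode(byte[] bytes) {
        try {
            return StandardCharsets.UTF_8.newDecoder()
                    .onMalformedInput(CodingErrorAction.REPORT)
                    .onUnmappableCharacter(CodingErrorAction.REPORT)
                    .decode(ByteBuffer.wrap(bytes))
                    .toString();
        } catch (CharacterCodingException e) {
            throw new IllegalArgumentException("not valid UTF-8", e);
        }
    }
}
```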

getSchemaInfo()

Get an object that represents the metadata associated with the schema.
SchemaInfo getSchemaInfo()
Returns (SchemaInfo): The schema information object

clone()

Duplicate the schema.
Schema<T> clone()
Returns (Schema<T>): The duplicated schema

Built-in Primitive Schemas

BYTES

Schema that doesn’t perform any encoding on the message payloads. Accepts a byte array and passes it through.
Schema<byte[]> BYTES
Example:
Producer<byte[]> producer = client.newProducer(Schema.BYTES)
    .topic("my-topic")
    .create();

producer.send("Hello".getBytes());

BYTEBUFFER

ByteBuffer Schema.
Schema<ByteBuffer> BYTEBUFFER

STRING

Schema that can be used to encode/decode messages whose values are String. The payload is encoded with UTF-8.
Schema<String> STRING
Example:
Producer<String> producer = client.newProducer(Schema.STRING)
    .topic("my-topic")
    .create();

producer.send("Hello Pulsar");
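Because STRING encodes with UTF-8, a payload's size in bytes can differ from String.length(). The JDK-only sketch below makes no Pulsar calls. It uses Utf8PayloadSize, a hypothetical helper, to compute the size implied by the documented UTF-8 encoding.

```java
import java.nio.charset.StandardCharsets;

// Mirrors the documented STRING behavior (UTF-8 payloads) without the Pulsar client.
final class Utf8PayloadSize {
    static int payloadSize(String value) {
        return value.getBytes(StandardCharsets.UTF_8).length;
    }

    public static void main(String[] args) {
        System.out.println(payloadSize("Hello Pulsar"));    // 12: all ASCII, 1 byte each
        System.out.println(payloadSize("Gr\u00fc\u00dfe")); // 7: "ü" and "ß" take 2 bytes each
    }
}
```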

INT8

INT8 Schema.
Schema<Byte> INT8

INT16

INT16 Schema.
Schema<Short> INT16

INT32

INT32 Schema.
Schema<Integer> INT32
Example:
Producer<Integer> producer = client.newProducer(Schema.INT32)
    .topic("my-numbers")
    .create();

producer.send(42);
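The sketch below shows the wire format we assume for INT32: a fixed 4-byte big-endian integer. This layout comes from our reading of Pulsar's built-in implementation, not from this page, so verify it against your client version before relying on it. Int32Codec is a hypothetical helper built on java.nio.ByteBuffer, whose default byte order is big-endian.

```java
import java.nio.ByteBuffer;

// Assumption: INT32 payloads are fixed 4-byte big-endian integers.
// ByteBuffer uses big-endian byte order by default.
final class Int32Codec {
    static byte[] encode(int value) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
    }

    static int decode(byte[] bytes) {
        if (bytes.length != Integer.BYTES) {
            throw new IllegalArgumentException("expected 4 bytes, got " + bytes.length);
        }
        return ByteBuffer.wrap(bytes).getInt();
    }
}
```

If that assumption holds, the producer.send(42) call above would carry the four bytes 00 00 00 2A.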

INT64

INT64 Schema.
Schema<Long> INT64

BOOL

Boolean Schema.
Schema<Boolean> BOOL

FLOAT

Float Schema.
Schema<Float> FLOAT

DOUBLE

Double Schema.
Schema<Double> DOUBLE

DATE

Date Schema.
Schema<Date> DATE

TIME

Time Schema.
Schema<Time> TIME

TIMESTAMP

Timestamp Schema.
Schema<Timestamp> TIMESTAMP

INSTANT

Instant Schema.
Schema<Instant> INSTANT

LOCAL_DATE

LocalDate Schema.
Schema<LocalDate> LOCAL_DATE

LOCAL_TIME

LocalTime Schema.
Schema<LocalTime> LOCAL_TIME

LOCAL_DATE_TIME

LocalDateTime Schema.
Schema<LocalDateTime> LOCAL_DATE_TIME
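Each java.time schema must map its value to bytes in some way. The sketch below assumes layouts that we believe match Pulsar's INSTANT and LOCAL_DATE_TIME schemas: epoch seconds plus nanoseconds for Instant, and epoch day plus nano-of-day for LocalDateTime. This page does not specify these layouts, so treat them as assumptions. TemporalCodecs is a hypothetical helper.

```java
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;

// Assumed layouts (verify against your Pulsar version):
//   Instant       -> 8-byte epoch seconds + 4-byte nano adjustment (12 bytes)
//   LocalDateTime -> 8-byte epoch day + 8-byte nano of day (16 bytes)
final class TemporalCodecs {
    static byte[] encodeInstant(Instant t) {
        return ByteBuffer.allocate(Long.BYTES + Integer.BYTES)
                .putLong(t.getEpochSecond())
                .putInt(t.getNano())
                .array();
    }

    static Instant decodeInstant(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        long seconds = buf.getLong();
        int nanos = buf.getInt();
        return Instant.ofEpochSecond(seconds, nanos);
    }

    static byte[] encodeLocalDateTime(LocalDateTime t) {
        return ByteBuffer.allocate(2 * Long.BYTES)
                .putLong(t.toLocalDate().toEpochDay())
                .putLong(t.toLocalTime().toNanoOfDay())
                .array();
    }

    static LocalDateTime decodeLocalDateTime(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        LocalDate date = LocalDate.ofEpochDay(buf.getLong());
        LocalTime time = LocalTime.ofNanoOfDay(buf.getLong());
        return LocalDateTime.of(date, time);
    }
}
```

Neither layout carries a time zone. A LocalDateTime is decoded exactly as written, without any zone conversion.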

Structured Schemas

AVRO(Class)

Create an Avro schema type using the default configuration of the class.
static <T> Schema<T> AVRO(Class<T> pojo)
Parameters:
pojo (Class<T>): The POJO class to be used to extract the Avro schema
Returns (Schema<T>): A Schema instance
Example:
public class User {
    public String name;
    public int age;
}

Producer<User> producer = client.newProducer(Schema.AVRO(User.class))
    .topic("users")
    .create();

User user = new User();
user.name = "John";
user.age = 30;
producer.send(user);

AVRO(SchemaDefinition)

Create an Avro schema type with schema definition.
static <T> Schema<T> AVRO(SchemaDefinition<T> schemaDefinition)
Parameters:
schemaDefinition (SchemaDefinition<T>): The definition of the schema
Returns (Schema<T>): A Schema instance

JSON(Class)

Create a JSON schema type by extracting the fields of the specified class.
static <T> Schema<T> JSON(Class<T> pojo)
Parameters:
pojo (Class<T>): The POJO class to be used to extract the JSON schema
Returns (Schema<T>): A Schema instance
Example:
Producer<User> producer = client.newProducer(Schema.JSON(User.class))
    .topic("users-json")
    .create();

User alice = new User();
alice.name = "Alice";
alice.age = 25;
producer.send(alice);

JSON(SchemaDefinition)

Create a JSON schema type with schema definition.
static <T> Schema<T> JSON(SchemaDefinition schemaDefinition)
Parameters:
schemaDefinition (SchemaDefinition): The definition of the schema
Returns (Schema<T>): A Schema instance

PROTOBUF(Class)

Create a Protobuf schema type by extracting the fields of the specified class.
static <T extends com.google.protobuf.Message> Schema<T> PROTOBUF(Class<T> clazz)
Parameters:
clazz (Class<T>): The Protobuf generated class to be used to extract the schema
Returns (Schema<T>): A Schema instance
Example:
// Assuming you have a protobuf generated class MyMessage
Producer<MyMessage> producer = client.newProducer(Schema.PROTOBUF(MyMessage.class))
    .topic("protobuf-topic")
    .create();

MyMessage msg = MyMessage.newBuilder()
    .setName("test")
    .setValue(123)
    .build();
producer.send(msg);

PROTOBUF(SchemaDefinition)

Create a Protobuf schema type with schema definition.
static <T extends com.google.protobuf.Message> Schema<T> PROTOBUF(SchemaDefinition<T> schemaDefinition)

PROTOBUF_NATIVE(Class)

Create a Protobuf-Native schema type by extracting the fields of the specified class.
static <T extends com.google.protobuf.Message> Schema<T> PROTOBUF_NATIVE(Class<T> clazz)

PROTOBUF_NATIVE(SchemaDefinition)

Create a Protobuf-Native schema type with schema definition.
static <T extends com.google.protobuf.Message> Schema<T> PROTOBUF_NATIVE(SchemaDefinition<T> schemaDefinition)

KeyValue Schemas

KeyValue(Class, Class, SchemaType)

KeyValue schema using the passed-in schema type.
static <K, V> Schema<KeyValue<K, V>> KeyValue(Class<K> key, Class<V> value, SchemaType type)
Parameters:
key (Class<K>): The key class
value (Class<V>): The value class
type (SchemaType): The schema type (supports JSON and AVRO)
Returns (Schema<KeyValue<K, V>>): A KeyValue Schema instance
Example:
Schema<KeyValue<String, Integer>> kvSchema = 
    Schema.KeyValue(String.class, Integer.class, SchemaType.JSON);

Producer<KeyValue<String, Integer>> producer = client.newProducer(kvSchema)
    .topic("kv-topic")
    .create();

producer.send(new KeyValue<>("user-123", 100));

KeyValue(Class, Class)

KeyValue schema whose underlying key and value schemas are both JSON schemas.
static <K, V> Schema<KeyValue<K, V>> KeyValue(Class<K> key, Class<V> value)

KeyValue(Schema, Schema)

KeyValue schema using the passed-in key and value schemas with the INLINE encoding type.
static <K, V> Schema<KeyValue<K, V>> KeyValue(Schema<K> key, Schema<V> value)
Parameters:
key (Schema<K>): The key schema
value (Schema<V>): The value schema
Returns (Schema<KeyValue<K, V>>): A KeyValue Schema instance

KeyValue(Schema, Schema, KeyValueEncodingType)

KeyValue schema using the passed-in key schema, value schema, and encoding type.
static <K, V> Schema<KeyValue<K, V>> KeyValue(Schema<K> key, Schema<V> value, 
    KeyValueEncodingType keyValueEncodingType)
Parameters:
key (Schema<K>): The key schema
value (Schema<V>): The value schema
keyValueEncodingType (KeyValueEncodingType): The encoding type (INLINE or SEPARATED)
Returns (Schema<KeyValue<K, V>>): A KeyValue Schema instance
Example:
Schema<KeyValue<String, User>> kvSchema = Schema.KeyValue(
    Schema.STRING,
    Schema.JSON(User.class),
    KeyValueEncodingType.SEPARATED
);
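As we understand it, INLINE encoding packs the key and the value into the message payload. SEPARATED encoding stores the encoded key as the message key and puts only the value in the payload. The sketch below models an inline layout in which each part is prefixed by its 4-byte big-endian length. That layout is an assumption about Pulsar's format. InlineKeyValue is a hypothetical helper that hard-codes UTF-8 strings instead of delegating to the key and value schemas.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

// Assumed INLINE layout: [int keyLen][key bytes][int valueLen][value bytes],
// with lengths as 4-byte big-endian ints. Keys and values are UTF-8 strings here
// purely for illustration; real KeyValue schemas delegate to the key/value schemas.
final class InlineKeyValue {
    static byte[] encode(String key, String value) {
        byte[] k = key.getBytes(StandardCharsets.UTF_8);
        byte[] v = value.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + k.length + 4 + v.length)
                .putInt(k.length).put(k)
                .putInt(v.length).put(v)
                .array();
    }

    static Map.Entry<String, String> decode(byte[] payload) {
        ByteBuffer buf = ByteBuffer.wrap(payload);
        byte[] k = new byte[buf.getInt()];
        buf.get(k);
        byte[] v = new byte[buf.getInt()];
        buf.get(v);
        return new SimpleEntry<>(new String(k, StandardCharsets.UTF_8),
                                 new String(v, StandardCharsets.UTF_8));
    }
}
```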

KV_BYTES()

Schema that can be used to encode/decode KeyValue with byte arrays.
static Schema<KeyValue<byte[], byte[]>> KV_BYTES()
Returns (Schema<KeyValue<byte[], byte[]>>): A KeyValue bytes Schema instance

Auto Schema

AUTO_CONSUME()

Create a schema instance that automatically deserializes messages based on the current topic schema.
static Schema<GenericRecord> AUTO_CONSUME()
Returns (Schema<GenericRecord>): The auto schema instance
Message values are deserialized into a GenericRecord object, which extends the GenericObject interface.
Example:
Consumer<GenericRecord> consumer = client.newConsumer(Schema.AUTO_CONSUME())
    .topic("my-topic")
    .subscriptionName("my-sub")
    .subscribe();

Message<GenericRecord> msg = consumer.receive();
GenericRecord record = msg.getValue();
String name = (String) record.getField("name");
Integer age = (Integer) record.getField("age");

AUTO_PRODUCE_BYTES()

Create a schema instance that accepts a serialized payload and validates it against the topic schema.
static Schema<byte[]> AUTO_PRODUCE_BYTES()
Returns (Schema<byte[]>): The auto schema instance
Currently this is only supported with Avro and JSON schema types. This method can be used when publishing a raw JSON payload, for which the format is known and a POJO class is not available.

AUTO_PRODUCE_BYTES(Schema)

Create a schema instance that accepts a serialized payload and validates it against the schema specified.
static Schema<byte[]> AUTO_PRODUCE_BYTES(Schema<?> schema)
Parameters:
schema (Schema<?>): The schema to validate against
Returns (Schema<byte[]>): The auto schema instance
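AUTO_PRODUCE_BYTES does not re-serialize anything, at least conceptually. The payload is already encoded, so it only needs to be checked and then passed through. The sketch below illustrates that idea. ValidatingPassThrough and PayloadValidator are hypothetical names, and the UTF-8 check stands in for validation against a real Avro or JSON schema.

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;

// Hypothetical validator hook; throws an unchecked exception for invalid payloads.
@FunctionalInterface
interface PayloadValidator {
    void check(byte[] payload);
}

// Hypothetical sketch of the AUTO_PRODUCE_BYTES idea: the payload is already
// serialized, so "encoding" validates it and passes the same bytes through.
final class ValidatingPassThrough {
    private final PayloadValidator validator;

    ValidatingPassThrough(PayloadValidator validator) {
        this.validator = validator;
    }

    byte[] encode(byte[] payload) {
        validator.check(payload); // throws if the payload does not match
        return payload;           // no re-serialization, no copy
    }

    // Example validator standing in for a real schema check: well-formed UTF-8 only.
    static void requireUtf8(byte[] payload) {
        try {
            StandardCharsets.UTF_8.newDecoder()
                    .onMalformedInput(CodingErrorAction.REPORT)
                    .onUnmappableCharacter(CodingErrorAction.REPORT)
                    .decode(ByteBuffer.wrap(payload));
        } catch (CharacterCodingException e) {
            throw new IllegalArgumentException("payload is not valid UTF-8", e);
        }
    }
}
```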

NATIVE_AVRO(Object)

Create a schema instance that accepts a serialized Avro payload without validating it against the schema specified.
static Schema<byte[]> NATIVE_AVRO(Object schema)
Parameters:
schema (Object): The Avro schema
Returns (Schema<byte[]>): The auto schema instance
This can be useful when migrating data from existing event or message stores.

Utility Methods

getSchema()

Get a schema instance from SchemaInfo.
static Schema<?> getSchema(SchemaInfo schemaInfo)
Parameters:
schemaInfo (SchemaInfo): The schema information
Returns (Schema<?>): A schema instance

generic()

Returns a generic schema for the given schema info.
static GenericSchema<GenericRecord> generic(SchemaInfo schemaInfo)
Parameters:
schemaInfo (SchemaInfo): Schema info
Returns (GenericSchema<GenericRecord>): A generic schema instance
Only AVRO and JSON are supported.

getNativeSchema()

Return the native schema that is wrapped by the Pulsar API.
default Optional<Object> getNativeSchema()
Returns (Optional<Object>): The internal schema, or an empty Optional if not present
For instance, this method gives you access to the underlying Avro schema.

Complete Example

import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.common.schema.KeyValue;

public class SchemaExample {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
            .serviceUrl("pulsar://localhost:6650")
            .build();

        // Example 1: Using JSON Schema with POJO
        Producer<User> jsonProducer = client.newProducer(Schema.JSON(User.class))
            .topic("users-json")
            .create();

        jsonProducer.send(new User("Alice", 25));

        // Example 2: Using AVRO Schema
        Producer<User> avroProducer = client.newProducer(Schema.AVRO(User.class))
            .topic("users-avro")
            .create();

        avroProducer.send(new User("Bob", 30));

        // Example 3: Using AUTO_CONSUME to read any schema
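        // Start from the earliest message so the record sent above is received;
        // the default initial position (Latest) would skip it and receive() would block.
        Consumer<GenericRecord> consumer = client.newConsumer(Schema.AUTO_CONSUME())
            .topic("users-json")
            .subscriptionName("my-sub")
            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
            .subscribe();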

        Message<GenericRecord> msg = consumer.receive();
        GenericRecord record = msg.getValue();
        System.out.println("Name: " + record.getField("name"));
        System.out.println("Age: " + record.getField("age"));

        // Example 4: Using KeyValue Schema
        Schema<KeyValue<String, User>> kvSchema = 
            Schema.KeyValue(Schema.STRING, Schema.JSON(User.class));

        Producer<KeyValue<String, User>> kvProducer = 
            client.newProducer(kvSchema)
                .topic("users-kv")
                .create();

        kvProducer.send(new KeyValue<>("user-1", new User("Charlie", 35)));

        // Clean up
        jsonProducer.close();
        avroProducer.close();
        kvProducer.close();
        consumer.close();
        client.close();
    }

    static class User {
        public String name;
        public int age;

        public User() {}

        public User(String name, int age) {
            this.name = name;
            this.age = age;
        }
    }
}
