Schemas provide type safety and enable schema evolution in Apache Pulsar. They define the structure of messages and ensure compatibility between producers and consumers.
Schema Basics
From pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java, schemas handle message serialization and deserialization:
/**
 * Message schema definition.
 */
public interface Schema<T> extends Cloneable {
    /**
     * Encode an object representing the message content into a byte array.
     */
    byte[] encode(T message);

    /**
     * Decode a byte array into an object using the schema definition.
     */
    T decode(byte[] bytes);

    /**
     * @return schema information
     */
    SchemaInfo getSchemaInfo();
}
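The encode/decode contract can be shown with a minimal stand-in. `MiniSchema` and `Utf8StringSchema` are hypothetical types, not Pulsar classes. They mirror only the encode/decode half of the interface above (there is no `SchemaInfo`), and they use a UTF-8 codec similar in spirit to `Schema.STRING`.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the encode/decode half of Schema<T>;
// it is not the real org.apache.pulsar.client.api.Schema interface.
interface MiniSchema<T> {
    byte[] encode(T message);

    T decode(byte[] bytes);
}

// UTF-8 string codec, similar in spirit to Schema.STRING.
final class Utf8StringSchema implements MiniSchema<String> {
    @Override
    public byte[] encode(String message) {
        return message.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String decode(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```

Any encode/decode pair must round-trip: for every valid value `x`, `decode(encode(x))` should equal `x`.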
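Schemas are optional. A producer or consumer created without one uses Schema.BYTES and exchanges raw byte arrays. Schemas are still highly recommended for production use, because they add type safety and automatic serialization and make schema evolution possible.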
Primitive Schemas
Pulsar provides built-in schemas for primitive types:
// Alternatives shown side by side; use one schema type per topic.

// String schema (UTF-8 encoding)
Producer<String> stringProducer = client.newProducer(Schema.STRING)
        .topic("my-topic")
        .create();

// Byte array (no serialization)
Producer<byte[]> bytesProducer = client.newProducer(Schema.BYTES)
        .topic("my-topic")
        .create();

// ByteBuffer
Producer<ByteBuffer> bufferProducer = client.newProducer(Schema.BYTEBUFFER)
        .topic("my-topic")
        .create();
Numeric and Boolean Types
From the Schema interface:
Schema<Byte> INT8 = Schema.INT8;
Schema<Short> INT16 = Schema.INT16;
Schema<Integer> INT32 = Schema.INT32;
Schema<Long> INT64 = Schema.INT64;
Schema<Float> FLOAT = Schema.FLOAT;
Schema<Double> DOUBLE = Schema.DOUBLE;
Schema<Boolean> BOOL = Schema.BOOL;
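The numeric schemas map Java boxed types to fixed-width binary values. The sketch below shows a round trip and assumes big-endian byte order. That is the default for `java.nio.ByteBuffer`, and we understand it to be what Pulsar's INT32 and INT64 schemas use, but verify this against your client version. `FixedWidthCodec` is a hypothetical helper.

```java
import java.nio.ByteBuffer;

// Hypothetical helper illustrating fixed-width, big-endian numeric encoding.
final class FixedWidthCodec {
    // INT32: exactly 4 bytes, most significant byte first.
    static byte[] encodeInt32(int value) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
    }

    static int decodeInt32(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getInt();
    }

    // INT64: exactly 8 bytes, most significant byte first.
    static byte[] encodeInt64(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    static long decodeInt64(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }
}
```

Because the width is fixed, a payload of the wrong length cannot have been written with that schema.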
Date and Time Types
// DATE uses java.util.Date; TIME and TIMESTAMP use java.sql types; the rest use java.time types
Schema<Date> DATE = Schema.DATE;
Schema<Time> TIME = Schema.TIME;
Schema<Timestamp> TIMESTAMP = Schema.TIMESTAMP;
Schema<Instant> INSTANT = Schema.INSTANT;
Schema<LocalDate> LOCAL_DATE = Schema.LOCAL_DATE;
Schema<LocalTime> LOCAL_TIME = Schema.LOCAL_TIME;
Schema<LocalDateTime> LOCAL_DATE_TIME = Schema.LOCAL_DATE_TIME;
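A `java.util.Date` stores milliseconds since the epoch. An `Instant` carries nanosecond precision over a range too wide for a single 64-bit nanosecond count. A lossless encoding for `Instant` therefore splits the value into seconds and nanoseconds.

`InstantCodec` is a hypothetical sketch of one such layout: 8 bytes of epoch seconds followed by 4 bytes of nanoseconds. It is not claimed to match Pulsar's `INSTANT` wire format byte for byte.

```java
import java.nio.ByteBuffer;
import java.time.Instant;

// Hypothetical lossless Instant codec: 8-byte epoch seconds followed by a
// 4-byte nanosecond adjustment (0..999,999,999). Illustrative only.
final class InstantCodec {
    static byte[] encode(Instant instant) {
        return ByteBuffer.allocate(Long.BYTES + Integer.BYTES)
                .putLong(instant.getEpochSecond())
                .putInt(instant.getNano())
                .array();
    }

    static Instant decode(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        long seconds = buf.getLong();
        int nanos = buf.getInt();
        return Instant.ofEpochSecond(seconds, nanos);
    }
}
```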
Complex Schemas
JSON Schema
Automatic JSON serialization for POJOs:
// Define a POJO
public class User {
    private String name;
    private int age;
    private String email;
    // Constructors, getters, setters
}

// Create producer with JSON schema
Producer<User> producer = client.newProducer(Schema.JSON(User.class))
        .topic("user-topic")
        .create();

// Send typed messages
User user = new User("Alice", 30, "alice@example.com");
producer.send(user);

// Consume with same schema
Consumer<User> consumer = client.newConsumer(Schema.JSON(User.class))
        .topic("user-topic")
        .subscriptionName("my-sub")
        .subscribe();

Message<User> msg = consumer.receive();
User receivedUser = msg.getValue(); // Automatically deserialized
Avro Schema
From Schema.java - Avro provides efficient binary serialization:
// Define a POJO (no Avro annotations needed)
public class Order {
    private String orderId;
    private double amount;
    private List<String> items;
    // Constructors, getters, setters
}

// Create producer with Avro schema
Producer<Order> producer = client.newProducer(Schema.AVRO(Order.class))
        .topic("orders")
        .create();

// Schema is automatically registered
Order order = new Order("ORDER-123", 99.99, Arrays.asList("item1", "item2"));
producer.send(order);
Avro schemas are more compact than JSON and support richer schema evolution semantics.
Protobuf Schema
For Google Protocol Buffers:
// From Schema.java
static <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> PROTOBUF(Class<T> clazz)

// Define the protobuf message (in a .proto file)
syntax = "proto3";
option java_multiple_files = true; // generate UserEvent as a top-level Java class

message UserEvent {
    string user_id = 1;
    string event_type = 2;
    int64 timestamp = 3;
}

// Generate the Java class using protoc, then use it in Pulsar:
Producer<UserEvent> producer = client.newProducer(Schema.PROTOBUF(UserEvent.class))
        .topic("user-events")
        .create();
KeyValue Schema
For messages with typed keys and values:
// From Schema.java
static <K, V> Schema<KeyValue<K, V>> KeyValue(
        Schema<K> key, Schema<V> value, KeyValueEncodingType encodingType)

// Create KeyValue schema
// (KeyValue and KeyValueEncodingType are in org.apache.pulsar.common.schema)
Schema<KeyValue<String, Order>> kvSchema = Schema.KeyValue(
        Schema.STRING,
        Schema.JSON(Order.class),
        KeyValueEncodingType.SEPARATED // or INLINE
);

Producer<KeyValue<String, Order>> producer = client.newProducer(kvSchema)
        .topic("orders")
        .create();

// Send with typed key and value
KeyValue<String, Order> kv = new KeyValue<>("user-123", order);
producer.send(kv);
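Encoding types:
SEPARATED: The key is stored in the message key and the value in the payload, so key-based routing, Key_Shared subscriptions, and topic compaction can use the key
INLINE: Key and value are encoded together in the message payload (the default for the two-argument Schema.KeyValue(key, value))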
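With INLINE, the key and value share one payload, so the decoder needs to know where the key ends. The sketch below shows a length-prefixed layout for this. `InlineKeyValue` is a hypothetical class. We believe Pulsar's `KeyValue.encode` uses 4-byte length prefixes in this way, but treat the exact byte layout as an assumption.

```java
import java.nio.ByteBuffer;
import java.util.AbstractMap;
import java.util.Map;

// Hypothetical INLINE-style payload: [keyLen:int32][key][valueLen:int32][value].
final class InlineKeyValue {
    static byte[] encode(byte[] key, byte[] value) {
        return ByteBuffer.allocate(Integer.BYTES + key.length + Integer.BYTES + value.length)
                .putInt(key.length).put(key)
                .putInt(value.length).put(value)
                .array();
    }

    // Reads each length prefix, then exactly that many bytes.
    static Map.Entry<byte[], byte[]> decode(byte[] payload) {
        ByteBuffer buf = ByteBuffer.wrap(payload);
        byte[] key = new byte[buf.getInt()];
        buf.get(key);
        byte[] value = new byte[buf.getInt()];
        buf.get(value);
        return new AbstractMap.SimpleImmutableEntry<>(key, value);
    }
}
```

With SEPARATED, the key bytes travel in the message key instead, so the payload holds only the value.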
Schema Registry
Pulsar automatically manages schemas in a built-in schema registry:
# View schema for a topic
pulsar-admin schemas get persistent://tenant/ns/topic
# Upload schema manually
pulsar-admin schemas upload persistent://tenant/ns/topic \
--filename schema.json
# Delete schema
pulsar-admin schemas delete persistent://tenant/ns/topic
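Schema information includes:
Schema name and type (JSON, Avro, Protobuf, etc.)
Schema definition
Schema properties (user-defined key/value metadata)
Schema version
The compatibility strategy is not part of the schema itself. It is a namespace (or topic) policy, configured as described below.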
Schema Evolution
Pulsar supports schema evolution with compatibility checks:
Compatibility Modes
# Set schema compatibility at namespace level
pulsar-admin namespaces set-schema-compatibility-strategy \
tenant/namespace \
--compatibility BACKWARD
Compatibility strategies:
ALWAYS_COMPATIBLE: Disables compatibility checks; any schema change is accepted
ALWAYS_INCOMPATIBLE: Rejects every schema change, effectively disabling schema evolution
BACKWARD: Consumers using the new schema can read data written with the previous schema
FORWARD: Consumers using the previous schema can read data written with the new schema
FULL: Both backward and forward compatible with the previous schema (the broker default)
BACKWARD_TRANSITIVE: Backward compatible with all previous versions
FORWARD_TRANSITIVE: Forward compatible with all previous versions
FULL_TRANSITIVE: Both backward and forward compatible with all previous versions
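The direction of each check is easy to mix up. The toy model below is not Pulsar's checker; to our understanding, Pulsar relies on Avro-based validation for Avro and JSON schemas. The model treats a schema as a map from field name to whether that field has a default. It then applies the usual reader/writer rule: a reader can decode writer data if every reader field exists in the writer schema or has a default. `CompatModel` is hypothetical, and it ignores types, aliases, and nested records.

```java
import java.util.List;
import java.util.Map;

// Toy model: a schema maps field name -> "this field has a default value".
final class CompatModel {
    enum Strategy {
        BACKWARD, FORWARD, FULL,
        BACKWARD_TRANSITIVE, FORWARD_TRANSITIVE, FULL_TRANSITIVE
    }

    // Reader fields must exist in the writer or have a default;
    // extra writer fields are simply skipped by the reader.
    static boolean canRead(Map<String, Boolean> reader, Map<String, Boolean> writer) {
        for (Map.Entry<String, Boolean> field : reader.entrySet()) {
            if (!writer.containsKey(field.getKey()) && !field.getValue()) {
                return false;
            }
        }
        return true;
    }

    // history is ordered oldest first; non-transitive strategies compare only
    // against the latest entry, transitive ones against every entry.
    static boolean isCompatible(Strategy strategy, Map<String, Boolean> candidate,
                                List<Map<String, Boolean>> history) {
        String name = strategy.name();
        boolean transitive = name.endsWith("_TRANSITIVE");
        boolean backward = name.startsWith("BACKWARD") || name.startsWith("FULL");
        boolean forward = name.startsWith("FORWARD") || name.startsWith("FULL");
        List<Map<String, Boolean>> toCheck = (transitive || history.isEmpty())
                ? history
                : history.subList(history.size() - 1, history.size());
        for (Map<String, Boolean> previous : toCheck) {
            // BACKWARD: the new schema reads data written with the old one
            if (backward && !canRead(candidate, previous)) {
                return false;
            }
            // FORWARD: the old schema reads data written with the new one
            if (forward && !canRead(previous, candidate)) {
                return false;
            }
        }
        return true;
    }
}
```

In this model:
- Adding a field with a default passes FULL.
- Adding a required field passes only FORWARD.
- A change can pass BACKWARD against the latest version while failing BACKWARD_TRANSITIVE against an older one.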
Schema Versioning
From Schema.java, schemas support versioning:
public interface Schema<T> {
    /**
     * Returns whether this schema supports versioning.
     */
    default boolean supportSchemaVersioning() {
        return false;
    }

    /**
     * Decode a byte array using a given schema version.
     */
    default T decode(byte[] bytes, byte[] schemaVersion) {
        return decode(bytes);
    }
}
Schemas are versioned automatically:
// V1 and V2 are successive versions of the same class, not two classes in one file.

// V1: Original schema
public class User {
    private String name;
    private int age;
}

// V2: Add optional field (backward compatible)
public class User {
    private String name;
    private int age;
    private String email; // New field with default null
}

// Consumers can read both versions
Consumer<User> consumer = client.newConsumer(Schema.JSON(User.class))
        .topic("users")
        .subscriptionName("my-sub")
        .subscribe();
When a producer sends a message, Pulsar includes the schema version in the message metadata. Consumers automatically use the correct schema version to deserialize.
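Conceptually, a versioning-aware schema picks a decoder based on the writer's schema version carried in the message metadata. The sketch below models that dispatch with a map of per-version decoders. When the version is unknown or absent, it falls back to the latest decoder, much as the default `decode(bytes, schemaVersion)` ignores the version. `VersionedDecoder` is hypothetical. Reading the version as an 8-byte big-endian long is an assumption about the broker's default version format.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical dispatcher: choose a decoder by the writer's schema version.
final class VersionedDecoder<T> {
    private final Map<Long, Function<byte[], T>> byVersion = new HashMap<>();
    private final Function<byte[], T> latest;

    VersionedDecoder(Function<byte[], T> latest) {
        this.latest = latest;
    }

    VersionedDecoder<T> register(long version, Function<byte[], T> decoder) {
        byVersion.put(version, decoder);
        return this;
    }

    T decode(byte[] bytes, byte[] schemaVersion) {
        // No usable version: behave like the version-agnostic decode(bytes).
        if (schemaVersion == null || schemaVersion.length != Long.BYTES) {
            return latest.apply(bytes);
        }
        long version = ByteBuffer.wrap(schemaVersion).getLong();
        return byVersion.getOrDefault(version, latest).apply(bytes);
    }
}
```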
Auto Schema
For flexible consumption without knowing the schema in advance:
// From Schema.java
static Schema<GenericRecord> AUTO_CONSUME()

// Consumer that automatically adapts to topic schema
Consumer<GenericRecord> consumer = client.newConsumer(Schema.AUTO_CONSUME())
        .topic("my-topic")
        .subscriptionName("my-sub")
        .subscribe();

Message<GenericRecord> msg = consumer.receive();
GenericRecord record = msg.getValue();

// Access fields dynamically
String name = (String) record.getField("name");
Integer age = (Integer) record.getField("age");
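Blind casts such as `(Integer) record.getField("age")` throw `ClassCastException` if the field holds a different boxed type, and that type can depend on the schema type and the field definition. A defensive alternative is sketched below. The hypothetical `Fields` helpers accept any `Function<String, Object>`, so with a real `GenericRecord` you could pass `record::getField`.

```java
import java.util.Optional;
import java.util.function.Function;

// Hypothetical helpers for reading dynamically typed fields without blind casts.
final class Fields {
    // Returns the field's string form, or empty if the field is null.
    static Optional<String> string(Function<String, Object> getField, String name) {
        Object value = getField.apply(name);
        return value == null ? Optional.empty() : Optional.of(value.toString());
    }

    // Accepts any boxed number (Integer, Long, Short, ...) and narrows it to int.
    // Note: intValue() truncates values outside the int range.
    static Optional<Integer> int32(Function<String, Object> getField, String name) {
        Object value = getField.apply(name);
        if (value instanceof Number) {
            return Optional.of(((Number) value).intValue());
        }
        return Optional.empty();
    }
}
```

For example: `int age = Fields.int32(record::getField, "age").orElse(0);`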
Auto Produce
For producing raw payloads with schema validation:
// From Schema.java
static Schema<byte[]> AUTO_PRODUCE_BYTES()
static Schema<byte[]> AUTO_PRODUCE_BYTES(Schema<?> schema)

// Produce pre-serialized data
Producer<byte[]> producer = client.newProducer(
        Schema.AUTO_PRODUCE_BYTES(Schema.JSON(User.class)))
        .topic("users")
        .create();

// Send pre-serialized JSON
byte[] jsonBytes = "{\"name\":\"Alice\",\"age\":30}".getBytes(StandardCharsets.UTF_8);
producer.send(jsonBytes);
Schema Definition Builder
For advanced schema configuration:
import java.util.Map;
import org.apache.pulsar.client.api.schema.SchemaDefinition;

SchemaDefinition<User> schemaDefinition = SchemaDefinition.<User>builder()
        .withPojo(User.class)
        .withAlwaysAllowNull(false)
        .withJSR310ConversionEnabled(true)
        .withProperties(Map.of(
                "namespace", "com.example.users",
                "version", "1.0"))
        .build();

Producer<User> producer = client.newProducer(Schema.JSON(schemaDefinition))
        .topic("users")
        .create();
Schema Validation
From Schema.java:
public interface Schema<T> {
    /**
     * Check if the message is a valid object for this schema.
     */
    default void validate(byte[] message) {
        decode(message); // Validation by attempting decode
    }
}
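Validation is applied at these points:
When a producer uses AUTO_PRODUCE_BYTES, the client checks each pre-serialized payload against the schema before sending it
When a consumer reads a message value, the payload is decoded with the schema, so malformed data surfaces as a decoding error
At the broker, isSchemaValidationEnforced (also available as a namespace policy) rejects producers that connect to a topic with a schema without declaring one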
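The default `validate()` attempts a decode and lets any exception propagate. For a fixed-width type, that check can be as simple as a length test. The hypothetical `Int32Validator` (not a Pulsar class) sketches this "validate by decoding" pattern for pre-serialized bytes, such as those sent through AUTO_PRODUCE_BYTES.

```java
import java.nio.ByteBuffer;

// Hypothetical INT32 codec showing "validate by attempting to decode".
final class Int32Validator {
    static int decode(byte[] bytes) {
        // A fixed-width type can reject malformed input with a length check.
        if (bytes == null || bytes.length != Integer.BYTES) {
            throw new IllegalArgumentException("INT32 payload must be 4 bytes, got "
                    + (bytes == null ? "null" : String.valueOf(bytes.length)));
        }
        return ByteBuffer.wrap(bytes).getInt();
    }

    // Same shape as the default Schema.validate(): decode and discard the
    // result, letting any exception signal an invalid payload.
    static void validate(byte[] bytes) {
        decode(bytes);
    }
}
```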
Native Schema
For using existing Avro schemas:
// From Schema.java
static Schema<byte[]> NATIVE_AVRO(Object schema)

// Use an existing Avro schema (schemaJson holds its JSON definition as a String)
org.apache.avro.Schema avroSchema =
        new org.apache.avro.Schema.Parser().parse(schemaJson);

Producer<byte[]> producer = client.newProducer(Schema.NATIVE_AVRO(avroSchema))
        .topic("my-topic")
        .create();
Schema Operations
Get Schema Info
Message<User> msg = consumer.receive();
Optional<Schema<?>> schema = msg.getReaderSchema();
if (schema.isPresent()) {
    SchemaInfo info = schema.get().getSchemaInfo();
    System.out.println("Schema type: " + info.getType());
    System.out.println("Schema name: " + info.getName());
    // The version is opaque bytes (typically an 8-byte big-endian long with the
    // default broker storage), so decode it rather than using new String(...)
    byte[] version = msg.getSchemaVersion();
    if (version != null && version.length == Long.BYTES) {
        System.out.println("Schema version: " + ByteBuffer.wrap(version).getLong());
    }
}
Schema Compatibility Check
# Check if new schema is compatible
pulsar-admin schemas compatibility-check \
persistent://tenant/ns/topic \
--filename new-schema.json
Best Practices
Use primitive schemas (STRING, BYTES) for simple use cases
Use JSON for ease of development and debugging
Use Avro for production with schema evolution requirements
Use Protobuf when interoperating with gRPC systems
Use KeyValue schema when you need typed message keys
Set appropriate compatibility mode before production
Add optional fields (with defaults) for backward compatibility
Avoid removing or renaming fields
Test schema changes with compatibility-check command
Document schema changes in your version control
Enforce schema validation (isSchemaValidationEnforced) in production so producers without a schema cannot write to typed topics
Use namespace-level compatibility settings
Version your POJO classes alongside schema versions
Monitor schema registry for unexpected changes
Back up schema definitions
Common Patterns
Multi-Version Consumers
// Consumer handles multiple schema versions automatically
Consumer<User> consumer = client.newConsumer(Schema.JSON(User.class))
        .topic("users")
        .subscriptionName("my-sub")
        .subscribe();

// Receives messages from both V1 and V2 producers
while (true) {
    Message<User> msg = consumer.receive();
    User user = msg.getValue();
    // Fields missing from older-version messages will be null/default
    processUser(user); // application-specific handler
    consumer.acknowledge(msg);
}
Schema-less to Typed Migration
// Step 1: Start with schema-less
Producer<byte[]> producer = client.newProducer(Schema.BYTES)
        .topic("my-topic")
        .create();

// Step 2: Migrate to typed schema
Producer<User> typedProducer = client.newProducer(Schema.JSON(User.class))
        .topic("my-topic")
        .create();

// Step 3: Update consumers
Consumer<User> consumer = client.newConsumer(Schema.JSON(User.class))
        .topic("my-topic")
        .subscriptionName("my-sub")
        .subscribe();
Next Steps
Producers & Consumers: Learn about client implementations
Messaging: Understand message structure
Topics: Explore topic management
Schema Registry: Administer schema registry