Overview
The Message interface is the message abstraction used in Pulsar. It represents a message received from a topic and provides access to the message’s payload, properties, metadata, and identifiers.
Message Properties
getProperties()
Return the properties attached to the message.
Map<String, String> getProperties()
An unmodifiable view of the properties map
Properties are application-defined key/value pairs attached to the message.
Example:
Message<String> msg = consumer.receive();
Map<String, String> props = msg.getProperties();
for (Map.Entry<String, String> entry : props.entrySet()) {
    System.out.println(entry.getKey() + ": " + entry.getValue());
}
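Because the returned map is an unmodifiable view, code that needs to add or change entries should work on a copy. The sketch below runs on the JDK alone: `Collections.unmodifiableMap` is only a stand-in for the map returned by `getProperties()`, and the names `PropertiesCopySketch` and `mutableCopy` are hypothetical, chosen for illustration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class PropertiesCopySketch {

    // Hypothetical helper: copies a read-only properties map into a fresh, modifiable map.
    static Map<String, String> mutableCopy(Map<String, String> props) {
        return new HashMap<>(props);
    }

    public static void main(String[] args) {
        // Stand-in for msg.getProperties(); an assumption for illustration, not a Pulsar call.
        Map<String, String> source = new HashMap<>();
        source.put("user-id", "42");
        Map<String, String> props = Collections.unmodifiableMap(source);

        try {
            props.put("trace-id", "abc"); // writes to an unmodifiable view are rejected
        } catch (UnsupportedOperationException e) {
            System.out.println("Properties map is read-only");
        }

        Map<String, String> copy = mutableCopy(props);
        copy.put("trace-id", "abc"); // the copy can be changed freely
        System.out.println(copy.size());
    }
}
```

Copying is cheap for the small maps that properties usually are, and it leaves the message's own view untouched.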
hasProperty()
Check whether the message has a specific property attached.
boolean hasProperty(String name)
The name of the property to check
True if the message has the specified property, false if the property is not defined
Example:
if (msg.hasProperty("user-id")) {
    String userId = msg.getProperty("user-id");
    System.out.println("User ID: " + userId);
}
getProperty()
Get the value of a specific property.
String getProperty(String name)
The value of the property or null if the property was not defined
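Since the return value is null for an undefined property, callers may want a fallback. A minimal sketch, assuming a hypothetical helper `propertyOrDefault` that works on a properties map (such as the one returned by `getProperties()`) instead of a live message, so that it runs with the JDK alone:

```java
import java.util.Map;

class PropertyLookupSketch {

    // Hypothetical helper: returns the property value, or the fallback when the property is not defined.
    static String propertyOrDefault(Map<String, String> props, String name, String fallback) {
        String value = props.get(name); // null when the key is absent, like getProperty
        return value != null ? value : fallback;
    }

    public static void main(String[] args) {
        // Stand-in for msg.getProperties(); an assumption for illustration.
        Map<String, String> props = Map.of("user-id", "42");
        System.out.println(propertyOrDefault(props, "user-id", "unknown"));
        System.out.println(propertyOrDefault(props, "region", "unknown"));
    }
}
```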
Message Payload
getData()
Get the raw payload of the message.
The byte array with the message payload
Even when using the Schema and type-safe API, an application has access to the underlying raw message payload.
Example:
Message<String> msg = consumer.receive();
byte[] rawData = msg.getData();
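// Decode with an explicit charset (java.nio.charset.StandardCharsets); assumes the payload is UTF-8 text
String content = new String(rawData, StandardCharsets.UTF_8);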
getValue()
Get the deserialized value of the message, according to the configured Schema.
The deserialized value of the message
Example:
Consumer<MyObject> consumer = client.newConsumer(Schema.AVRO(MyObject.class))
        .topic("my-topic")
        .subscribe();
Message<MyObject> msg = consumer.receive();
MyObject obj = msg.getValue();
size()
Get the uncompressed message payload size in bytes.
Message Identifiers
getMessageId()
Get the unique message ID associated with this message.
The message id (null if this message was not received by this client instance)
The message id can be used to refer to a message uniquely without keeping the entire payload in memory. Only messages received from a consumer have a message id assigned.
Example:
Message<String> msg = consumer.receive();
MessageId msgId = msg.getMessageId();
System.out.println("Message ID: " + msgId);
// Later, seek to this message
consumer.seek(msgId);
getTopicName()
Get the topic the message was published to.
The topic the message was published to
Message Keys
hasKey()
Check whether the message has a key.
True if the key was set while creating the message, false otherwise
getKey()
Get the key of the message.
Example:
if (msg.hasKey()) {
    String key = msg.getKey();
    System.out.println("Message key: " + key);
}
hasBase64EncodedKey()
Check whether the key has been base64 encoded.
boolean hasBase64EncodedKey()
True if the key is base64 encoded, false otherwise
getKeyBytes()
Get the key of the message as a byte array.
If the key has been base64 encoded, it is decoded before being returned. Otherwise, if the key is a plain string, this method returns the UTF-8 encoded bytes of the string.
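The decoding rule described above can be sketched with the JDK alone. This is an illustration of the rule, not the client's actual implementation; `KeyBytesSketch` and `keyBytes` are hypothetical names, and the boolean argument stands in for the result of `hasBase64EncodedKey()`.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;

class KeyBytesSketch {

    // Hypothetical helper mirroring the rule above:
    // base64-encoded keys are decoded, plain keys are encoded as UTF-8.
    static byte[] keyBytes(String key, boolean base64Encoded) {
        if (base64Encoded) {
            return Base64.getDecoder().decode(key);
        }
        return key.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] raw = {0x01, 0x02, (byte) 0xFF};
        String encoded = Base64.getEncoder().encodeToString(raw);

        // A base64 key round-trips back to the original bytes.
        System.out.println(Arrays.equals(raw, keyBytes(encoded, true)));
        // A plain string key yields its UTF-8 bytes.
        System.out.println(Arrays.toString(keyBytes("order-1", false)));
    }
}
```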
hasOrderingKey()
Check whether the message has an ordering key.
True if the ordering key was set while creating the message, false otherwise
getOrderingKey()
Get the ordering key of the message.
The ordering key of the message
Timestamps
getPublishTime()
Get the publish time of this message.
Publish time of this message
The publish time is the timestamp at which the client published the message.
Example:
long publishTime = msg.getPublishTime();
System.out.println("Published at: " + new Date(publishTime));
getEventTime()
Get the event time associated with this message.
The message event time or 0 if event time wasn’t set
It is typically set by the application via TypedMessageBuilder.eventTime(long). If no event time is associated with the message, this method returns 0.
Example:
long eventTime = msg.getEventTime();
if (eventTime > 0) {
    System.out.println("Event time: " + new Date(eventTime));
}
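The "0 means not set" convention can also be wrapped in an Optional so that callers cannot forget the check. A minimal sketch, assuming the event time is expressed in milliseconds since the epoch (the same assumption the `new Date(eventTime)` call above makes); `EventTimeSketch` and `eventInstant` are hypothetical names.

```java
import java.time.Instant;
import java.util.Optional;

class EventTimeSketch {

    // Hypothetical helper: maps the raw event time to an Optional, treating 0 as "not set".
    static Optional<Instant> eventInstant(long eventTimeMillis) {
        return eventTimeMillis > 0
                ? Optional.of(Instant.ofEpochMilli(eventTimeMillis))
                : Optional.empty();
    }

    public static void main(String[] args) {
        // The long arguments stand in for msg.getEventTime().
        System.out.println(eventInstant(0L).isPresent());
        System.out.println(eventInstant(1_700_000_000_000L).get());
    }
}
```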
hasBrokerPublishTime()
Check whether the message has a broker publish time.
boolean hasBrokerPublishTime()
True if the message has a broker publish time, otherwise false
getBrokerPublishTime()
Get broker publish time from broker entry metadata.
Optional<Long> getBrokerPublishTime()
Broker publish time from broker entry metadata, or empty if the feature is not enabled in the broker
Note that the value is available only if the feature is enabled in the broker.
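Since the broker publish time may be absent, one possible pattern is to fall back to the client publish time. The sketch below is an assumption about how an application might combine the two values, not a recommendation from the Pulsar API; `PublishTimeSketch` and `effectivePublishTime` are hypothetical names, and the arguments stand in for `getBrokerPublishTime()` and `getPublishTime()`.

```java
import java.util.Optional;

class PublishTimeSketch {

    // Hypothetical helper: prefers the broker publish time and falls back to the
    // client publish time when the broker did not provide one.
    static long effectivePublishTime(Optional<Long> brokerPublishTime, long clientPublishTime) {
        return brokerPublishTime.orElse(clientPublishTime);
    }

    public static void main(String[] args) {
        // Stand-ins for msg.getBrokerPublishTime() and msg.getPublishTime().
        System.out.println(effectivePublishTime(Optional.of(2_000L), 1_000L));
        System.out.println(effectivePublishTime(Optional.empty(), 1_000L));
    }
}
```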
Sequence and Producer
getSequenceId()
Get the sequence id associated with this message.
Sequence id associated with this message
It is typically set by the application via TypedMessageBuilder.sequenceId(long).
getProducerName()
Get the name of the producer that produced this message.
The name of the producer that produced this message, or null if the producer name is not set
Example:
String producerName = msg.getProducerName();
if (producerName != null) {
    System.out.println("Produced by: " + producerName);
}
getSchemaVersion()
Get the schema version of the message.
byte[] getSchemaVersion()
The schema version of the message if the message was produced with a schema, otherwise null
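Because the schema version is a byte array that may be null, logging it takes a little care. A minimal sketch that treats the bytes as opaque and makes no assumption about their layout; `SchemaVersionSketch` and `describe` are hypothetical names, and the argument stands in for the result of `getSchemaVersion()`.

```java
class SchemaVersionSketch {

    // Hypothetical helper: renders the schema version bytes as hex, or "none" when the value is null.
    static String describe(byte[] schemaVersion) {
        if (schemaVersion == null) {
            return "none";
        }
        StringBuilder hex = new StringBuilder();
        for (byte b : schemaVersion) {
            hex.append(String.format("%02x", b)); // two lowercase hex digits per byte
        }
        return hex.toString();
    }

    public static void main(String[] args) {
        // Stand-ins for msg.getSchemaVersion(); the byte values are made up for illustration.
        System.out.println(describe(null));
        System.out.println(describe(new byte[] {0, 0, 0, 0, 0, 0, 0, 5}));
    }
}
```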
getSchemaId()
Get the schema ID of the message.
Optional<byte[]> getSchemaId()
The schema ID if the message was produced with an external schema and the schema ID is set, otherwise empty
PIP-420 provides a way to produce messages with an external schema; in that case the schema ID is set in the message metadata.
getReaderSchema()
Get the schema associated with the message.
default Optional<Schema<?>> getReaderSchema()
The schema used to decode the payload of the message
This schema is usually the same as the Schema you passed when constructing the Consumer or the Reader. However, if you consume the topic using the GenericObject interface, this method returns the schema associated with the message.
Replication
isReplicated()
Check whether the message is replicated from another cluster.
True if the message is replicated from another cluster, false otherwise
getReplicatedFrom()
Get the name of the cluster from which the message is replicated.
String getReplicatedFrom()
The name of the cluster from which the message is replicated
Example:
if (msg.isReplicated()) {
    String sourceCluster = msg.getReplicatedFrom();
    System.out.println("Message replicated from: " + sourceCluster);
}
Redelivery
getRedeliveryCount()
Get the message redelivery count.
The redelivery count is maintained by the Pulsar broker. When a client's acknowledgment of a message times out, the broker dispatches the message again with the redelivery count set in CommandMessage.
The redelivery count increases monotonically within a broker. When ownership of the topic moves to another broker, the redelivery count is recalculated.
Example:
int redeliveryCount = msg.getRedeliveryCount();
if (redeliveryCount > 3) {
    System.out.println("Message has been redelivered " + redeliveryCount + " times");
    // Maybe send to dead letter queue
}
Encryption
getEncryptionCtx()
Get the encryption context.
Optional<EncryptionContext> getEncryptionCtx()
The optional encryption context
EncryptionContext contains the encryption and compression information that an application can use to decrypt a consumed message with an encrypted payload.
Index
hasIndex()
Check whether the message has an index.
True if the message has an index, otherwise false
getIndex()
Get index from broker entry metadata.
Optional<Long> getIndex()
Index from broker entry metadata, or empty if the feature is not enabled in the broker
Note that the value is available only if the feature is enabled in the broker.
Resource Management
release()
Release a message back to the pool.
This is required only if the consumer was created with the option to pool messages; otherwise it has no effect.
Complete Example
import org.apache.pulsar.client.api.*;
import java.nio.charset.StandardCharsets;
import java.util.Map;

public class MessageInspectionExample {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();
        Consumer<byte[]> consumer = client.newConsumer()
                .topic("my-topic")
                .subscriptionName("my-subscription")
                .subscribe();

        // Block until a message arrives
        Message<byte[]> msg = consumer.receive();

        // Print message details
        System.out.println("=== Message Details ===");
        System.out.println("Message ID: " + msg.getMessageId());
        System.out.println("Topic: " + msg.getTopicName());
        System.out.println("Producer: " + msg.getProducerName());
        System.out.println("Publish time: " + msg.getPublishTime());
        System.out.println("Event time: " + msg.getEventTime());
        System.out.println("Sequence ID: " + msg.getSequenceId());
        System.out.println("Size: " + msg.size() + " bytes");
        System.out.println("Redelivery count: " + msg.getRedeliveryCount());

        // Print key if present
        if (msg.hasKey()) {
            System.out.println("Key: " + msg.getKey());
        }

        // Print ordering key if present (assumes the key bytes are UTF-8 text)
        if (msg.hasOrderingKey()) {
            System.out.println("Ordering key: " + new String(msg.getOrderingKey(), StandardCharsets.UTF_8));
        }

        // Print properties
        Map<String, String> props = msg.getProperties();
        if (!props.isEmpty()) {
            System.out.println("\n=== Properties ===");
            props.forEach((key, value) ->
                    System.out.println(key + ": " + value)
            );
        }

        // Print replication info
        if (msg.isReplicated()) {
            System.out.println("\nReplicated from: " + msg.getReplicatedFrom());
        }

        // Print message content (assumes the payload is UTF-8 text)
        System.out.println("\n=== Message Content ===");
        System.out.println(new String(msg.getData(), StandardCharsets.UTF_8));

        // Acknowledge the message
        consumer.acknowledge(msg);

        // Release client resources
        consumer.close();
        client.close();
    }
}