Skip to main content

Overview

The Message interface is the message abstraction used in Pulsar. It represents a message received from a topic and provides access to the message’s payload, properties, metadata, and identifiers.

Message Properties

getProperties()

Return the properties attached to the message.
Map<String, String> getProperties()
Returns
Map<String, String>
An unmodifiable view of the properties map
Properties are application defined key/value pairs that will be attached to the message. Example:
Message<String> msg = consumer.receive();
Map<String, String> props = msg.getProperties();
for (Map.Entry<String, String> entry : props.entrySet()) {
    System.out.println(entry.getKey() + ": " + entry.getValue());
}

hasProperty()

Check whether the message has a specific property attached.
boolean hasProperty(String name)
name
String
The name of the property to check
Returns
boolean
True if the message has the specified property, false if the property is not defined
Example:
if (msg.hasProperty("user-id")) {
    String userId = msg.getProperty("user-id");
    System.out.println("User ID: " + userId);
}

getProperty()

Get the value of a specific property.
String getProperty(String name)
name
String
The name of the property
Returns
String
The value of the property or null if the property was not defined

Message Payload

getData()

Get the raw payload of the message.
byte[] getData()
Returns
byte[]
The byte array with the message payload
Even when using the Schema and type-safe API, an application has access to the underlying raw message payload. Example:
Message<String> msg = consumer.receive();
byte[] rawData = msg.getData();
String content = new String(rawData);

getValue()

Get the de-serialized value of the message, according to the configured Schema.
T getValue()
Returns
T
The deserialized value of the message
Example:
Consumer<MyObject> consumer = client.newConsumer(Schema.AVRO(MyObject.class))
    .topic("my-topic")
    .subscribe();

Message<MyObject> msg = consumer.receive();
MyObject obj = msg.getValue();

size()

Get the uncompressed message payload size in bytes.
int size()
Returns
int
Size in bytes

Message Identifiers

getMessageId()

Get the unique message ID associated with this message.
MessageId getMessageId()
Returns
MessageId
The message id (null if this message was not received by this client instance)
The message id can be used to univocally refer to a message without having to keep the entire payload in memory. Only messages received from the consumer will have a message id assigned. Example:
Message<String> msg = consumer.receive();
MessageId msgId = msg.getMessageId();
System.out.println("Message ID: " + msgId);

// Later, seek to this message
consumer.seek(msgId);

getTopicName()

Get the topic the message was published to.
String getTopicName()
Returns
String
The topic the message was published to

Message Keys

hasKey()

Check whether the message has a key.
boolean hasKey()
Returns
boolean
True if the key was set while creating the message, false otherwise

getKey()

Get the key of the message.
String getKey()
Returns
String
The key of the message
Example:
if (msg.hasKey()) {
    String key = msg.getKey();
    System.out.println("Message key: " + key);
}

hasBase64EncodedKey()

Check whether the key has been base64 encoded.
boolean hasBase64EncodedKey()
Returns
boolean
True if the key is base64 encoded, false otherwise

getKeyBytes()

Get bytes in key.
byte[] getKeyBytes()
Returns
byte[]
The key in byte[] form
If the key has been base64 encoded, it is decoded before being returned. Otherwise, if the key is a plain string, this method returns the UTF_8 encoded bytes of the string.

hasOrderingKey()

Check whether the message has an ordering key.
boolean hasOrderingKey()
Returns
boolean
True if the ordering key was set while creating the message, false otherwise

getOrderingKey()

Get the ordering key of the message.
byte[] getOrderingKey()
Returns
byte[]
The ordering key of the message

Timestamps

getPublishTime()

Get the publish time of this message.
long getPublishTime()
Returns
long
Publish time of this message
The publish time is the timestamp that a client published the message. Example:
long publishTime = msg.getPublishTime();
System.out.println("Published at: " + new Date(publishTime));

getEventTime()

Get the event time associated with this message.
long getEventTime()
Returns
long
The message event time or 0 if event time wasn’t set
It is typically set by the applications via TypedMessageBuilder.eventTime(long). If there isn’t any event time associated with this event, it will return 0. Example:
long eventTime = msg.getEventTime();
if (eventTime > 0) {
    System.out.println("Event time: " + new Date(eventTime));
}

hasBrokerPublishTime()

Check whether the message has a broker publish time.
boolean hasBrokerPublishTime()
Returns
boolean
True if the message has a broker publish time, otherwise false

getBrokerPublishTime()

Get broker publish time from broker entry metadata.
Optional<Long> getBrokerPublishTime()
Returns
Optional<Long>
Broker publish time from broker entry metadata, or empty if the feature is not enabled in the broker
Note that only if the feature is enabled in the broker then the value is available.

Sequence and Producer

getSequenceId()

Get the sequence id associated with this message.
long getSequenceId()
Returns
long
Sequence id associated with this message
It is typically set by the applications via TypedMessageBuilder.sequenceId(long).

getProducerName()

Get the producer name who produced this message.
String getProducerName()
Returns
String
Producer name who produced this message, null if producer name is not set
Example:
String producerName = msg.getProducerName();
if (producerName != null) {
    System.out.println("Produced by: " + producerName);
}

Schema Information

getSchemaVersion()

Get schema version of the message.
byte[] getSchemaVersion()
Returns
byte[]
Schema version of the message if the message is produced with schema, otherwise null

getSchemaId()

Get schema ID of the message.
Optional<byte[]> getSchemaId()
Returns
Optional<byte[]>
The schema ID if the message is produced with external schema and schema ID is set, otherwise empty
PIP-420 provides a way to produce messages with external schema, and the schema ID will be set to the message metadata.

getReaderSchema()

Get the schema associated to the message.
default Optional<Schema<?>> getReaderSchema()
Returns
Optional<Schema<?>>
The schema used to decode the payload of message
Please note that this schema is usually equal to the Schema you passed during the construction of the Consumer or the Reader. But if you are consuming the topic using the GenericObject interface, this method will return the schema associated with the message.

Replication

isReplicated()

Check whether the message is replicated from other cluster.
boolean isReplicated()
Returns
boolean
True if the message is replicated from other cluster, false otherwise

getReplicatedFrom()

Get name of cluster from which the message is replicated.
String getReplicatedFrom()
Returns
String
The name of cluster from which the message is replicated
Example:
if (msg.isReplicated()) {
    String sourceCluster = msg.getReplicatedFrom();
    System.out.println("Message replicated from: " + sourceCluster);
}

Redelivery

getRedeliveryCount()

Get message redelivery count.
int getRedeliveryCount()
Returns
int
Message redelivery count
Redelivery count is maintained in pulsar broker. When client acknowledge message timeout, broker will dispatch message again with message redelivery count in CommandMessage defined. Message redelivery increases monotonically in a broker. When topic switches ownership to another broker, redelivery count will be recalculated. Example:
int redeliveryCount = msg.getRedeliveryCount();
if (redeliveryCount > 3) {
    System.out.println("Message has been redelivered " + redeliveryCount + " times");
    // Maybe send to dead letter queue
}

Encryption

getEncryptionCtx()

Get the encryption context.
Optional<EncryptionContext> getEncryptionCtx()
Returns
Optional<EncryptionContext>
The optional encryption context
EncryptionContext contains encryption and compression information in it using which application can decrypt consumed message with encrypted-payload.

Index

hasIndex()

Check whether the message has an index.
boolean hasIndex()
Returns
boolean
True if the message has an index, otherwise false

getIndex()

Get index from broker entry metadata.
Optional<Long> getIndex()
Returns
Optional<Long>
Index from broker entry metadata, or empty if the feature is not enabled in the broker
Note that only if the feature is enabled in the broker then the value is available.

Resource Management

release()

Release a message back to the pool.
void release()
This is required only if the consumer was created with the option to pool messages, otherwise it will have no effect.

Complete Example

import org.apache.pulsar.client.api.*;
import java.util.Map;

public class MessageInspectionExample {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
            .serviceUrl("pulsar://localhost:6650")
            .build();

        Consumer<byte[]> consumer = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .subscribe();

        Message<byte[]> msg = consumer.receive();

        // Print message details
        System.out.println("=== Message Details ===");
        System.out.println("Message ID: " + msg.getMessageId());
        System.out.println("Topic: " + msg.getTopicName());
        System.out.println("Producer: " + msg.getProducerName());
        System.out.println("Publish time: " + msg.getPublishTime());
        System.out.println("Event time: " + msg.getEventTime());
        System.out.println("Sequence ID: " + msg.getSequenceId());
        System.out.println("Size: " + msg.size() + " bytes");
        System.out.println("Redelivery count: " + msg.getRedeliveryCount());

        // Print key if present
        if (msg.hasKey()) {
            System.out.println("Key: " + msg.getKey());
        }

        // Print ordering key if present
        if (msg.hasOrderingKey()) {
            System.out.println("Ordering key: " + new String(msg.getOrderingKey()));
        }

        // Print properties
        Map<String, String> props = msg.getProperties();
        if (!props.isEmpty()) {
            System.out.println("\n=== Properties ===");
            props.forEach((key, value) -> 
                System.out.println(key + ": " + value)
            );
        }

        // Print replication info
        if (msg.isReplicated()) {
            System.out.println("\nReplicated from: " + msg.getReplicatedFrom());
        }

        // Print message content
        System.out.println("\n=== Message Content ===");
        System.out.println(new String(msg.getData()));

        // Acknowledge the message
        consumer.acknowledge(msg);

        consumer.close();
        client.close();
    }
}

Build docs developers (and LLMs) love