Skip to main content

Overview

The Consumer interface abstracts the behavior of Pulsar’s consumer. All operations on the consumer instance are thread safe.

Creating a Consumer

Consumer<String> consumer = client.newConsumer(Schema.STRING)
    .topic("my-topic")
    .subscriptionName("my-subscription")
    .subscribe();

Core Methods

getTopic()

Get the topic for the consumer.
String getTopic()
Returns
String
The topic name for the consumer

getSubscription()

Get the subscription for the consumer.
String getSubscription()
Returns
String
The subscription name for the consumer

getConsumerName()

Get the name of consumer.
String getConsumerName()
Returns
String
The consumer name

Receiving Messages

receive()

Receives a single message in blocking mode.
Message<T> receive() throws PulsarClientException
Returns
Message<T>
The received message, or null if the thread was interrupted
This method blocks until a message is available or the consumer is closed. Behavior when interrupted:
  • If the thread is interrupted while waiting: returns null and resets the interrupted flag
  • If the consumer is closed while waiting: throws PulsarClientException with the cause InterruptedException("Queue is terminated")
Example:
while (true) {
    Message<String> msg = consumer.receive();
    System.out.println("Received: " + new String(msg.getData()));
    consumer.acknowledge(msg);
}
Exceptions:
  • PulsarClientException.AlreadyClosedException - if the consumer was already closed before this method was called
  • PulsarClientException.InvalidConfigurationException - if a message listener was defined in the configuration

receive(timeout, unit)

Receive a single message with timeout.
Message<T> receive(int timeout, TimeUnit unit) throws PulsarClientException
timeout
int
0 or less means immediate rather than infinite
unit
TimeUnit
The time unit for the timeout
Returns
Message<T>
The received Message or null if no message available before timeout
Retrieves a message, waiting up to the specified wait time if necessary. If consumer closes during wait, returns null immediately. Example:
Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
if (msg != null) {
    System.out.println("Received: " + new String(msg.getData()));
    consumer.acknowledge(msg);
} else {
    System.out.println("No message received within timeout");
}

receiveAsync()

Receive a single message asynchronously.
CompletableFuture<Message<T>> receiveAsync()
Returns
CompletableFuture<Message<T>>
A future that will be completed when message is available
Retrieves a message when it will be available and completes the future with received message. Example:
consumer.receiveAsync().thenAccept(msg -> {
    System.out.println("Received: " + new String(msg.getData()));
    consumer.acknowledgeAsync(msg);
});
The returned future can be cancelled before completion by calling .cancel(false) to remove it from the backlog of receive requests.

batchReceive()

Batch receiving messages.
Messages<T> batchReceive() throws PulsarClientException
Returns
Messages<T>
A batch of messages
This call blocks until has enough messages or wait timeout, more details to see BatchReceivePolicy. Example:
Messages<String> messages = consumer.batchReceive();
for (Message<String> msg : messages) {
    System.out.println("Received: " + new String(msg.getData()));
}
consumer.acknowledge(messages);

batchReceiveAsync()

Batch receiving messages asynchronously.
CompletableFuture<Messages<T>> batchReceiveAsync()
Returns
CompletableFuture<Messages<T>>
A future that will be completed with a batch of messages
Retrieves messages when has enough messages or wait timeout and completes the future with received messages. Example:
consumer.batchReceiveAsync().thenAccept(messages -> {
    for (Message<String> msg : messages) {
        System.out.println("Received: " + new String(msg.getData()));
    }
    consumer.acknowledgeAsync(messages);
});

Acknowledging Messages

Acknowledgment methods are inherited from the MessageAcknowledger interface. See the main documentation for acknowledge(), acknowledgeCumulative(), and their async variants.

negativeAcknowledge(Message)

Acknowledge the failure to process a single message.
void negativeAcknowledge(Message<?> message)
message
Message<?>
The Message to be acknowledged
When a message is “negatively acked” it will be marked for redelivery after some fixed delay. The delay is configurable when constructing the consumer with ConsumerBuilder.negativeAckRedeliveryDelay(long, TimeUnit). Example:
while (true) {
    Message<String> msg = consumer.receive();
    try {
        // Process message...
        processMessage(msg);
        consumer.acknowledge(msg);
    } catch (Throwable t) {
        log.warn("Failed to process message");
        consumer.negativeAcknowledge(msg);
    }
}

negativeAcknowledge(MessageId)

Acknowledge the failure to process a single message by MessageId.
void negativeAcknowledge(MessageId messageId)
messageId
MessageId
The MessageId to be acknowledged
This variation allows to pass a MessageId rather than a Message object, in order to avoid keeping the payload in memory for extended amount of time.

negativeAcknowledge(Messages)

Acknowledge the failure to process multiple messages.
void negativeAcknowledge(Messages<?> messages)
messages
Messages<?>
The Messages to be acknowledged
Example:
while (true) {
    Messages<String> msgs = consumer.batchReceive();
    try {
        // Process messages...
        consumer.acknowledge(msgs);
    } catch (Throwable t) {
        log.warn("Failed to process messages");
        consumer.negativeAcknowledge(msgs);
    }
}

Reconsume Later

reconsumeLater(Message, delayTime, unit)

Reconsume a message after a custom delay.
void reconsumeLater(Message<?> message, long delayTime, TimeUnit unit) 
    throws PulsarClientException
message
Message<?>
The message to be reconsumed later
delayTime
long
The amount of delay before the message will be delivered
unit
TimeUnit
The time unit for the delay
Example:
while (true) {
    Message<String> msg = consumer.receive();
    try {
        // Process message...
        consumer.acknowledge(msg);
    } catch (Throwable t) {
        log.warn("Failed to process message");
        consumer.reconsumeLater(msg, 1000, TimeUnit.MILLISECONDS);
    }
}

reconsumeLater(Message, customProperties, delayTime, unit)

Reconsume a message with custom properties after a custom delay.
void reconsumeLater(Message<?> message, Map<String, String> customProperties,
                    long delayTime, TimeUnit unit) throws PulsarClientException
message
Message<?>
The message to be reconsumed later
customProperties
Map<String, String>
The custom properties to be reconsumed later
delayTime
long
The amount of delay before the message will be delivered
unit
TimeUnit
The time unit for the delay

reconsumeLater(Messages, delayTime, unit)

Reconsume multiple messages after a custom delay.
void reconsumeLater(Messages<?> messages, long delayTime, TimeUnit unit) 
    throws PulsarClientException
messages
Messages<?>
The messages to be reconsumed later
delayTime
long
The amount of delay before the message will be delivered
unit
TimeUnit
The time unit for the delay

reconsumeLaterCumulative()

Reconsume the reception of all the messages in the stream up to (and including) the provided message.
void reconsumeLaterCumulative(Message<?> message, long delayTime, TimeUnit unit) 
    throws PulsarClientException
message
Message<?>
The message to be cumulatively reconsumed
delayTime
long
The amount of delay before the message will be delivered
unit
TimeUnit
The time unit for the delay

reconsumeLaterAsync()

Asynchronously reconsume a message.
CompletableFuture<Void> reconsumeLaterAsync(Message<?> message, long delayTime, TimeUnit unit)
Returns
CompletableFuture<Void>
A future that can be used to track the completion of the operation

reconsumeLaterAsync(Message, customProperties, delayTime, unit)

Asynchronously reconsume a message with custom properties.
CompletableFuture<Void> reconsumeLaterAsync(Message<?> message,
                                            Map<String, String> customProperties,
                                            long delayTime, TimeUnit unit)

reconsumeLaterAsync(Messages, delayTime, unit)

Asynchronously reconsume multiple messages.
CompletableFuture<Void> reconsumeLaterAsync(Messages<?> messages, long delayTime, TimeUnit unit)

reconsumeLaterCumulativeAsync()

Asynchronously reconsume cumulatively.
CompletableFuture<Void> reconsumeLaterCumulativeAsync(Message<?> message, 
                                                      long delayTime, TimeUnit unit)

reconsumeLaterCumulativeAsync(Message, customProperties, delayTime, unit)

Asynchronously reconsume cumulatively with custom properties.
CompletableFuture<Void> reconsumeLaterCumulativeAsync(Message<?> message,
                                                      Map<String, String> customProperties,
                                                      long delayTime, TimeUnit unit)

Subscription Management

unsubscribe()

Unsubscribe the consumer.
void unsubscribe() throws PulsarClientException
This call blocks until the consumer is unsubscribed. Unsubscribing will cause the subscription to be deleted and all the data retained can potentially be deleted as well. The operation will fail when performed on a shared subscription where multiple consumers are currently connected.

unsubscribe(force)

Unsubscribe the consumer forcefully.
void unsubscribe(boolean force) throws PulsarClientException
force
boolean
Forcefully unsubscribe by disconnecting connected consumers

unsubscribeAsync()

Asynchronously unsubscribe the consumer.
CompletableFuture<Void> unsubscribeAsync()
Returns
CompletableFuture<Void>
A future to track the operation

unsubscribeAsync(force)

Asynchronously unsubscribe the consumer forcefully.
CompletableFuture<Void> unsubscribeAsync(boolean force)
force
boolean
Forcefully unsubscribe by disconnecting connected consumers

Seek Operations

seek(MessageId)

Reset the subscription associated with this consumer to a specific message id.
void seek(MessageId messageId) throws PulsarClientException
messageId
MessageId
The message id where to reposition the subscription
The message id can either be a specific message or represent the first or last messages in the topic:
  • MessageId.earliest - Reset the subscription on the earliest message available in the topic
  • MessageId.latest - Reset the subscription on the latest message in the topic
This effectively resets the acknowledgement state of the subscription: all messages up to and including messageId will be marked as acknowledged and the rest unacknowledged. Example:
// Seek to earliest
consumer.seek(MessageId.earliest);

// Seek to latest
consumer.seek(MessageId.latest);

// Seek to specific message
consumer.seek(specificMessageId);

seek(timestamp)

Reset the subscription associated with this consumer to a specific message publish time.
void seek(long timestamp) throws PulsarClientException
timestamp
long
The message publish time where to reposition the subscription (Unix time in milliseconds)
Example:
// Seek to 1 hour ago
long oneHourAgo = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1);
consumer.seek(oneHourAgo);

seek(Function)

Reset the subscription associated with this consumer to a specific message ID or message publish time.
void seek(Function<String, Object> function) throws PulsarClientException
function
Function<String, Object>
Function that takes topic+partition and returns timestamp or MessageId
The Function input is topic+partition. It returns only timestamp or MessageId. If returns null, the current partition will not do any processing.

seekAsync(MessageId)

The asynchronous version of seek(MessageId).
CompletableFuture<Void> seekAsync(MessageId messageId)

seekAsync(timestamp)

The asynchronous version of seek(timestamp).
CompletableFuture<Void> seekAsync(long timestamp)

seekAsync(Function)

The asynchronous version of seek(Function).
CompletableFuture<Void> seekAsync(Function<String, Object> function)

Message ID Operations

getLastMessageId()

Get the last message id available for consume.
@Deprecated
MessageId getLastMessageId() throws PulsarClientException
Deprecated: Use getLastMessageIds() instead.

getLastMessageIdAsync()

Get the last message id available for consume asynchronously.
@Deprecated
CompletableFuture<MessageId> getLastMessageIdAsync()
Deprecated: Use getLastMessageIdsAsync() instead.

getLastMessageIds()

Get all the last message id of the topics the consumer subscribed.
List<TopicMessageId> getLastMessageIds() throws PulsarClientException
Returns
List<TopicMessageId>
The list of TopicMessageId instances of all the topics that the consumer subscribed

getLastMessageIdsAsync()

The asynchronous version of getLastMessageIds().
CompletableFuture<List<TopicMessageId>> getLastMessageIdsAsync()

Flow Control

pause()

Stop requesting new messages from the broker until resume() is called.
void pause()
Note that this might cause receive() to block until resume() is called and new messages are pushed by the broker.

resume()

Resume requesting messages from the broker.
void resume()

Redelivery

redeliverUnacknowledgedMessages()

Redelivers all the unacknowledged messages.
void redeliverUnacknowledgedMessages()
In Failover mode, the request is ignored if the consumer is not active for the given topic. In Shared mode, the consumer’s messages to be redelivered are distributed across all the connected consumers. This is a non-blocking call and doesn’t throw an exception. In case the connection breaks, the messages are redelivered after reconnect.

Statistics and Status

getStats()

Get statistics for the consumer.
ConsumerStats getStats()
Returns
ConsumerStats
Statistics for the consumer
Statistics include:
  • numMsgsReceived - Number of messages received in the current interval
  • numBytesReceived - Number of bytes received in the current interval
  • numReceiveFailed - Number of messages failed to receive in the current interval
  • numAcksSent - Number of acks sent in the current interval
  • numAcksFailed - Number of acks failed to send in the current interval
  • totalMsgsReceived - Total number of messages received
  • totalBytesReceived - Total number of bytes received
  • totalReceiveFailed - Total number of messages failed to receive
  • totalAcksSent - Total number of acks sent
  • totalAcksFailed - Total number of acks failed to send

isConnected()

Check whether the consumer is currently connected to the broker.
boolean isConnected()
Returns
boolean
True if connected to the broker, false otherwise

getLastDisconnectedTimestamp()

Get the last disconnected timestamp of the consumer.
long getLastDisconnectedTimestamp()
Returns
long
The last disconnected timestamp in milliseconds

hasReachedEndOfTopic()

Return true if the topic was terminated and this consumer has already consumed all the messages in the topic.
boolean hasReachedEndOfTopic()
Returns
boolean
True if end of topic reached, false otherwise
Please note that this does not simply mean that the consumer is caught up with the last message published by producers, rather the topic needs to be explicitly “terminated”.

Closing the Consumer

close()

Close the consumer and stop the broker from pushing more messages.
void close() throws PulsarClientException
Example:
try {
    consumer.close();
    System.out.println("Consumer closed successfully");
} catch (PulsarClientException e) {
    System.err.println("Failed to close consumer: " + e.getMessage());
}

closeAsync()

Asynchronously close the consumer and stop the broker from pushing more messages.
CompletableFuture<Void> closeAsync()
Returns
CompletableFuture<Void>
A future that can be used to track the completion of the operation

Complete Example

import org.apache.pulsar.client.api.*;
import java.util.concurrent.TimeUnit;

public class ConsumerExample {
    public static void main(String[] args) throws Exception {
        // Create client
        PulsarClient client = PulsarClient.builder()
            .serviceUrl("pulsar://localhost:6650")
            .build();

        // Create consumer
        Consumer<byte[]> consumer = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .subscribe();

        // Receive messages
        while (true) {
            Message<byte[]> msg = consumer.receive();
            try {
                System.out.println("Received: " + new String(msg.getData()));
                // Process the message
                processMessage(msg);
                // Acknowledge the message
                consumer.acknowledge(msg);
            } catch (Exception e) {
                // Negative acknowledge on failure
                consumer.negativeAcknowledge(msg);
            }
        }
    }

    private static void processMessage(Message<?> msg) {
        // Your processing logic here
    }
}

Build docs developers (and LLMs) love