Overview
TheConsumer interface abstracts the behavior of Pulsar’s consumer. All operations on the consumer instance are thread safe.
Creating a Consumer
Core Methods
getTopic()
Get the topic for the consumer.The topic name for the consumer
getSubscription()
Get the subscription for the consumer.The subscription name for the consumer
getConsumerName()
Get the name of consumer.The consumer name
Receiving Messages
receive()
Receives a single message in blocking mode.The received message, or null if the thread was interrupted
- If the thread is interrupted while waiting: returns null and resets the interrupted flag
- If the consumer is closed while waiting: throws
PulsarClientExceptionwith the causeInterruptedException("Queue is terminated")
PulsarClientException.AlreadyClosedException- if the consumer was already closed before this method was calledPulsarClientException.InvalidConfigurationException- if a message listener was defined in the configuration
receive(timeout, unit)
Receive a single message with timeout.0 or less means immediate rather than infinite
The time unit for the timeout
The received Message or null if no message available before timeout
receiveAsync()
Receive a single message asynchronously.A future that will be completed when message is available
.cancel(false) to remove it from the backlog of receive requests.
batchReceive()
Batch receiving messages.A batch of messages
BatchReceivePolicy.
Example:
batchReceiveAsync()
Batch receiving messages asynchronously.A future that will be completed with a batch of messages
Acknowledging Messages
Acknowledgment methods are inherited from the
MessageAcknowledger interface. See the main documentation for acknowledge(), acknowledgeCumulative(), and their async variants.negativeAcknowledge(Message)
Acknowledge the failure to process a single message.The Message to be acknowledged
ConsumerBuilder.negativeAckRedeliveryDelay(long, TimeUnit).
Example:
negativeAcknowledge(MessageId)
Acknowledge the failure to process a single message by MessageId.The MessageId to be acknowledged
MessageId rather than a Message object, in order to avoid keeping the payload in memory for extended amount of time.
negativeAcknowledge(Messages)
Acknowledge the failure to process multiple messages.The Messages to be acknowledged
Reconsume Later
reconsumeLater(Message, delayTime, unit)
Reconsume a message after a custom delay.The message to be reconsumed later
The amount of delay before the message will be delivered
The time unit for the delay
reconsumeLater(Message, customProperties, delayTime, unit)
Reconsume a message with custom properties after a custom delay.The message to be reconsumed later
The custom properties to be reconsumed later
The amount of delay before the message will be delivered
The time unit for the delay
reconsumeLater(Messages, delayTime, unit)
Reconsume multiple messages after a custom delay.The messages to be reconsumed later
The amount of delay before the message will be delivered
The time unit for the delay
reconsumeLaterCumulative()
Reconsume the reception of all the messages in the stream up to (and including) the provided message.The message to be cumulatively reconsumed
The amount of delay before the message will be delivered
The time unit for the delay
reconsumeLaterAsync()
Asynchronously reconsume a message.A future that can be used to track the completion of the operation
reconsumeLaterAsync(Message, customProperties, delayTime, unit)
Asynchronously reconsume a message with custom properties.reconsumeLaterAsync(Messages, delayTime, unit)
Asynchronously reconsume multiple messages.reconsumeLaterCumulativeAsync()
Asynchronously reconsume cumulatively.reconsumeLaterCumulativeAsync(Message, customProperties, delayTime, unit)
Asynchronously reconsume cumulatively with custom properties.Subscription Management
unsubscribe()
Unsubscribe the consumer.unsubscribe(force)
Unsubscribe the consumer forcefully.Forcefully unsubscribe by disconnecting connected consumers
unsubscribeAsync()
Asynchronously unsubscribe the consumer.A future to track the operation
unsubscribeAsync(force)
Asynchronously unsubscribe the consumer forcefully.Forcefully unsubscribe by disconnecting connected consumers
Seek Operations
seek(MessageId)
Reset the subscription associated with this consumer to a specific message id.The message id where to reposition the subscription
MessageId.earliest- Reset the subscription on the earliest message available in the topicMessageId.latest- Reset the subscription on the latest message in the topic
messageId will be marked as acknowledged and the rest unacknowledged.
Example:
seek(timestamp)
Reset the subscription associated with this consumer to a specific message publish time.The message publish time where to reposition the subscription (Unix time in milliseconds)
seek(Function)
Reset the subscription associated with this consumer to a specific message ID or message publish time.Function that takes topic+partition and returns timestamp or MessageId
seekAsync(MessageId)
The asynchronous version ofseek(MessageId).
seekAsync(timestamp)
The asynchronous version ofseek(timestamp).
seekAsync(Function)
The asynchronous version ofseek(Function).
Message ID Operations
getLastMessageId()
Get the last message id available for consume.Deprecated: Use
getLastMessageIds() instead.getLastMessageIdAsync()
Get the last message id available for consume asynchronously.Deprecated: Use
getLastMessageIdsAsync() instead.getLastMessageIds()
Get all the last message id of the topics the consumer subscribed.The list of TopicMessageId instances of all the topics that the consumer subscribed
getLastMessageIdsAsync()
The asynchronous version ofgetLastMessageIds().
Flow Control
pause()
Stop requesting new messages from the broker untilresume() is called.
receive() to block until resume() is called and new messages are pushed by the broker.
resume()
Resume requesting messages from the broker.Redelivery
redeliverUnacknowledgedMessages()
Redelivers all the unacknowledged messages.Statistics and Status
getStats()
Get statistics for the consumer.Statistics for the consumer
numMsgsReceived- Number of messages received in the current intervalnumBytesReceived- Number of bytes received in the current intervalnumReceiveFailed- Number of messages failed to receive in the current intervalnumAcksSent- Number of acks sent in the current intervalnumAcksFailed- Number of acks failed to send in the current intervaltotalMsgsReceived- Total number of messages receivedtotalBytesReceived- Total number of bytes receivedtotalReceiveFailed- Total number of messages failed to receivetotalAcksSent- Total number of acks senttotalAcksFailed- Total number of acks failed to send
isConnected()
Check whether the consumer is currently connected to the broker.True if connected to the broker, false otherwise
getLastDisconnectedTimestamp()
Get the last disconnected timestamp of the consumer.The last disconnected timestamp in milliseconds
hasReachedEndOfTopic()
Return true if the topic was terminated and this consumer has already consumed all the messages in the topic.True if end of topic reached, false otherwise
Closing the Consumer
close()
Close the consumer and stop the broker from pushing more messages.closeAsync()
Asynchronously close the consumer and stop the broker from pushing more messages.A future that can be used to track the completion of the operation