Overview
A Reader can be used to scan through all the messages currently available in a topic. Unlike a consumer, a reader does not use a subscription and does not maintain acknowledgment state.
Creating a Reader
Reader<String> reader = client.newReader(Schema.STRING)
.topic("my-topic")
.startMessageId(MessageId.earliest)
.create();
Core Methods
getTopic()
Return the topic from which this reader is reading from.
Example:
String topic = reader.getTopic();
System.out.println("Reading from: " + topic);
Reading Messages
readNext()
Read the next message in the topic.
Message<T> readNext() throws PulsarClientException
This method will block until a message is available.
Example:
while (true) {
Message<String> msg = reader.readNext();
System.out.println("Read message: " + new String(msg.getData()));
}
readNext(timeout, unit)
Read the next message in the topic waiting for a maximum time.
Message<T> readNext(int timeout, TimeUnit unit) throws PulsarClientException
The time unit for the timeout
The next message (could be null if none received in time)
Returns null if no message is received before the timeout.
Example:
Message<String> msg = reader.readNext(5, TimeUnit.SECONDS);
if (msg != null) {
System.out.println("Read message: " + new String(msg.getData()));
} else {
System.out.println("No message available within timeout");
}
readNextAsync()
Read asynchronously the next message in the topic.
CompletableFuture<Message<T>> readNextAsync()
Returns
CompletableFuture<Message<T>>
A future that will yield a message when it’s available
readNextAsync() should be called subsequently once returned future gets complete with received message. Else it creates a backlog of receive requests in the application.
Example:
reader.readNextAsync().thenAccept(msg -> {
System.out.println("Read message: " + new String(msg.getData()));
});
The returned future can be cancelled before completion by calling .cancel(false) to remove it from the backlog of receive requests. Another choice for ensuring a proper clean up of the returned future is to use the CompletableFuture.orTimeout method which is available on JDK9+.
Message Availability
hasMessageAvailable()
Check if there is any message available to read from the current position.
boolean hasMessageAvailable() throws PulsarClientException
True if messages are available to be read, false otherwise
This check can be used by an application to scan through a topic and stop when the reader reaches the current last published message.
Example:
while (reader.hasMessageAvailable()) {
Message<String> msg = reader.readNext();
System.out.println("Read: " + new String(msg.getData()));
}
// Done reading
This call might be blocking. Use hasMessageAvailableAsync() for async version. Even if this call returns true, that will not guarantee that a subsequent call to readNext() will not block.
hasMessageAvailableAsync()
Asynchronously check if there is any message available to read from the current position.
CompletableFuture<Boolean> hasMessageAvailableAsync()
Returns
CompletableFuture<Boolean>
A future that will yield true if messages are available to be read, false otherwise
This check can be used by an application to scan through a topic and stop when the reader reaches the current last published message.
Example:
reader.hasMessageAvailableAsync().thenAccept(available -> {
if (available) {
reader.readNextAsync().thenAccept(msg -> {
System.out.println("Read: " + new String(msg.getData()));
});
} else {
System.out.println("No more messages available");
}
});
hasReachedEndOfTopic()
Return true if the topic was terminated and this reader has reached the end of the topic.
boolean hasReachedEndOfTopic()
True if the end of topic has been reached, false otherwise
Note that this only applies to a “terminated” topic (where the topic is “sealed” and no more messages can be published) and not just that the reader is simply caught up with the publishers. Use hasMessageAvailable() to check for that.
Seek Operations
seek(MessageId)
Reset the subscription associated with this reader to a specific message id.
void seek(MessageId messageId) throws PulsarClientException
The message id where to reposition the reader
The message id can either be a specific message or represent the first or last messages in the topic:
MessageId.earliest - Reset the reader on the earliest message available in the topic
MessageId.latest - Reset the reader on the latest message in the topic
This operation can only be done on non-partitioned topics. For partitioned topics, one can rather perform the seek() on the individual partitions.
Example:
// Seek to earliest
reader.seek(MessageId.earliest);
// Seek to latest
reader.seek(MessageId.latest);
// Seek to specific message
reader.seek(specificMessageId);
seek(timestamp)
Reset the subscription associated with this reader to a specific message publish time.
void seek(long timestamp) throws PulsarClientException
The message publish time where to reposition the reader (Unix time in milliseconds)
This operation can only be done on non-partitioned topics. For partitioned topics, one can rather perform the seek() on the individual partitions.
Example:
// Seek to 1 hour ago
long oneHourAgo = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1);
reader.seek(oneHourAgo);
seek(Function)
Reset the subscription associated with this reader to a specific message ID or message publish time.
void seek(Function<String, Object> function) throws PulsarClientException
Function that takes topic+partition and returns timestamp or MessageId
The Function input is topic+partition. It returns only timestamp or MessageId. Exception is thrown if other object types are returned. If returns null, the current partition will not do any processing. Exception in a partition may affect other partitions.
seekAsync(MessageId)
Reset the subscription associated with this reader to a specific message id asynchronously.
CompletableFuture<Void> seekAsync(MessageId messageId)
The message id where to position the reader
A future to track the completion of the seek operation
The message id can either be a specific message or represent the first or last messages in the topic:
MessageId.earliest - Reset the reader on the earliest message available in the topic
MessageId.latest - Reset the reader on the latest message in the topic
seekAsync(timestamp)
Reset the subscription associated with this reader to a specific message publish time asynchronously.
CompletableFuture<Void> seekAsync(long timestamp)
The message publish time where to position the reader (Unix time in milliseconds)
A future to track the completion of the seek operation
seekAsync(Function)
Reset the subscription associated with this reader to a specific message ID or message publish time asynchronously.
CompletableFuture<Void> seekAsync(Function<String, Object> function)
Function that takes topic+partition and returns timestamp or MessageId
A future to track the completion of the seek operation
Message ID Operations
getLastMessageIds()
Get all the last message id of the topics the reader subscribed.
List<TopicMessageId> getLastMessageIds() throws PulsarClientException
The list of TopicMessageId instances of all the topics that the reader subscribed
It’s guaranteed that the owner topic of each TopicMessageId in the returned list is different from owner topics of other TopicMessageId instances.
getLastMessageIdsAsync()
The asynchronous version of getLastMessageIds().
CompletableFuture<List<TopicMessageId>> getLastMessageIdsAsync()
Returns
CompletableFuture<List<TopicMessageId>>
A future that will yield the list of TopicMessageId instances
Connection Status
isConnected()
Check whether the reader is connected to the broker.
True if connected to the broker, false otherwise
Closing the Reader
closeAsync()
Asynchronously close the reader and stop the broker from pushing more messages.
CompletableFuture<Void> closeAsync()
A future that can be used to track the completion of the operation
Example:
reader.closeAsync().thenRun(() -> {
System.out.println("Reader closed successfully");
}).exceptionally(ex -> {
System.err.println("Failed to close reader: " + ex.getMessage());
return null;
});
The Reader interface extends Closeable, so you can also use the synchronous close() method inherited from that interface.
Complete Example
import org.apache.pulsar.client.api.*;
import java.util.concurrent.TimeUnit;
public class ReaderExample {
public static void main(String[] args) throws Exception {
// Create client
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
// Create reader starting from earliest message
Reader<byte[]> reader = client.newReader()
.topic("my-topic")
.startMessageId(MessageId.earliest)
.create();
// Read all available messages
while (reader.hasMessageAvailable()) {
Message<byte[]> msg = reader.readNext();
System.out.println("Message ID: " + msg.getMessageId());
System.out.println("Message payload: " + new String(msg.getData()));
System.out.println("Message key: " + msg.getKey());
}
System.out.println("Reached end of available messages");
// Check if topic is terminated
if (reader.hasReachedEndOfTopic()) {
System.out.println("Topic has been terminated");
}
// Close reader
reader.close();
client.close();
}
}
Seeking Example
import org.apache.pulsar.client.api.*;
import java.util.concurrent.TimeUnit;
public class ReaderSeekExample {
public static void main(String[] args) throws Exception {
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
Reader<byte[]> reader = client.newReader()
.topic("my-topic")
.startMessageId(MessageId.earliest)
.create();
// Read some messages
for (int i = 0; i < 10; i++) {
Message<byte[]> msg = reader.readNext();
System.out.println("Read: " + new String(msg.getData()));
}
// Seek to 1 hour ago
long oneHourAgo = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1);
reader.seek(oneHourAgo);
System.out.println("Seeked to 1 hour ago");
// Continue reading from new position
while (reader.hasMessageAvailable()) {
Message<byte[]> msg = reader.readNext(1, TimeUnit.SECONDS);
if (msg != null) {
System.out.println("Read after seek: " + new String(msg.getData()));
}
}
reader.close();
client.close();
}
}