Producers and consumers are the client applications that publish messages to and consume messages from Pulsar topics. Understanding their behavior is essential for building reliable messaging applications.
Producers
From pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Producer.java, producers publish messages to topics:
/**
 * Producer is used to publish messages on a topic.
 *
 * A single producer instance can be used across multiple threads.
 */
public interface Producer<T> extends Closeable {
    MessageId send(T message) throws PulsarClientException;
    CompletableFuture<MessageId> sendAsync(T message);
    TypedMessageBuilder<T> newMessage();
}
Creating Producers
Producer<String> producer = client.newProducer(Schema.STRING)
    .topic("persistent://tenant/namespace/topic")
    .producerName("my-producer")
    .create();
Producer instances are thread-safe and can be shared across multiple threads in your application.
Sending Messages
Synchronous Send
// Blocking call - waits for broker acknowledgment
MessageId msgId = producer.send("Hello Pulsar");
System.out.println("Message published with ID: " + msgId);
Asynchronous Send
// Non-blocking - returns immediately with a CompletableFuture
CompletableFuture<MessageId> future = producer.sendAsync("Hello Pulsar");
future.thenAccept(msgId -> {
    System.out.println("Message published: " + msgId);
}).exceptionally(ex -> {
    System.err.println("Failed to publish: " + ex.getMessage());
    return null;
});
From Producer.java documentation: Avoid blocking operations in async callbacks as they execute on the I/O thread. Use thenAcceptAsync() with a separate executor for long-running callbacks.
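The threading advice above can be tried without a broker. The following is a minimal sketch, not client code: a plain CompletableFuture stands in for the future returned by sendAsync(), and the executor names io-thread and callback-pool are assumptions made up for the illustration. It shows that a callback registered with thenAcceptAsync() and an explicit executor runs on that executor rather than on the thread that completes the future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CallbackOffload {

    /** Returns the name of the thread that ran the completion callback. */
    public static String callbackThreadName() {
        // Hypothetical stand-in for the client's I/O thread
        ExecutorService ioThread =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "io-thread"));
        // Separate executor reserved for slow or blocking callbacks
        ExecutorService callbackPool =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "callback-pool"));
        try {
            // Stand-in for the future returned by sendAsync()
            CompletableFuture<String> sendFuture = new CompletableFuture<>();
            CompletableFuture<String> ranOn = new CompletableFuture<>();

            // The callback is handed to callbackPool instead of the completing thread
            sendFuture.thenAcceptAsync(
                msgId -> ranOn.complete(Thread.currentThread().getName()),
                callbackPool);

            // Complete the future from the simulated I/O thread
            ioThread.execute(() -> sendFuture.complete("msg-1"));
            return ranOn.join();
        } finally {
            ioThread.shutdown();
            callbackPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Callback ran on: " + callbackThreadName());
    }
}
```

With a real producer, the same pattern would apply to the future returned by sendAsync(), with the executor sized for the expected callback workload.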
Using Message Builder
producer.newMessage()
    .key("user-123")                        // Message key for routing
    .value("order data")                    // Message payload
    .property("region", "us-west")          // Custom properties
    .eventTime(System.currentTimeMillis())
    .sequenceId(uniqueId)                   // uniqueId: application-supplied long
    .send();
Producer Configuration
Batching
Automatic message batching improves throughput:
Producer<String> producer = client.newProducer(Schema.STRING)
    .topic("my-topic")
    .enableBatching(true) // Default: true
    .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
    .batchingMaxMessages(1000)
    .batchingMaxBytes(1024 * 1024) // 1 MB
    .create();
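The three batching limits act as alternative flush triggers: a batch is sent as soon as any one of them is reached. The sketch below is a conceptual model only, not the client's implementation; the class BatchFlushModel and its methods are hypothetical names, and the time-based trigger is represented by an explicit flush() call that a timer would make when the publish delay elapses.

```java
public class BatchFlushModel {
    private final int maxMessages;
    private final int maxBytes;
    private int pendingMessages = 0;
    private int pendingBytes = 0;
    private int flushedBatches = 0;

    public BatchFlushModel(int maxMessages, int maxBytes) {
        this.maxMessages = maxMessages;
        this.maxBytes = maxBytes;
    }

    /** Adds one message of the given size and flushes if a size threshold is reached. */
    public void add(int payloadBytes) {
        pendingMessages++;
        pendingBytes += payloadBytes;
        if (pendingMessages >= maxMessages || pendingBytes >= maxBytes) {
            flush();
        }
    }

    /** Sends the pending batch; a timer would also call this when the publish delay elapses. */
    public void flush() {
        if (pendingMessages == 0) {
            return; // Nothing buffered, nothing to send
        }
        flushedBatches++;
        pendingMessages = 0;
        pendingBytes = 0;
    }

    public int flushedBatches() {
        return flushedBatches;
    }

    public int pendingMessages() {
        return pendingMessages;
    }

    /** Convenience for experiments: number of batches flushed by size thresholds alone. */
    public static int sizeTriggeredFlushes(int count, int payloadBytes, int maxMessages, int maxBytes) {
        BatchFlushModel model = new BatchFlushModel(maxMessages, maxBytes);
        for (int i = 0; i < count; i++) {
            model.add(payloadBytes);
        }
        return model.flushedBatches();
    }

    public static void main(String[] args) {
        // 2500 messages of 100 bytes each, using the limits from the snippet above
        System.out.println(sizeTriggeredFlushes(2500, 100, 1000, 1024 * 1024));
    }
}
```

In this model, small messages hit the message-count limit first, while large payloads hit the byte limit first; whatever remains buffered waits for the delay-based flush.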
Compression
Producer<String> producer = client.newProducer(Schema.STRING)
    .topic("my-topic")
    .compressionType(CompressionType.LZ4) // LZ4, ZLIB, ZSTD, SNAPPY
    .create();
Message Routing
For partitioned topics, control how messages are routed:
// 1. Round-robin mode (default when no key)
Producer<String> producer = client.newProducer(Schema.STRING)
    .topic("my-topic")
    .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
    .create();
// 2. Single partition mode
Producer<String> producer = client.newProducer(Schema.STRING)
    .topic("my-topic")
    .messageRoutingMode(MessageRoutingMode.SinglePartition)
    .create();
// 3. Custom router
Producer<String> producer = client.newProducer(Schema.STRING)
    .topic("my-topic")
    .messageRouter(new MessageRouter() {
        @Override
        public int choosePartition(Message<?> msg, TopicMetadata metadata) {
            // Custom logic: hash the key; floorMod keeps the index non-negative
            int hash = msg.hasKey() ? msg.getKey().hashCode() : 0;
            return Math.floorMod(hash, metadata.numPartitions());
        }
    })
    .create();
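A custom router has to return an index between 0 and numPartitions - 1. Java's % operator can produce a negative result when the hash code is negative, so a key-hash router is safer with Math.floorMod. The sketch below isolates that arithmetic; KeyPartitioner and its methods are hypothetical helpers for illustration, not part of the Pulsar API.

```java
public class KeyPartitioner {

    /** Maps an arbitrary hash value to a partition index in [0, numPartitions). */
    public static int partitionForHash(int hash, int numPartitions) {
        // floorMod never returns a negative value for a positive divisor
        return Math.floorMod(hash, numPartitions);
    }

    /** Maps a message key to a partition; messages without a key go to partition 0 here. */
    public static int partitionForKey(String key, int numPartitions) {
        if (key == null) {
            return 0; // Assumption of this sketch: keyless messages share one partition
        }
        return partitionForHash(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        System.out.println("-7 % 4           = " + (-7 % 4));
        System.out.println("floorMod(-7, 4)  = " + partitionForHash(-7, 4));
        System.out.println("user-123 of 8    = " + partitionForKey("user-123", 8));
    }
}
```

Because the result depends only on the key and the partition count, all messages with the same key land on the same partition as long as the partition count does not change.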
Delivery Guarantees
Producer<String> producer = client.newProducer(Schema.STRING)
    .topic("my-topic")
    .sendTimeout(30, TimeUnit.SECONDS) // Timeout for send operations
    .blockIfQueueFull(true) // Block instead of throwing exception
    .maxPendingMessages(1000) // Max pending async sends
    .create();
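When a send does not succeed within the send timeout, the failure surfaces to the application, which may then decide to retry. The sketch below shows one way to retry an asynchronous operation a bounded number of times. It is an illustration under stated assumptions: SendRetry and withRetry are hypothetical names, a Supplier of CompletableFuture stands in for a call such as producer.sendAsync(...), and no backoff delay is applied between attempts.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class SendRetry {

    /** Runs the operation, retrying on failure until maxAttempts attempts have been made. */
    public static <T> CompletableFuture<T> withRetry(
            Supplier<CompletableFuture<T>> operation, int maxAttempts) {
        return operation.get()
            .<CompletableFuture<T>>handle((value, error) -> {
                if (error == null) {
                    return CompletableFuture.completedFuture(value);
                }
                if (maxAttempts <= 1) {
                    // Out of attempts: propagate the last failure
                    return CompletableFuture.failedFuture(error);
                }
                return withRetry(operation, maxAttempts - 1);
            })
            .thenCompose(next -> next);
    }

    /** Demo: an operation that fails `failures` times, then succeeds. Returns attempts used, or -1. */
    public static int attemptsUsed(int failures, int maxAttempts) {
        AtomicInteger calls = new AtomicInteger();
        Supplier<CompletableFuture<String>> flaky = () -> {
            if (calls.incrementAndGet() <= failures) {
                return CompletableFuture.<String>failedFuture(
                    new IllegalStateException("transient failure"));
            }
            return CompletableFuture.completedFuture("msg-id");
        };
        try {
            withRetry(flaky, maxAttempts).join();
            return calls.get();
        } catch (CompletionException e) {
            return -1; // Retries exhausted
        }
    }

    public static void main(String[] args) {
        System.out.println("Attempts used: " + attemptsUsed(2, 5));
    }
}
```

A retried send can produce duplicates if an earlier attempt actually reached the broker, so this kind of retry is best paired with idempotent consumers or another deduplication mechanism.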
Producer Access Mode
From pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerAccessMode.java:
Producer<String> producer = client.newProducer(Schema.STRING)
    .topic("my-topic")
    .producerName("my-producer")
    .accessMode(ProducerAccessMode.Exclusive) // Only this producer can publish
    .create();
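Modes:
Shared (default): Multiple producers can publish to the topic
Exclusive: Only one producer can publish; creation fails if another producer is already connected
WaitForExclusive: Producer creation waits until exclusive access can be acquired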
Producer Stats
ProducerStats stats = producer.getStats();
System.out.println("Messages sent: " + stats.getTotalMsgsSent());
System.out.println("Bytes sent: " + stats.getTotalBytesSent());
System.out.println("Send failures: " + stats.getTotalSendFailed());
System.out.println("Acks received: " + stats.getTotalAcksReceived());
Consumers
From pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java, consumers receive messages from topics:
/**
 * An interface that abstracts behavior of Pulsar's consumer.
 *
 * All operations on the consumer instance are thread safe.
 */
public interface Consumer<T> extends Closeable {
    Message<T> receive() throws PulsarClientException;
    CompletableFuture<Message<T>> receiveAsync();
    void acknowledge(Message<?> message) throws PulsarClientException;
}
Creating Consumers
Consumer<String> consumer = client.newConsumer(Schema.STRING)
    .topic("persistent://tenant/namespace/topic")
    .subscriptionName("my-subscription")
    .subscriptionType(SubscriptionType.Shared)
    .subscribe();
Consumer instances are thread-safe. Multiple threads can safely call receive() on the same consumer.
Receiving Messages
Synchronous Receive
// Block until a message is available
Message<String> msg = consumer.receive();
try {
    processMessage(msg);
    consumer.acknowledge(msg);
} catch (Exception e) {
    consumer.negativeAcknowledge(msg);
}
Receive with Timeout
// Returns null if no message arrives within the timeout
Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
if (msg != null) {
    processMessage(msg);
    consumer.acknowledge(msg);
}
Asynchronous Receive
CompletableFuture<Message<String>> future = consumer.receiveAsync();
future.thenAccept(msg -> {
    processMessage(msg);
    consumer.acknowledgeAsync(msg);
});
Batch Receive
Consumer<String> consumer = client.newConsumer(Schema.STRING)
    .topic("my-topic")
    .subscriptionName("my-sub")
    .batchReceivePolicy(BatchReceivePolicy.builder()
        .maxNumMessages(100)
        .maxNumBytes(1024 * 1024)
        .timeout(200, TimeUnit.MILLISECONDS)
        .build())
    .subscribe();
Messages<String> messages = consumer.batchReceive();
for (Message<String> msg : messages) {
    processMessage(msg);
}
consumer.acknowledge(messages);
Message Listeners
Process messages asynchronously with a listener:
Consumer<String> consumer = client.newConsumer(Schema.STRING)
    .topic("my-topic")
    .subscriptionName("my-sub")
    // The lambda parameter is named c because it cannot reuse the name of the
    // local variable "consumer" that is being declared
    .messageListener((c, msg) -> {
        try {
            processMessage(msg);
            c.acknowledge(msg);
        } catch (Exception e) {
            c.negativeAcknowledge(msg);
        }
    })
    .subscribe();
// Messages are automatically delivered to the listener
When using a message listener, you cannot use the receive() methods. The listener callback must be thread-safe.
Acknowledgment Strategies
Individual Acknowledgment
Message<String> msg = consumer.receive();
processMessage(msg);
consumer.acknowledge(msg);
Cumulative Acknowledgment
// Only for Exclusive and Failover subscriptions
Message<String> msg = consumer.receive();
processMessage(msg);
consumer.acknowledgeCumulative(msg);
// All messages up to 'msg' are acknowledged
Negative Acknowledgment
Message<String> msg = consumer.receive();
try {
    processMessage(msg);
    consumer.acknowledge(msg);
} catch (Exception e) {
    // Trigger redelivery
    consumer.negativeAcknowledge(msg);
}
Delayed Redelivery
// reconsumeLater() requires enableRetry(true) on the consumer builder
Message<String> msg = consumer.receive();
try {
    processMessage(msg);
    consumer.acknowledge(msg);
} catch (TemporaryException e) { // TemporaryException: application-defined
    // Retry after 5 minutes
    consumer.reconsumeLater(msg, 5, TimeUnit.MINUTES);
}
Consumer Configuration
Subscription Configuration
Consumer<String> consumer = client.newConsumer(Schema.STRING)
    .topic("my-topic")
    .subscriptionName("my-sub")
    .subscriptionType(SubscriptionType.Shared)
    .subscriptionInitialPosition(SubscriptionInitialPosition.Latest)
    .subscribe();
Acknowledgment Timeout
Consumer<String> consumer = client.newConsumer(Schema.STRING)
    .topic("my-topic")
    .subscriptionName("my-sub")
    .ackTimeout(30, TimeUnit.SECONDS)
    .ackTimeoutTickTime(5, TimeUnit.SECONDS)
    .subscribe();
Dead Letter Queue
Consumer<String> consumer = client.newConsumer(Schema.STRING)
    .topic("my-topic")
    .subscriptionName("my-sub")
    .subscriptionType(SubscriptionType.Shared) // Dead letter topics apply to Shared and Key_Shared subscriptions
    .deadLetterPolicy(DeadLetterPolicy.builder()
        .maxRedeliverCount(3)
        .deadLetterTopic("my-topic-DLQ")
        .build())
    .subscribe();
Consumer Interceptors
Consumer<String> consumer = client.newConsumer(Schema.STRING)
    .topic("my-topic")
    .subscriptionName("my-sub")
    .intercept(new ConsumerInterceptor<String>() {
        @Override
        public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
            // Pre-process message
            return message;
        }
        @Override
        public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwable exception) {
            // Track acknowledgments
        }
        // The remaining interface methods (close, onAcknowledgeCumulative,
        // onNegativeAcksSend, onAckTimeoutSend) must also be implemented;
        // they are omitted here for brevity
    })
    .subscribe();
Multi-Topic Subscriptions
// Subscribe to multiple topics
Consumer<String> consumer = client.newConsumer(Schema.STRING)
    .topics(Arrays.asList(
        "persistent://tenant/ns/topic-1",
        "persistent://tenant/ns/topic-2"
    ))
    .subscriptionName("my-sub")
    .subscribe();
// Subscribe using pattern
Consumer<String> consumer = client.newConsumer(Schema.STRING)
    .topicsPattern("persistent://tenant/ns/.*")
    .subscriptionName("my-sub")
    .subscribe();
Consumer Pause/Resume
Consumer<String> consumer = client.newConsumer(Schema.STRING)
    .topic("my-topic")
    .subscriptionName("my-sub")
    .subscribe();
// Pause consumption
consumer.pause();
// Resume consumption
consumer.resume();
Seeking
Reposition the subscription cursor:
// Seek to earliest message
consumer.seek(MessageId.earliest);
// Seek to latest message
consumer.seek(MessageId.latest);
// Seek to specific message ID
consumer.seek(messageId);
// Seek to timestamp (24 hours ago)
consumer.seek(System.currentTimeMillis() - TimeUnit.HOURS.toMillis(24));
Pulsar Client
Both producers and consumers are created from a PulsarClient instance:
PulsarClient client = PulsarClient.builder()
    .serviceUrl("pulsar://localhost:6650")
    .connectionTimeout(30, TimeUnit.SECONDS)
    .operationTimeout(30, TimeUnit.SECONDS)
    .build();
// Create producers and consumers
Producer<String> producer = client.newProducer(Schema.STRING)
    .topic("my-topic")
    .create();
Consumer<String> consumer = client.newConsumer(Schema.STRING)
    .topic("my-topic")
    .subscriptionName("my-sub")
    .subscribe();
// Always close resources
producer.close();
consumer.close();
client.close();
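Because Producer and Consumer extend Closeable, they can also be managed with try-with-resources, which closes resources in the reverse order of their declaration. The sketch below demonstrates that ordering with a hypothetical Resource class standing in for the client, producer, and consumer; it does not use the Pulsar API.

```java
import java.util.ArrayList;
import java.util.List;

public class CloseOrder {

    /** Hypothetical stand-in for a closeable client object; records when it is closed. */
    static final class Resource implements AutoCloseable {
        private final String name;
        private final List<String> closeLog;

        Resource(String name, List<String> closeLog) {
            this.name = name;
            this.closeLog = closeLog;
        }

        @Override
        public void close() {
            closeLog.add(name);
        }
    }

    /** Opens three resources and returns the order in which they were closed. */
    public static List<String> closeSequence() {
        List<String> closeLog = new ArrayList<>();
        try (Resource client = new Resource("client", closeLog);
             Resource producer = new Resource("producer", closeLog);
             Resource consumer = new Resource("consumer", closeLog)) {
            // Publish and consume here; resources are closed even if this block throws
        }
        return closeLog;
    }

    public static void main(String[] args) {
        System.out.println(closeSequence());
    }
}
```

Declaring the client first means it is closed last, after the producer and consumer that depend on it.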
Client Configuration
PulsarClient client = PulsarClient.builder()
    .serviceUrl("pulsar+ssl://pulsar.example.com:6651")
    .authentication(
        AuthenticationFactory.token("eyJhbGc..."))
    .tlsTrustCertsFilePath("/path/to/ca-cert.pem")
    .enableTls(true) // Deprecated: the pulsar+ssl:// scheme already enables TLS
    .allowTlsInsecureConnection(false)
    .ioThreads(10)
    .listenerThreads(10)
    .connectionsPerBroker(1)
    .build();
Best Practices
Producer best practices:
Reuse producer instances across multiple threads
Enable batching for better throughput
Use async send for non-critical messages
Set appropriate send timeouts
Handle send failures with retry logic
Close producers gracefully with close() or use try-with-resources
Consumer best practices:
Use batch receive for high-throughput scenarios
Configure appropriate ack timeout based on processing time
Implement dead letter queues for poison messages
Use message listeners for async processing
Monitor consumer lag regularly
Handle redelivery gracefully with idempotent processing
Client best practices:
Share PulsarClient instances across your application
Close producers/consumers when done
Rely on the client's built-in connection pooling (tunable with connectionsPerBroker) for better resource utilization
Monitor client metrics for performance tuning
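Negative acknowledgments, ack timeouts, and retries can all cause the same message to be delivered more than once, which is why idempotent processing is recommended above. The sketch below shows the basic idea of skipping already-processed message IDs. It is a simplified illustration: IdempotentHandler is a hypothetical class, message IDs are represented as strings, and the in-memory set is an assumption that a real application would replace with a bounded or durable store.

```java
import java.util.HashSet;
import java.util.Set;

public class IdempotentHandler {
    private final Set<String> processedIds = new HashSet<>();
    private int sideEffects = 0;

    /** Returns true if the message was processed, false if it was a duplicate delivery. */
    public synchronized boolean handle(String messageId) {
        if (!processedIds.add(messageId)) {
            return false; // Already seen: safe to acknowledge without reprocessing
        }
        sideEffects++; // Stand-in for the real business logic
        return true;
    }

    public synchronized int sideEffects() {
        return sideEffects;
    }

    /** Convenience for experiments: side effects produced by a sequence of deliveries. */
    public static int sideEffectsAfter(String... deliveredIds) {
        IdempotentHandler handler = new IdempotentHandler();
        for (String id : deliveredIds) {
            handler.handle(id);
        }
        return handler.sideEffects();
    }

    public static void main(String[] args) {
        // The first message is delivered twice but only processed once
        System.out.println(sideEffectsAfter("ledger1:entry0", "ledger1:entry1", "ledger1:entry0"));
    }
}
```

In either case the consumer would still acknowledge the message, so that a duplicate is not redelivered again.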
Next Steps
Schemas: Learn about type-safe message serialization
Subscriptions: Deep dive into subscription types
Messaging: Understand delivery semantics
Topics: Explore topic configuration