Producers and consumers are the client applications that publish messages to and consume messages from Pulsar topics. Understanding their behavior is essential for building reliable messaging applications.

Producers

The Producer interface, defined in pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Producer.java, publishes messages to topics. An abridged view:
/**
 * Producer is used to publish messages on a topic.
 *
 * A single producer instance can be used across multiple threads.
 */
public interface Producer<T> extends Closeable {
    MessageId send(T message) throws PulsarClientException;
    CompletableFuture<MessageId> sendAsync(T message);
    TypedMessageBuilder<T> newMessage();
    // ... remaining methods omitted
}

Creating Producers

Producer<String> producer = client.newProducer(Schema.STRING)
    .topic("persistent://tenant/namespace/topic")
    .producerName("my-producer")
    .create();
Producer instances are thread-safe and can be shared across multiple threads in your application.

Sending Messages

Synchronous Send

// Blocking call - waits for broker acknowledgment
MessageId msgId = producer.send("Hello Pulsar");
System.out.println("Message published with ID: " + msgId);

Asynchronous Send

// Non-blocking - returns immediately with a CompletableFuture
CompletableFuture<MessageId> future = producer.sendAsync("Hello Pulsar");

future.thenAccept(msgId -> {
    System.out.println("Message published: " + msgId);
}).exceptionally(ex -> {
    System.err.println("Failed to publish: " + ex.getMessage());
    return null;
});
Note (from the Producer.java documentation): avoid blocking operations in async callbacks, because they execute on the client's I/O thread. For long-running callbacks, use thenAcceptAsync() with a separate executor.
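The note about callbacks can be illustrated without a broker. The following is a minimal sketch using only java.util.concurrent: a completed CompletableFuture stands in for the future returned by sendAsync(), and the class name, method name, and "callback-worker" thread name are illustrative assumptions, not part of the Pulsar API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class CallbackOffloadSketch {
    /**
     * Attaches a callback with thenAcceptAsync so that it runs on a dedicated
     * executor, and returns the name of the thread that actually ran it.
     */
    static String threadThatRanCallback(CompletableFuture<String> sendFuture) {
        // Hypothetical single-thread pool reserved for callbacks
        ExecutorService callbackPool =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "callback-worker"));
        try {
            CompletableFuture<String> ranOn = new CompletableFuture<>();
            sendFuture.thenAcceptAsync(
                msgId -> ranOn.complete(Thread.currentThread().getName()),
                callbackPool);
            return ranOn.join(); // wait until the callback has run
        } finally {
            callbackPool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Stand-in for the future returned by producer.sendAsync(...)
        CompletableFuture<String> sendFuture = CompletableFuture.completedFuture("msg-1");
        System.out.println(threadThatRanCallback(sendFuture));
    }
}
```

Because the executor is passed explicitly, the callback body never runs on the thread that completed the future, which is the property the note asks for.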

Using Message Builder

producer.newMessage()
    .key("user-123")              // Message key for routing
    .value("order data")          // Message payload
    .property("region", "us-west") // Custom properties
    .eventTime(System.currentTimeMillis())
    .sequenceId(uniqueId)         // uniqueId: a long supplied by your application
    .send();

Producer Configuration

Batching

Automatic message batching improves throughput:
Producer<String> producer = client.newProducer(Schema.STRING)
    .topic("my-topic")
    .enableBatching(true)  // Default: true
    .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
    .batchingMaxMessages(1000)
    .batchingMaxBytes(1024 * 1024)  // 1 MB
    .create();
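To make the interaction of the message and byte limits concrete, here is a toy model of a batch accumulator. It is a sketch under stated assumptions, not Pulsar's internal batching code: the class BatchSketch and its methods are hypothetical, and the publish-delay timer is represented by calling flush() manually.

```java
import java.util.ArrayList;
import java.util.List;

final class BatchSketch {
    private final int maxMessages;
    private final int maxBytes;
    private int pendingCount = 0;
    private int pendingBytes = 0;
    private final List<Integer> flushedBatchSizes = new ArrayList<>();

    BatchSketch(int maxMessages, int maxBytes) {
        this.maxMessages = maxMessages;
        this.maxBytes = maxBytes;
    }

    /** Adds a payload and flushes as soon as either limit is reached. */
    void add(byte[] payload) {
        pendingCount++;
        pendingBytes += payload.length;
        if (pendingCount >= maxMessages || pendingBytes >= maxBytes) {
            flush();
        }
    }

    /** Also what a publish-delay timer would call when it fires. */
    void flush() {
        if (pendingCount == 0) {
            return; // nothing buffered
        }
        flushedBatchSizes.add(pendingCount);
        pendingCount = 0;
        pendingBytes = 0;
    }

    /** Number of messages in each batch flushed so far, in order. */
    List<Integer> flushedBatchSizes() {
        return flushedBatchSizes;
    }
}
```

In this model a batch is sent when any one of the three triggers fires first, which is why a short publish delay bounds latency even when traffic is too light to fill a batch.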

Compression

Producer<String> producer = client.newProducer(Schema.STRING)
    .topic("my-topic")
    .compressionType(CompressionType.LZ4)  // LZ4, ZLIB, ZSTD, SNAPPY
    .create();

Message Routing

For partitioned topics, control how messages are routed:
// 1. Round-robin mode (default when no key)
Producer<String> roundRobinProducer = client.newProducer(Schema.STRING)
    .topic("my-topic")
    .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
    .create();

// 2. Single partition mode
Producer<String> singlePartitionProducer = client.newProducer(Schema.STRING)
    .topic("my-topic")
    .messageRoutingMode(MessageRoutingMode.SinglePartition)
    .create();

// 3. Custom router
Producer<String> customRouterProducer = client.newProducer(Schema.STRING)
    .topic("my-topic")
    .messageRouter(new MessageRouter() {
        @Override
        public int choosePartition(Message<?> msg, TopicMetadata metadata) {
            // Custom logic: hash the key; floorMod keeps the result non-negative
            String key = msg.hasKey() ? msg.getKey() : "";
            return Math.floorMod(key.hashCode(), metadata.numPartitions());
        }
    })
    .create();
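The routing arithmetic inside a custom router can be tested in isolation. The sketch below is an illustration only: KeyRoutingSketch is a hypothetical helper, it uses String.hashCode() rather than whichever hash function Pulsar applies by default, and sending keyless messages to partition 0 is an assumption made for the example.

```java
final class KeyRoutingSketch {
    /** Maps a key to a partition index in [0, numPartitions). */
    static int choosePartition(String key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        if (key == null) {
            return 0; // assumption for this sketch: keyless messages go to partition 0
        }
        // hashCode() may be negative; floorMod always yields a non-negative index,
        // whereas the % operator would not.
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        System.out.println(choosePartition("user-123", 4));
    }
}
```

The same key always maps to the same partition, so per-key ordering is preserved as long as the partition count does not change.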

Delivery Guarantees

Producer<String> producer = client.newProducer(Schema.STRING)
    .topic("my-topic")
    .sendTimeout(30, TimeUnit.SECONDS)  // Timeout for send operations
    .blockIfQueueFull(true)  // Block instead of throwing exception
    .maxPendingMessages(1000)  // Max pending async sends
    .create();
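The relationship between maxPendingMessages and blockIfQueueFull can be pictured as a permit counter. The following is a simplified model, not the client's implementation: PendingQueueSketch is a hypothetical class, and a false return value stands in for the exception the real producer raises when the queue is full and blocking is disabled.

```java
import java.util.concurrent.Semaphore;

final class PendingQueueSketch {
    private final Semaphore permits;
    private final boolean blockIfQueueFull;

    PendingQueueSketch(int maxPendingMessages, boolean blockIfQueueFull) {
        this.permits = new Semaphore(maxPendingMessages);
        this.blockIfQueueFull = blockIfQueueFull;
    }

    /**
     * Reserves a slot for one in-flight message.
     * Blocking mode waits for a free slot; non-blocking mode returns false when full.
     */
    boolean tryEnqueue() {
        if (blockIfQueueFull) {
            permits.acquireUninterruptibly();
            return true;
        }
        return permits.tryAcquire();
    }

    /** Called when the broker acknowledges a message, freeing its slot. */
    void onAck() {
        permits.release();
    }
}
```

Blocking mode turns a full queue into backpressure on the sending thread, while non-blocking mode surfaces it as an error the caller must handle.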

Producer Access Mode

From pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerAccessMode.java:
Producer<String> producer = client.newProducer(Schema.STRING)
    .topic("my-topic")
    .producerName("my-producer")
    .accessMode(ProducerAccessMode.Exclusive)  // Only this producer can publish
    .create();
Modes:
  • Shared (default): Multiple producers can publish to the topic
  • Exclusive: Only one producer can publish; creating a second producer fails
  • WaitForExclusive: Producer creation waits until exclusive access can be acquired

Producer Stats

ProducerStats stats = producer.getStats();

System.out.println("Messages sent: " + stats.getTotalMsgsSent());
System.out.println("Bytes sent: " + stats.getTotalBytesSent());
System.out.println("Send failures: " + stats.getTotalSendFailed());
System.out.println("Acks received: " + stats.getTotalAcksReceived());

Consumers

The Consumer interface, defined in pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java, receives messages from topics. An abridged view:
/**
 * An interface that abstracts behavior of Pulsar's consumer.
 *
 * All operations on the consumer instance are thread safe.
 */
public interface Consumer<T> extends Closeable {
    Message<T> receive() throws PulsarClientException;
    CompletableFuture<Message<T>> receiveAsync();
    void acknowledge(Message<?> message) throws PulsarClientException;
    // ... remaining methods omitted
}

Creating Consumers

Consumer<String> consumer = client.newConsumer(Schema.STRING)
    .topic("persistent://tenant/namespace/topic")
    .subscriptionName("my-subscription")
    .subscriptionType(SubscriptionType.Shared)
    .subscribe();
Consumer instances are thread-safe. Multiple threads can safely call receive() on the same consumer.

Receiving Messages

Synchronous Receive

// Block until a message is available
Message<String> msg = consumer.receive();
try {
    processMessage(msg);
    consumer.acknowledge(msg);
} catch (Exception e) {
    consumer.negativeAcknowledge(msg);
}

Receive with Timeout

Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
if (msg != null) {
    processMessage(msg);
    consumer.acknowledge(msg);
}

Asynchronous Receive

CompletableFuture<Message<String>> future = consumer.receiveAsync();

future.thenAccept(msg -> {
    processMessage(msg);
    consumer.acknowledgeAsync(msg);
});

Batch Receive

Consumer<String> consumer = client.newConsumer(Schema.STRING)
    .topic("my-topic")
    .subscriptionName("my-sub")
    .batchReceivePolicy(BatchReceivePolicy.builder()
        .maxNumMessages(100)
        .maxNumBytes(1024 * 1024)
        .timeout(200, TimeUnit.MILLISECONDS)
        .build())
    .subscribe();

Messages<String> messages = consumer.batchReceive();
for (Message<String> msg : messages) {
    processMessage(msg);
}
consumer.acknowledge(messages);

Message Listeners

Process messages asynchronously with a listener:
Consumer<String> consumer = client.newConsumer(Schema.STRING)
    .topic("my-topic")
    .subscriptionName("my-sub")
    // The lambda parameter is named c because a lambda parameter
    // cannot reuse the name of the enclosing local variable 'consumer'
    .messageListener((c, msg) -> {
        try {
            processMessage(msg);
            c.acknowledge(msg);
        } catch (Exception e) {
            c.negativeAcknowledge(msg);
        }
    })
    .subscribe();

// Messages are automatically delivered to the listener
When using a message listener, you cannot use the receive() methods. The listener callback must be thread-safe.

Acknowledgment Strategies

Individual Acknowledgment

Message<String> msg = consumer.receive();
processMessage(msg);
consumer.acknowledge(msg);

Cumulative Acknowledgment

// Only for Exclusive and Failover subscriptions
Message<String> msg = consumer.receive();
processMessage(msg);
consumer.acknowledgeCumulative(msg);
// All messages up to 'msg' are acknowledged
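The difference between individual and cumulative acknowledgment can be shown with a small in-memory model. This is a sketch, not broker code: AckSketch is hypothetical, and plain long values stand in for ordered message IDs.

```java
import java.util.Set;
import java.util.TreeSet;

final class AckSketch {
    // Unacknowledged message IDs, kept in delivery order
    private final TreeSet<Long> unacked = new TreeSet<>();

    void deliver(long id) {
        unacked.add(id);
    }

    /** Individual ack: removes exactly one message. */
    void acknowledge(long id) {
        unacked.remove(id);
    }

    /** Cumulative ack: removes every message up to and including id. */
    void acknowledgeCumulative(long id) {
        unacked.headSet(id, true).clear();
    }

    Set<Long> unacked() {
        return unacked;
    }
}
```

For example, after delivering IDs 1 to 5, acknowledge(3) leaves 1, 2, 4, and 5 outstanding, while a following acknowledgeCumulative(4) leaves only 5.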

Negative Acknowledgment

Message<String> msg = consumer.receive();
try {
    processMessage(msg);
    consumer.acknowledge(msg);
} catch (Exception e) {
    // Trigger redelivery
    consumer.negativeAcknowledge(msg);
}

Delayed Redelivery

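// reconsumeLater requires .enableRetry(true) on the consumer builder
Message<String> msg = consumer.receive();
try {
    processMessage(msg);
    consumer.acknowledge(msg);
} catch (TemporaryException e) {  // placeholder for an application-defined retryable error
    // Retry after 5 minutes
    consumer.reconsumeLater(msg, 5, TimeUnit.MINUTES);
}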

Consumer Configuration

Subscription Configuration

Consumer<String> consumer = client.newConsumer(Schema.STRING)
    .topic("my-topic")
    .subscriptionName("my-sub")
    .subscriptionType(SubscriptionType.Shared)
    .subscriptionInitialPosition(SubscriptionInitialPosition.Latest)
    .subscribe();

Acknowledgment Timeout

Consumer<String> consumer = client.newConsumer(Schema.STRING)
    .topic("my-topic")
    .subscriptionName("my-sub")
    .ackTimeout(30, TimeUnit.SECONDS)
    .ackTimeoutTickTime(5, TimeUnit.SECONDS)
    .subscribe();

Dead Letter Queue

Consumer<String> consumer = client.newConsumer(Schema.STRING)
    .topic("my-topic")
    .subscriptionName("my-sub")
    .deadLetterPolicy(DeadLetterPolicy.builder()
        .maxRedeliverCount(3)
        .deadLetterTopic("my-topic-DLQ")
        .build())
    .subscribe();
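The effect of maxRedeliverCount can be approximated with a small loop. The following is a simplified model under stated assumptions: DeadLetterSketch is hypothetical, a list stands in for the dead letter topic, and the rule used here (one initial delivery plus up to maxRedeliverCount redeliveries) is an assumption whose exact counting may differ from the client's.

```java
import java.util.List;
import java.util.function.Predicate;

final class DeadLetterSketch {
    /**
     * Delivers msg until the handler succeeds or the redelivery budget is spent.
     * Returns the total number of delivery attempts made.
     */
    static int deliver(String msg, Predicate<String> handler,
                       int maxRedeliverCount, List<String> deadLetterTopic) {
        int redeliveries = 0;
        while (true) {
            if (handler.test(msg)) {
                return redeliveries + 1; // processed and acknowledged
            }
            if (redeliveries >= maxRedeliverCount) {
                deadLetterTopic.add(msg); // budget spent: route to the dead letter topic
                return redeliveries + 1;
            }
            redeliveries++; // negative ack leads to another delivery
        }
    }
}
```

A handler that always fails with a budget of 3 is attempted four times in this model before the message lands in the dead letter list, which keeps a poison message from blocking the subscription indefinitely.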

Consumer Interceptors

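Consumer<String> consumer = client.newConsumer(Schema.STRING)
    .topic("my-topic")
    .subscriptionName("my-sub")
    .intercept(new ConsumerInterceptor<String>() {
        @Override
        public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
            // Pre-process message
            return message;
        }

        @Override
        public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwable exception) {
            // Track acknowledgments
        }

        // ConsumerInterceptor declares further callbacks (for example close(),
        // cumulative-ack, negative-ack, and ack-timeout hooks); implement them
        // too, even as no-ops, or this class will not compile.
    })
    .subscribe();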

Multi-Topic Subscriptions

// Subscribe to multiple topics
Consumer<String> multiTopicConsumer = client.newConsumer(Schema.STRING)
    .topics(Arrays.asList(
        "persistent://tenant/ns/topic-1",
        "persistent://tenant/ns/topic-2"
    ))
    .subscriptionName("my-sub")
    .subscribe();

// Subscribe using pattern
Consumer<String> patternConsumer = client.newConsumer(Schema.STRING)
    .topicsPattern("persistent://tenant/ns/.*")
    .subscriptionName("my-sub")
    .subscribe();

Consumer Pause/Resume

Consumer<String> consumer = client.newConsumer(Schema.STRING)
    .topic("my-topic")
    .subscriptionName("my-sub")
    .subscribe();

// Pause consumption
consumer.pause();

// Resume consumption
consumer.resume();

Seeking

Reposition the subscription cursor:
// Seek to earliest message
consumer.seek(MessageId.earliest);

// Seek to latest message
consumer.seek(MessageId.latest);

// Seek to specific message ID
consumer.seek(messageId);

// Seek to timestamp
consumer.seek(System.currentTimeMillis() - TimeUnit.HOURS.toMillis(24));

Pulsar Client

Both producers and consumers are created from a PulsarClient instance:
PulsarClient client = PulsarClient.builder()
    .serviceUrl("pulsar://localhost:6650")
    .connectionTimeout(30, TimeUnit.SECONDS)
    .operationTimeout(30, TimeUnit.SECONDS)
    .build();

// Create producers and consumers
Producer<String> producer = client.newProducer(Schema.STRING)
    .topic("my-topic")
    .create();

Consumer<String> consumer = client.newConsumer(Schema.STRING)
    .topic("my-topic")
    .subscriptionName("my-sub")
    .subscribe();

// Always close resources
producer.close();
consumer.close();
client.close();

Client Configuration

PulsarClient client = PulsarClient.builder()
    .serviceUrl("pulsar+ssl://pulsar.example.com:6651")
    .authentication(
        AuthenticationFactory.token("eyJhbGc..."))
    .tlsTrustCertsFilePath("/path/to/ca-cert.pem")
    .enableTls(true)  // Deprecated: the pulsar+ssl:// scheme already enables TLS
    .allowTlsInsecureConnection(false)
    .ioThreads(10)
    .listenerThreads(10)
    .connectionsPerBroker(1)
    .build();

Best Practices

Producers

  • Reuse producer instances across multiple threads
  • Enable batching for better throughput
  • Use async send for non-critical messages
  • Set appropriate send timeouts
  • Handle send failures with retry logic
  • Close producers gracefully with close() or use try-with-resources

Consumers

  • Use batch receive for high-throughput scenarios
  • Configure appropriate ack timeout based on processing time
  • Implement dead letter queues for poison messages
  • Use message listeners for async processing
  • Monitor consumer lag regularly
  • Handle redelivery gracefully with idempotent processing

Client

  • Share PulsarClient instances across your application
  • Close producers/consumers when done
  • Use connection pooling for better resource utilization
  • Monitor client metrics for performance tuning
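Idempotent processing, recommended above for handling redelivery, can be sketched as a guard around the message handler. This is a minimal in-memory illustration: IdempotentHandlerSketch is hypothetical, it is not thread-safe, and a real deployment would presumably keep the processed IDs in a durable store rather than a HashSet.

```java
import java.util.HashSet;
import java.util.Set;

final class IdempotentHandlerSketch {
    private final Set<String> processedIds = new HashSet<>();

    /**
     * Runs sideEffect at most once per message ID.
     * Returns true if it ran, false if the ID was already processed.
     */
    boolean handleOnce(String messageId, Runnable sideEffect) {
        if (processedIds.contains(messageId)) {
            return false; // redelivered duplicate: skip the side effect, still safe to ack
        }
        sideEffect.run();
        // Record the ID only after success, so a failed attempt can be retried
        processedIds.add(messageId);
        return true;
    }
}
```

Recording the ID after the side effect completes means an exception leaves the message eligible for a later redelivery instead of silently dropping it.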

Next Steps

  • Schemas: Learn about type-safe message serialization
  • Subscriptions: Deep dive into subscription types
  • Messaging: Understand delivery semantics
  • Topics: Explore topic configuration
