Overview

The Producer interface is used to publish messages on a topic. A single producer instance can be used across multiple threads.

Creating a Producer

// The examples on this page send byte[] payloads, so the producer uses the default byte[] schema.
Producer<byte[]> producer = client.newProducer()
    .topic("my-topic")
    .create();

Core Methods

getTopic()

Returns the topic that the producer is publishing to.
String getTopic()
Returns
String
The topic name
Example:
String topic = producer.getTopic();
System.out.println("Publishing to: " + topic);

getProducerName()

Returns the producer name, which may have been assigned by the system or specified by the client.
String getProducerName()
Returns
String
The producer name

Sending Messages

send()

Sends a message synchronously.
MessageId send(T message) throws PulsarClientException
message
T
required
The message to publish
Returns
MessageId
The message id assigned to the published message
This call blocks until the Pulsar broker has successfully acknowledged the message. To set properties beyond the message value, use newMessage(). Example:
MessageId msgId = producer.send("Hello Pulsar".getBytes());
System.out.println("Message published with ID: " + msgId);
Exceptions:
  • PulsarClientException.TimeoutException - if the message was not correctly received by the system within the timeout period
  • PulsarClientException.AlreadyClosedException - if the producer was already closed

sendAsync()

Send a message asynchronously.
CompletableFuture<MessageId> sendAsync(T message)
message
T
required
The message to publish
Returns
CompletableFuture<MessageId>
A future that can be used to track when the message will have been safely persisted
When the producer queue is full, this method by default completes the future exceptionally with PulsarClientException.ProducerQueueIsFullError. Example:
CompletableFuture<MessageId> future = producer.sendAsync("hello".getBytes());
future.thenAccept(msgId -> {
    System.out.println("Message published: " + msgId);
});
The returned future is completed in the internal network I/O thread. If a callback that takes a long time to complete is registered on the future, it can negatively impact the internal network processing. Use async variants like thenAcceptAsync() with an executor to avoid blocking the I/O thread.
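Avoid deadlocks: do not call blocking methods such as send() from a callback that runs on the I/O thread, because the blocked thread cannot then process the broker's response.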
// DON'T do this - causes deadlock
producer.sendAsync("msg-1".getBytes())
    .thenAccept(__ -> producer.send("msg-2".getBytes()));

// DO this instead
producer.sendAsync("msg-1".getBytes())
    .thenAcceptAsync(__ -> {
        try {
            producer.send("msg-2".getBytes());
        } catch (Exception e) {
            // handle error
        }
    }, executor);
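
The effect of thenAcceptAsync() with an executor can be checked with the JDK alone. The following is a minimal sketch, not Pulsar code: the thread named "io-thread" is a hypothetical stand-in for the client's internal network thread, and the future is completed by hand instead of by a real send. It shows that the callback runs on the supplied executor rather than on the thread that completes the future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class CallbackOffload {
    // Completes a future on a thread named "io-thread" (stand-in for the
    // client's network thread) and returns the name of the thread that
    // actually ran the callback registered with thenAcceptAsync.
    static String callbackThreadName() throws Exception {
        ExecutorService callbackExecutor =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "callback-thread"));
        try {
            CompletableFuture<String> sendResult = new CompletableFuture<>();
            CompletableFuture<String> ranOn = new CompletableFuture<>();

            // Register the callback before completion, as with a real send.
            sendResult.thenAcceptAsync(
                id -> ranOn.complete(Thread.currentThread().getName()),
                callbackExecutor);

            // Complete the future from the simulated I/O thread.
            Thread ioThread = new Thread(() -> sendResult.complete("msg-1"), "io-thread");
            ioThread.start();
            ioThread.join();

            return ranOn.get(5, TimeUnit.SECONDS);
        } finally {
            callbackExecutor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Callback ran on: " + callbackThreadName());
        // prints "Callback ran on: callback-thread"
    }
}
```

Because the callback never occupies the completing thread, a blocking call inside it cannot stall that thread.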

Message Builder

newMessage()

Create a new message builder.
TypedMessageBuilder<T> newMessage()
Returns
TypedMessageBuilder<T>
A typed message builder that can be used to construct the message to be sent through this producer
The message builder lets you specify additional properties on the message. Example:
producer.newMessage()
    .key(messageKey)
    .value(myValue)
    .property("user-defined-property", "value")
    .send();

newMessage(Schema)

Create a new message builder with a schema. The schema's type does not have to match the producer's parameterized type.
<V> TypedMessageBuilder<V> newMessage(Schema<V> schema)
schema
Schema<V>
The schema to use for this message
Returns
TypedMessageBuilder<V>
A typed message builder with the specified schema

newMessage(Transaction)

Create a new message builder with transaction.
TypedMessageBuilder<T> newMessage(Transaction txn)
txn
Transaction
The transaction context
Returns
TypedMessageBuilder<T>
A typed message builder with transaction support
After the transaction commits, the message becomes visible to consumers. If the transaction aborts, the message is never visible to consumers.

newMessage(Schema, Transaction)

Create a new message builder with transaction and schema.
<V> TypedMessageBuilder<V> newMessage(Schema<V> schema, Transaction txn)
schema
Schema<V>
The schema to use for this message
txn
Transaction
The transaction context
Returns
TypedMessageBuilder<V>
A typed message builder with schema and transaction support

Flush Operations

flush()

Flush all the messages buffered in the client and wait until all messages have been successfully persisted.
void flush() throws PulsarClientException
Example:
producer.sendAsync("message-1".getBytes());
producer.sendAsync("message-2".getBytes());
producer.flush(); // Wait for all messages to be persisted

flushAsync()

Flush all the messages buffered in the client asynchronously.
CompletableFuture<Void> flushAsync()
Returns
CompletableFuture<Void>
A future that can be used to track when all the messages have been safely persisted
Example:
producer.sendAsync("message-1".getBytes());
producer.sendAsync("message-2".getBytes());
producer.flushAsync().thenRun(() -> {
    System.out.println("All messages flushed");
});
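
The flush methods return no per-message result. When the caller also wants the outcome of each send, one option is to keep the futures returned by sendAsync() and combine them. The sketch below uses only the JDK: sendAll and the fakeSend function are hypothetical names for illustration, and fakeSend stands in for a method reference such as producer::sendAsync.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class CollectSendResults {
    // Sends every payload through the supplied async send function and
    // returns a future that completes with all results, in input order.
    // The combined future fails if any individual send fails.
    static <T, R> CompletableFuture<List<R>> sendAll(
            List<T> payloads, Function<T, CompletableFuture<R>> sendAsync) {
        List<CompletableFuture<R>> pending = new ArrayList<>();
        for (T payload : payloads) {
            pending.add(sendAsync.apply(payload));
        }
        return CompletableFuture
            .allOf(pending.toArray(new CompletableFuture[0]))
            .thenApply(ignored -> {
                List<R> results = new ArrayList<>();
                for (CompletableFuture<R> f : pending) {
                    results.add(f.join()); // already complete, does not block
                }
                return results;
            });
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for producer::sendAsync.
        Function<String, CompletableFuture<String>> fakeSend =
            payload -> CompletableFuture.completedFuture("id-" + payload);

        System.out.println(sendAll(List.of("a", "b"), fakeSend).join());
        // prints "[id-a, id-b]"
    }
}
```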

Sequence and Statistics

getLastSequenceId()

Get the last sequence id that was published by this producer.
long getLastSequenceId()
Returns
long
The last sequence id published by this producer
This is either the automatically assigned sequence id or the custom sequence id (set on the TypedMessageBuilder) of the last message that was published and acknowledged by the broker. After a producer is recreated with the same producer name, this returns the sequence id of the last message published in the previous producer session, or -1 if no message was ever published.
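
One way an application might use this value is to resume publishing after a restart without resending records. The sketch below is JDK-only and rests on an assumption that is not part of the API: the application sets custom sequence ids so that the record at index i is published with sequence id i. The remaining helper is a hypothetical name.

```java
import java.util.List;

final class ResumeFromSequenceId {
    // Returns the records that still need publishing, assuming the record
    // at index i is published with custom sequence id i.
    static List<String> remaining(List<String> records, long lastSequenceId) {
        // -1 means nothing was published yet, so start from index 0.
        long next = Math.max(0L, lastSequenceId + 1);
        int from = (int) Math.min(records.size(), next);
        return records.subList(from, records.size());
    }

    public static void main(String[] args) {
        List<String> records = List.of("a", "b", "c", "d");
        System.out.println(remaining(records, -1)); // prints "[a, b, c, d]"
        System.out.println(remaining(records, 1));  // prints "[c, d]"
    }
}
```

With a real producer, the second argument would come from producer.getLastSequenceId().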

getStats()

Get statistics for the producer.
ProducerStats getStats()
Returns
ProducerStats
Statistics for the producer, or null if ProducerStatsRecorderImpl is disabled
Statistics include:
  • numMsgsSent - Number of messages sent in the current interval
  • numBytesSent - Number of bytes sent in the current interval
  • numSendFailed - Number of messages failed to send in the current interval
  • numAcksReceived - Number of acks received in the current interval
  • totalMsgsSent - Total number of messages sent
  • totalBytesSent - Total number of bytes sent
  • totalSendFailed - Total number of messages failed to send
  • totalAcksReceived - Total number of acks received

Connection Management

isConnected()

Check whether the producer is currently connected to the broker.
boolean isConnected()
Returns
boolean
True if connected to the broker, false otherwise

getLastDisconnectedTimestamp()

Get the last disconnected timestamp of the producer.
long getLastDisconnectedTimestamp()
Returns
long
The last disconnected timestamp in milliseconds
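
A raw millisecond value is easier to act on once it is converted to an elapsed duration. The sketch below is JDK-only and assumes the timestamp is milliseconds since the Unix epoch, which this page does not state explicitly. The sinceDisconnect helper is a hypothetical name.

```java
import java.time.Duration;
import java.time.Instant;

final class DisconnectAge {
    // Time elapsed between the last disconnect and "now", both given as
    // epoch milliseconds (an assumption about the timestamp's base).
    static Duration sinceDisconnect(long lastDisconnectedMillis, long nowMillis) {
        return Duration.between(
            Instant.ofEpochMilli(lastDisconnectedMillis),
            Instant.ofEpochMilli(nowMillis));
    }

    public static void main(String[] args) {
        long last = 1_700_000_000_000L; // sample value, not real producer output
        long now = last + 90_000L;
        System.out.println(sinceDisconnect(last, now).getSeconds()); // prints "90"
    }
}
```

With a real producer, the first argument would come from producer.getLastDisconnectedTimestamp() and the second from System.currentTimeMillis().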

getNumOfPartitions()

Get the number of partitions per topic.
int getNumOfPartitions()
Returns
int
The number of partitions

Closing the Producer

close()

Close the producer and release the resources allocated to it.
void close() throws PulsarClientException
No more writes will be accepted from this producer. Waits until all pending write requests are persisted. In case of errors, pending writes will not be retried. Example:
try {
    producer.close();
    System.out.println("Producer closed successfully");
} catch (PulsarClientException e) {
    System.err.println("Failed to close producer: " + e.getMessage());
}
Exceptions:
  • PulsarClientException.AlreadyClosedException - if the producer was already closed

closeAsync()

Close the producer asynchronously.
CompletableFuture<Void> closeAsync()
Returns
CompletableFuture<Void>
A future that can be used to track when the producer has been closed
Example:
producer.closeAsync().thenRun(() -> {
    System.out.println("Producer closed successfully");
}).exceptionally(ex -> {
    System.err.println("Failed to close producer: " + ex.getMessage());
    return null;
});

Complete Example

import org.apache.pulsar.client.api.*;

public class ProducerExample {
    public static void main(String[] args) throws Exception {
        // Create client
        PulsarClient client = PulsarClient.builder()
            .serviceUrl("pulsar://localhost:6650")
            .build();

        // Create producer
        Producer<byte[]> producer = client.newProducer()
            .topic("my-topic")
            .create();

        // Send messages synchronously
        for (int i = 0; i < 10; i++) {
            String message = "Message-" + i;
            MessageId msgId = producer.send(message.getBytes());
            System.out.println("Published message: " + msgId);
        }

        // Send messages asynchronously
        for (int i = 0; i < 10; i++) {
            String message = "Async-Message-" + i;
            producer.sendAsync(message.getBytes())
                .thenAccept(msgId -> {
                    System.out.println("Published async message: " + msgId);
                })
                .exceptionally(ex -> {
                    System.err.println("Failed to publish: " + ex);
                    return null;
                });
        }

        // Send with properties
        producer.newMessage()
            .key("my-key")
            .value("my-value".getBytes())
            .property("custom-prop", "custom-value")
            .send();

        // Clean up
        producer.close();
        client.close();
    }
}
