Skip to main content
The Apache Pulsar Java client is the most feature-complete client implementation and is maintained as part of the main Pulsar repository.

Installation

Add the Pulsar Java client to your project:
<dependency>
  <groupId>org.apache.pulsar</groupId>
  <artifactId>pulsar-client</artifactId>
  <version>4.2.0</version>
</dependency>
The Java client requires Java 17 or later. Check the version compatibility table for your Pulsar version.

Quick start

Here’s a complete example that creates a producer and consumer:
import org.apache.pulsar.client.api.*;

public class PulsarExample {
    public static void main(String[] args) throws Exception {
        // Create client
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        // Create producer
        Producer<String> producer = client.newProducer(Schema.STRING)
                .topic("my-topic")
                .create();

        // Send message
        MessageId msgId = producer.send("Hello Pulsar!");
        System.out.println("Published message: " + msgId);

        // Create consumer
        Consumer<String> consumer = client.newConsumer(Schema.STRING)
                .topic("my-topic")
                .subscriptionName("my-subscription")
                .subscribe();

        // Receive message
        Message<String> msg = consumer.receive();
        System.out.println("Received: " + msg.getValue());
        consumer.acknowledge(msg);

        // Close resources
        producer.close();
        consumer.close();
        client.close();
    }
}

Creating a client

The PulsarClient is the entry point for all operations:
PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")
        .build();
For TLS connections:
PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar+ssl://localhost:6651")
        .enableTls(true)
        .tlsTrustCertsFilePath("/path/to/ca.cert.pem")
        .build();
See the ClientBuilder API reference for all configuration options.

Producing messages

Basic producer

Producer<String> producer = client.newProducer(Schema.STRING)
        .topic("persistent://public/default/my-topic")
        .create();

// Synchronous send
MessageId msgId = producer.send("Hello Pulsar");

// Asynchronous send
producer.sendAsync("Hello Pulsar")
        .thenAccept(msgId -> {
            System.out.println("Message sent: " + msgId);
        });

Producer with properties

Producer<byte[]> producer = client.newProducer()
        .topic("my-topic")
        .producerName("my-producer")
        .batchingMaxMessages(100)
        .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
        .compressionType(CompressionType.LZ4)
        .enableBatching(true)
        .create();

Sending with properties

producer.newMessage()
        .value("Message content")
        .property("key1", "value1")
        .property("key2", "value2")
        .eventTime(System.currentTimeMillis())
        .key("message-key")
        .send();
See the Producer API reference for details.

Consuming messages

Basic consumer

Consumer<String> consumer = client.newConsumer(Schema.STRING)
        .topic("my-topic")
        .subscriptionName("my-subscription")
        .subscriptionType(SubscriptionType.Shared)
        .subscribe();

while (true) {
    Message<String> msg = consumer.receive();
    try {
        System.out.println("Received: " + msg.getValue());
        consumer.acknowledge(msg);
    } catch (Exception e) {
        consumer.negativeAcknowledge(msg);
    }
}

Consumer with message listener

Consumer<String> consumer = client.newConsumer(Schema.STRING)
        .topic("my-topic")
        .subscriptionName("my-subscription")
        .messageListener((c, msg) -> {
            try {
                System.out.println("Received: " + msg.getValue());
                c.acknowledge(msg);
            } catch (Exception e) {
                c.negativeAcknowledge(msg);
            }
        })
        .subscribe();

Batch receive

Consumer<String> consumer = client.newConsumer(Schema.STRING)
        .topic("my-topic")
        .subscriptionName("my-subscription")
        .batchReceivePolicy(BatchReceivePolicy.builder()
                .maxNumMessages(100)
                .maxNumBytes(1024 * 1024)
                .timeout(200, TimeUnit.MILLISECONDS)
                .build())
        .subscribe();

Messages<String> messages = consumer.batchReceive();
for (Message<String> msg : messages) {
    System.out.println("Received: " + msg.getValue());
}
consumer.acknowledge(messages);
See the Consumer API reference for details.

Using readers

Readers allow you to read messages from a specific position:
Reader<String> reader = client.newReader(Schema.STRING)
        .topic("my-topic")
        .startMessageId(MessageId.earliest)
        .create();

while (reader.hasMessageAvailable()) {
    Message<String> msg = reader.readNext();
    System.out.println("Read: " + msg.getValue());
}
See the Reader API reference for details.

Working with schemas

Pulsar provides built-in schemas for common types:
// Primitive schemas
Producer<String> stringProducer = client.newProducer(Schema.STRING)
        .topic("string-topic").create();

Producer<byte[]> bytesProducer = client.newProducer(Schema.BYTES)
        .topic("bytes-topic").create();

Producer<Integer> intProducer = client.newProducer(Schema.INT32)
        .topic("int-topic").create();

Custom POJO schemas

public class User {
    public String name;
    public int age;
}

// JSON schema
Producer<User> producer = client.newProducer(Schema.JSON(User.class))
        .topic("user-topic")
        .create();

User user = new User();
user.name = "John";
user.age = 30;
producer.send(user);

// Avro schema
Producer<User> avroProducer = client.newProducer(Schema.AVRO(User.class))
        .topic("user-topic-avro")
        .create();
See the Schema API reference for details.

Authentication

TLS authentication

PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar+ssl://localhost:6651")
        .enableTls(true)
        .tlsTrustCertsFilePath("/path/to/ca.cert.pem")
        .authentication(
                AuthenticationFactory.TLS(
                        "/path/to/client.cert.pem",
                        "/path/to/client.key.pem"
                )
        )
        .build();

Token authentication

PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")
        .authentication(
                AuthenticationFactory.token("eyJhbGciOiJIUzI1NiJ9...")
        )
        .build();

// Or from file
client = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")
        .authentication(
                AuthenticationFactory.token(() -> {
                    return Files.readString(Path.of("/path/to/token.txt"));
                })
        )
        .build();

OAuth 2.0 authentication

PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")
        .authentication(
                AuthenticationFactoryOAuth2.clientCredentials(
                        new URL("https://auth.example.com/oauth/token"),
                        new URL("file:///path/to/credentials.json"),
                        "https://pulsar.example.com"
                )
        )
        .build();

Performance tuning

For optimal performance, configure these JVM options:
-Xms2G -Xmx2G
-XX:+UseG1GC
-XX:MaxGCPauseMillis=10
-Dio.netty.leakDetectionLevel=disabled
-Dio.netty.recycler.linkCapacity=1024
-Dpulsar.allocator.pooled=true
-Dpulsar.allocator.exit_on_oom=false

Producer tuning

Producer<byte[]> producer = client.newProducer()
        .topic("my-topic")
        .batchingMaxMessages(1000)
        .batchingMaxBytes(1024 * 1024)
        .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
        .compressionType(CompressionType.LZ4)
        .blockIfQueueFull(true)
        .sendTimeout(30, TimeUnit.SECONDS)
        .create();

Consumer tuning

Consumer<byte[]> consumer = client.newConsumer()
        .topic("my-topic")
        .subscriptionName("my-sub")
        .receiverQueueSize(1000)
        .maxTotalReceiverQueueSizeAcrossPartitions(50000)
        .subscribe();

Error handling

try {
    producer.send("message");
} catch (PulsarClientException.TimeoutException e) {
    // Send timeout
} catch (PulsarClientException.AlreadyClosedException e) {
    // Producer already closed
} catch (PulsarClientException.TopicTerminatedException e) {
    // Topic has been terminated
} catch (PulsarClientException e) {
    // Other errors
}

Next steps

ClientBuilder API

Full client configuration reference

Producer API

Complete producer API documentation

Consumer API

Complete consumer API documentation

Schema API

Working with schemas

Build docs developers (and LLMs) love