Skip to main content
The Apache Pulsar C++ client provides the lowest latency and highest throughput for performance-critical applications.

Installation

Using package managers

sudo apt-get install -y libpulsar-dev

Building from source

git clone https://github.com/apache/pulsar-client-cpp
cd pulsar-client-cpp
cmake . -DBUILD_TESTS=OFF
make -j8
sudo make install
The C++ client requires C++11 or later. Dependencies include Boost, OpenSSL, zlib, and protobuf.

Quick start

Here’s a complete example:
#include <pulsar/Client.h>
#include <iostream>

using namespace pulsar;

int main() {
    // Create client
    Client client("pulsar://localhost:6650");

    // Create producer
    Producer producer;
    Result result = client.createProducer("my-topic", producer);
    if (result != ResultOk) {
        std::cerr << "Error creating producer: " << result << std::endl;
        return -1;
    }

    // Send message
    Message msg = MessageBuilder().setContent("Hello Pulsar!").build();
    result = producer.send(msg);
    if (result == ResultOk) {
        std::cout << "Message published" << std::endl;
    }

    // Create consumer
    Consumer consumer;
    result = client.subscribe("my-topic", "my-subscription", consumer);
    if (result != ResultOk) {
        std::cerr << "Error creating consumer: " << result << std::endl;
        return -1;
    }

    // Receive message
    Message receivedMsg;
    result = consumer.receive(receivedMsg);
    if (result == ResultOk) {
        std::cout << "Received: " << receivedMsg.getDataAsString() << std::endl;
        consumer.acknowledge(receivedMsg);
    }

    // Close resources
    producer.close();
    consumer.close();
    client.close();

    return 0;
}
Compile with:
g++ -std=c++11 example.cpp -lpulsar -o example

Creating a client

Basic client configuration:
ClientConfiguration config;
config.setOperationTimeoutSeconds(30);
config.setConnectionTimeout(std::chrono::seconds(5));

Client client("pulsar://localhost:6650", config);
For TLS connections:
ClientConfiguration config;
config.setUseTls(true);
config.setTlsTrustCertsFilePath("/path/to/ca.cert.pem");
config.setTlsAllowInsecureConnection(false);

Client client("pulsar+ssl://localhost:6651", config);

Producing messages

Basic producer

ProducerConfiguration producerConfig;
Producer producer;

Result result = client.createProducer(
    "persistent://public/default/my-topic",
    producerConfig,
    producer
);

if (result != ResultOk) {
    std::cerr << "Error: " << result << std::endl;
    return;
}

// Synchronous send
Message msg = MessageBuilder()
    .setContent("Hello Pulsar")
    .build();

MessageId msgId;
result = producer.send(msg, msgId);
if (result == ResultOk) {
    std::cout << "Message sent: " << msgId << std::endl;
}

Producer with configuration

ProducerConfiguration config;
config.setProducerName("my-producer");
config.setSendTimeout(30000); // 30 seconds
config.setCompressionType(CompressionLZ4);
config.setBatchingEnabled(true);
config.setBatchingMaxMessages(100);
config.setBatchingMaxPublishDelayMs(10);

Producer producer;
client.createProducer("my-topic", config, producer);

Asynchronous send

producer.sendAsync(msg, [](Result result, const MessageId& msgId) {
    if (result == ResultOk) {
        std::cout << "Message sent: " << msgId << std::endl;
    } else {
        std::cerr << "Send failed: " << result << std::endl;
    }
});

Sending with properties

Message msg = MessageBuilder()
    .setContent("Message content")
    .setProperty("key1", "value1")
    .setProperty("key2", "value2")
    .setPartitionKey("my-key")
    .setEventTimestamp(std::chrono::system_clock::now())
    .build();

producer.send(msg);

Consuming messages

Basic consumer

ConsumerConfiguration config;
Consumer consumer;

Result result = client.subscribe(
    "my-topic",
    "my-subscription",
    config,
    consumer
);

if (result != ResultOk) {
    std::cerr << "Subscribe failed: " << result << std::endl;
    return;
}

while (true) {
    Message msg;
    Result result = consumer.receive(msg);
    
    if (result == ResultOk) {
        std::cout << "Received: " << msg.getDataAsString() << std::endl;
        consumer.acknowledge(msg);
    }
}

Consumer with timeout

Message msg;
Result result = consumer.receive(msg, 5000); // 5 second timeout

if (result == ResultOk) {
    std::cout << "Received: " << msg.getDataAsString() << std::endl;
    consumer.acknowledge(msg);
} else if (result == ResultTimeout) {
    std::cout << "Receive timeout" << std::endl;
}

Consumer with message listener

ConsumerConfiguration config;

config.setMessageListener([](Consumer consumer, const Message& msg) {
    std::cout << "Received: " << msg.getDataAsString() << std::endl;
    consumer.acknowledge(msg);
});

Consumer consumer;
client.subscribe("my-topic", "my-subscription", config, consumer);

// Keep running
std::this_thread::sleep_for(std::chrono::seconds(60));

Batch receive

ConsumerConfiguration config;
config.setBatchReceivePolicy(BatchReceivePolicy(
    100,              // max messages
    1024 * 1024,      // max bytes
    200               // timeout ms
));

Consumer consumer;
client.subscribe("my-topic", "my-subscription", config, consumer);

Messages messages;
Result result = consumer.batchReceive(messages);

if (result == ResultOk) {
    for (const Message& msg : messages) {
        std::cout << "Received: " << msg.getDataAsString() << std::endl;
    }
    consumer.acknowledge(messages);
}

Using readers

ReaderConfiguration config;
Reader reader;

Result result = client.createReader(
    "my-topic",
    MessageId::earliest(),
    config,
    reader
);

while (reader.hasMessageAvailable()) {
    Message msg;
    Result result = reader.readNext(msg);
    if (result == ResultOk) {
        std::cout << "Read: " << msg.getDataAsString() << std::endl;
    }
}

reader.close();

Working with schemas

String schema

static const std::string exampleSchema = R"({"type":"string"})"; 
SchemaInfo schemaInfo(STRING, "String", exampleSchema);

ProducerConfiguration producerConf;
producerConf.setSchema(schemaInfo);

Producer producer;
client.createProducer("string-topic", producerConf, producer);

std::string data = "Hello";
Message msg = MessageBuilder()
    .setContent(data.c_str(), data.size())
    .build();
producer.send(msg);

JSON schema

static const std::string jsonSchema = R"({
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": "int"}
    ]
})"; 

SchemaInfo schemaInfo(JSON, "User", jsonSchema);

ProducerConfiguration config;
config.setSchema(schemaInfo);

Producer producer;
client.createProducer("user-topic", config, producer);

Authentication

TLS authentication

AuthenticationPtr auth = pulsar::AuthTls::create(
    "/path/to/client.cert.pem",
    "/path/to/client.key.pem"
);

ClientConfiguration config;
config.setAuth(auth);
config.setUseTls(true);
config.setTlsTrustCertsFilePath("/path/to/ca.cert.pem");

Client client("pulsar+ssl://localhost:6651", config);

Token authentication

// Token string
AuthenticationPtr auth = pulsar::AuthToken::createWithToken(
    "eyJhbGciOiJIUzI1NiJ9..."
);

ClientConfiguration config;
config.setAuth(auth);

Client client("pulsar://localhost:6650", config);

// Token from file
AuthenticationPtr auth = pulsar::AuthToken::create(
    "/path/to/token.txt"
);

OAuth 2.0 authentication

std::string params = R"({
    "type": "client_credentials",
    "issuer_url": "https://auth.example.com",
    "client_id": "my-client-id",
    "client_secret": "my-client-secret",
    "audience": "https://pulsar.example.com"
})";

AuthenticationPtr auth = pulsar::AuthOauth2::create(params);

ClientConfiguration config;
config.setAuth(auth);

Client client("pulsar://localhost:6650", config);

Subscription types

ConsumerConfiguration config;

// Exclusive (default)
config.setConsumerType(ConsumerExclusive);

// Shared
config.setConsumerType(ConsumerShared);

// Key_Shared
config.setConsumerType(ConsumerKeyShared);

// Failover
config.setConsumerType(ConsumerFailover);

Consumer consumer;
client.subscribe("my-topic", "my-subscription", config, consumer);

Error handling

Result result = producer.send(msg);

switch (result) {
    case ResultOk:
        std::cout << "Success" << std::endl;
        break;
    case ResultTimeout:
        std::cerr << "Send timeout" << std::endl;
        break;
    case ResultProducerClosed:
        std::cerr << "Producer closed" << std::endl;
        break;
    case ResultTopicTerminated:
        std::cerr << "Topic terminated" << std::endl;
        break;
    default:
        std::cerr << "Error: " << strResult(result) << std::endl;
        break;
}

Performance optimization

Producer tuning

ProducerConfiguration config;
config.setBatchingEnabled(true);
config.setBatchingMaxMessages(1000);
config.setBatchingMaxPublishDelayMs(10);
config.setCompressionType(CompressionLZ4);
config.setBlockIfQueueFull(true);
config.setSendTimeout(30000);

Consumer tuning

ConsumerConfiguration config;
config.setReceiverQueueSize(1000);
config.setMaxTotalReceiverQueueSizeAcrossPartitions(50000);

C++ client repository

The C++ client is maintained in a separate repository:

Next steps

Schema support

Learn about Pulsar schemas

Subscription types

Understanding subscription types

Authentication

Configure authentication

Performance tuning

Optimize for performance

Build docs developers (and LLMs) love