Installation
Using package managers
sudo apt-get install -y libpulsar-dev
Building from source
git clone https://github.com/apache/pulsar-client-cpp
cd pulsar-client-cpp
cmake . -DBUILD_TESTS=OFF
make -j8
sudo make install
The C++ client requires C++11 or later. Dependencies include Boost, OpenSSL, zlib, and protobuf.
Quick start
Here’s a complete example:#include <pulsar/Client.h>
#include <iostream>
using namespace pulsar;
int main() {
// Create client
Client client("pulsar://localhost:6650");
// Create producer
Producer producer;
Result result = client.createProducer("my-topic", producer);
if (result != ResultOk) {
std::cerr << "Error creating producer: " << result << std::endl;
return -1;
}
// Send message
Message msg = MessageBuilder().setContent("Hello Pulsar!").build();
result = producer.send(msg);
if (result == ResultOk) {
std::cout << "Message published" << std::endl;
}
// Create consumer
Consumer consumer;
result = client.subscribe("my-topic", "my-subscription", consumer);
if (result != ResultOk) {
std::cerr << "Error creating consumer: " << result << std::endl;
return -1;
}
// Receive message
Message receivedMsg;
result = consumer.receive(receivedMsg);
if (result == ResultOk) {
std::cout << "Received: " << receivedMsg.getDataAsString() << std::endl;
consumer.acknowledge(receivedMsg);
}
// Close resources
producer.close();
consumer.close();
client.close();
return 0;
}
g++ -std=c++11 example.cpp -lpulsar -o example
Creating a client
Basic client configuration:ClientConfiguration config;
config.setOperationTimeoutSeconds(30);
config.setConnectionTimeout(std::chrono::seconds(5));
Client client("pulsar://localhost:6650", config);
ClientConfiguration config;
config.setUseTls(true);
config.setTlsTrustCertsFilePath("/path/to/ca.cert.pem");
config.setTlsAllowInsecureConnection(false);
Client client("pulsar+ssl://localhost:6651", config);
Producing messages
Basic producer
ProducerConfiguration producerConfig;
Producer producer;
Result result = client.createProducer(
"persistent://public/default/my-topic",
producerConfig,
producer
);
if (result != ResultOk) {
std::cerr << "Error: " << result << std::endl;
return;
}
// Synchronous send
Message msg = MessageBuilder()
.setContent("Hello Pulsar")
.build();
MessageId msgId;
result = producer.send(msg, msgId);
if (result == ResultOk) {
std::cout << "Message sent: " << msgId << std::endl;
}
Producer with configuration
ProducerConfiguration config;
config.setProducerName("my-producer");
config.setSendTimeout(30000); // 30 seconds
config.setCompressionType(CompressionLZ4);
config.setBatchingEnabled(true);
config.setBatchingMaxMessages(100);
config.setBatchingMaxPublishDelayMs(10);
Producer producer;
client.createProducer("my-topic", config, producer);
Asynchronous send
producer.sendAsync(msg, [](Result result, const MessageId& msgId) {
if (result == ResultOk) {
std::cout << "Message sent: " << msgId << std::endl;
} else {
std::cerr << "Send failed: " << result << std::endl;
}
});
Sending with properties
Message msg = MessageBuilder()
.setContent("Message content")
.setProperty("key1", "value1")
.setProperty("key2", "value2")
.setPartitionKey("my-key")
.setEventTimestamp(std::chrono::system_clock::now())
.build();
producer.send(msg);
Consuming messages
Basic consumer
ConsumerConfiguration config;
Consumer consumer;
Result result = client.subscribe(
"my-topic",
"my-subscription",
config,
consumer
);
if (result != ResultOk) {
std::cerr << "Subscribe failed: " << result << std::endl;
return;
}
while (true) {
Message msg;
Result result = consumer.receive(msg);
if (result == ResultOk) {
std::cout << "Received: " << msg.getDataAsString() << std::endl;
consumer.acknowledge(msg);
}
}
Consumer with timeout
Message msg;
Result result = consumer.receive(msg, 5000); // 5 second timeout
if (result == ResultOk) {
std::cout << "Received: " << msg.getDataAsString() << std::endl;
consumer.acknowledge(msg);
} else if (result == ResultTimeout) {
std::cout << "Receive timeout" << std::endl;
}
Consumer with message listener
ConsumerConfiguration config;
config.setMessageListener([](Consumer consumer, const Message& msg) {
std::cout << "Received: " << msg.getDataAsString() << std::endl;
consumer.acknowledge(msg);
});
Consumer consumer;
client.subscribe("my-topic", "my-subscription", config, consumer);
// Keep running
std::this_thread::sleep_for(std::chrono::seconds(60));
Batch receive
ConsumerConfiguration config;
config.setBatchReceivePolicy(BatchReceivePolicy(
100, // max messages
1024 * 1024, // max bytes
200 // timeout ms
));
Consumer consumer;
client.subscribe("my-topic", "my-subscription", config, consumer);
Messages messages;
Result result = consumer.batchReceive(messages);
if (result == ResultOk) {
for (const Message& msg : messages) {
std::cout << "Received: " << msg.getDataAsString() << std::endl;
}
consumer.acknowledge(messages);
}
Using readers
ReaderConfiguration config;
Reader reader;
Result result = client.createReader(
"my-topic",
MessageId::earliest(),
config,
reader
);
while (reader.hasMessageAvailable()) {
Message msg;
Result result = reader.readNext(msg);
if (result == ResultOk) {
std::cout << "Read: " << msg.getDataAsString() << std::endl;
}
}
reader.close();
Working with schemas
String schema
static const std::string exampleSchema = R"({"type":"string"})";
SchemaInfo schemaInfo(STRING, "String", exampleSchema);
ProducerConfiguration producerConf;
producerConf.setSchema(schemaInfo);
Producer producer;
client.createProducer("string-topic", producerConf, producer);
std::string data = "Hello";
Message msg = MessageBuilder()
.setContent(data.c_str(), data.size())
.build();
producer.send(msg);
JSON schema
static const std::string jsonSchema = R"({
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "age", "type": "int"}
]
})";
SchemaInfo schemaInfo(JSON, "User", jsonSchema);
ProducerConfiguration config;
config.setSchema(schemaInfo);
Producer producer;
client.createProducer("user-topic", config, producer);
Authentication
TLS authentication
AuthenticationPtr auth = pulsar::AuthTls::create(
"/path/to/client.cert.pem",
"/path/to/client.key.pem"
);
ClientConfiguration config;
config.setAuth(auth);
config.setUseTls(true);
config.setTlsTrustCertsFilePath("/path/to/ca.cert.pem");
Client client("pulsar+ssl://localhost:6651", config);
Token authentication
// Token string
AuthenticationPtr auth = pulsar::AuthToken::createWithToken(
"eyJhbGciOiJIUzI1NiJ9..."
);
ClientConfiguration config;
config.setAuth(auth);
Client client("pulsar://localhost:6650", config);
// Token from file
AuthenticationPtr auth = pulsar::AuthToken::create(
"/path/to/token.txt"
);
OAuth 2.0 authentication
std::string params = R"({
"type": "client_credentials",
"issuer_url": "https://auth.example.com",
"client_id": "my-client-id",
"client_secret": "my-client-secret",
"audience": "https://pulsar.example.com"
})";
AuthenticationPtr auth = pulsar::AuthOauth2::create(params);
ClientConfiguration config;
config.setAuth(auth);
Client client("pulsar://localhost:6650", config);
Subscription types
ConsumerConfiguration config;
// Exclusive (default)
config.setConsumerType(ConsumerExclusive);
// Shared
config.setConsumerType(ConsumerShared);
// Key_Shared
config.setConsumerType(ConsumerKeyShared);
// Failover
config.setConsumerType(ConsumerFailover);
Consumer consumer;
client.subscribe("my-topic", "my-subscription", config, consumer);
Error handling
Result result = producer.send(msg);
switch (result) {
case ResultOk:
std::cout << "Success" << std::endl;
break;
case ResultTimeout:
std::cerr << "Send timeout" << std::endl;
break;
case ResultProducerClosed:
std::cerr << "Producer closed" << std::endl;
break;
case ResultTopicTerminated:
std::cerr << "Topic terminated" << std::endl;
break;
default:
std::cerr << "Error: " << strResult(result) << std::endl;
break;
}
Performance optimization
Producer tuning
ProducerConfiguration config;
config.setBatchingEnabled(true);
config.setBatchingMaxMessages(1000);
config.setBatchingMaxPublishDelayMs(10);
config.setCompressionType(CompressionLZ4);
config.setBlockIfQueueFull(true);
config.setSendTimeout(30000);
Consumer tuning
ConsumerConfiguration config;
config.setReceiverQueueSize(1000);
config.setMaxTotalReceiverQueueSizeAcrossPartitions(50000);
C++ client repository
The C++ client is maintained in a separate repository:Next steps
Schema support
Learn about Pulsar schemas
Subscription types
Understanding subscription types
Authentication
Configure authentication
Performance tuning
Optimize for performance