The Apache Pulsar Node.js client enables JavaScript and TypeScript applications to produce and consume messages from Pulsar.
Installation
Install the Node.js client:
npm install pulsar-client
The Node.js client requires Node.js 12.x or later. It’s a native addon built on the C++ client.
Quick start
Here’s a complete example in JavaScript:
const Pulsar = require('pulsar-client');
(async () => {
// Create client
const client = new Pulsar.Client({
serviceUrl: 'pulsar://localhost:6650',
});
// Create producer
const producer = await client.createProducer({
topic: 'my-topic',
});
// Send message
const msgId = await producer.send({
data: Buffer.from('Hello Pulsar!'),
});
console.log('Message published:', msgId.toString());
// Create consumer
const consumer = await client.subscribe({
topic: 'my-topic',
subscription: 'my-subscription',
});
// Receive message
const msg = await consumer.receive();
console.log('Received:', msg.getData().toString());
await consumer.acknowledge(msg);
// Close resources
await producer.close();
await consumer.close();
await client.close();
})();
TypeScript support
The client includes TypeScript definitions:
import Pulsar from 'pulsar-client';
interface User {
name: string;
age: number;
}
(async () => {
const client = new Pulsar.Client({
serviceUrl: 'pulsar://localhost:6650',
});
const producer = await client.createProducer({
topic: 'user-topic',
});
const user: User = { name: 'John', age: 30 };
await producer.send({
data: Buffer.from(JSON.stringify(user)),
});
await producer.close();
await client.close();
})();
Creating a client
Basic client configuration:
const client = new Pulsar.Client({
serviceUrl: 'pulsar://localhost:6650',
operationTimeoutSeconds: 30,
});
For TLS connections:
const client = new Pulsar.Client({
serviceUrl: 'pulsar+ssl://localhost:6651',
tlsTrustCertsFilePath: '/path/to/ca.cert.pem',
tlsAllowInsecureConnection: false,
});
Producing messages
Basic producer
const producer = await client.createProducer({
topic: 'persistent://public/default/my-topic',
});
// Send message
const msgId = await producer.send({
data: Buffer.from('Hello Pulsar'),
});
console.log('Message sent:', msgId.toString());
Producer with configuration
const producer = await client.createProducer({
topic: 'my-topic',
producerName: 'my-producer',
sendTimeoutMs: 30000,
batchingEnabled: true,
batchingMaxMessages: 100,
batchingMaxPublishDelayMs: 10,
compressionType: Pulsar.CompressionType.LZ4,
});
Sending with properties
await producer.send({
data: Buffer.from('Message content'),
properties: {
key1: 'value1',
key2: 'value2',
},
partitionKey: 'my-key',
eventTimestamp: Date.now(),
});
Asynchronous send without await
producer.send({
data: Buffer.from('Async message'),
}).then(msgId => {
console.log('Message sent:', msgId.toString());
}).catch(err => {
console.error('Send failed:', err);
});
Consuming messages
Basic consumer
const consumer = await client.subscribe({
topic: 'my-topic',
subscription: 'my-subscription',
subscriptionType: 'Shared',
});
while (true) {
const msg = await consumer.receive();
console.log('Received:', msg.getData().toString());
await consumer.acknowledge(msg);
}
Consumer with timeout
try {
const msg = await consumer.receive(5000); // 5 second timeout
console.log('Received:', msg.getData().toString());
await consumer.acknowledge(msg);
} catch (err) {
console.log('Receive timeout');
}
Consumer with message listener
const consumer = await client.subscribe({
topic: 'my-topic',
subscription: 'my-subscription',
listener: (msg, msgConsumer) => {
console.log('Received:', msg.getData().toString());
msgConsumer.acknowledge(msg);
},
});
// Keep running
await new Promise(() => {});
Negative acknowledgment
const msg = await consumer.receive();
try {
processMessage(msg);
await consumer.acknowledge(msg);
} catch (err) {
await consumer.negativeAcknowledge(msg); // Redeliver
}
Batch receive
const consumer = await client.subscribe({
topic: 'my-topic',
subscription: 'my-subscription',
batchReceivePolicy: {
maxNumMessages: 100,
maxNumBytes: 1024 * 1024,
timeoutMs: 200,
},
});
const messages = await consumer.batchReceive();
for (const msg of messages) {
console.log('Received:', msg.getData().toString());
}
await consumer.acknowledgeAll(messages);
Using readers
const reader = await client.createReader({
topic: 'my-topic',
startMessageId: Pulsar.MessageId.earliest(),
});
while (await reader.hasNext()) {
const msg = await reader.readNext();
console.log('Read:', msg.getData().toString());
}
await reader.close();
Working with schemas
JSON messages
// Producer
const user = { name: 'John', age: 30 };
await producer.send({
data: Buffer.from(JSON.stringify(user)),
});
// Consumer
const msg = await consumer.receive();
const user = JSON.parse(msg.getData().toString());
console.log('Name:', user.name, 'Age:', user.age);
String messages
// Send string
await producer.send({
data: Buffer.from('Hello Pulsar'),
});
// Receive string
const msg = await consumer.receive();
const text = msg.getData().toString('utf8');
console.log('Received:', text);
Authentication
TLS authentication
const client = new Pulsar.Client({
serviceUrl: 'pulsar+ssl://localhost:6651',
tlsTrustCertsFilePath: '/path/to/ca.cert.pem',
authentication: new Pulsar.AuthenticationTls({
certificatePath: '/path/to/client.cert.pem',
privateKeyPath: '/path/to/client.key.pem',
}),
});
Token authentication
const fs = require('fs');
// Token string
const client = new Pulsar.Client({
serviceUrl: 'pulsar://localhost:6650',
authentication: new Pulsar.AuthenticationToken({
token: 'eyJhbGciOiJIUzI1NiJ9...',
}),
});
// Token from file
const client = new Pulsar.Client({
serviceUrl: 'pulsar://localhost:6650',
authentication: new Pulsar.AuthenticationToken({
token: () => fs.readFileSync('/path/to/token.txt', 'utf8').trim(),
}),
});
OAuth 2.0 authentication
const client = new Pulsar.Client({
serviceUrl: 'pulsar://localhost:6650',
authentication: new Pulsar.AuthenticationOauth2({
type: 'client_credentials',
issuer_url: 'https://auth.example.com',
client_id: 'my-client-id',
client_secret: 'my-client-secret',
audience: 'https://pulsar.example.com',
}),
});
Subscription types
// Exclusive (default)
const consumer = await client.subscribe({
topic: 'my-topic',
subscription: 'exclusive-sub',
subscriptionType: 'Exclusive',
});
// Shared
const consumer = await client.subscribe({
topic: 'my-topic',
subscription: 'shared-sub',
subscriptionType: 'Shared',
});
// Key_Shared
const consumer = await client.subscribe({
topic: 'my-topic',
subscription: 'key-shared-sub',
subscriptionType: 'KeyShared',
});
// Failover
const consumer = await client.subscribe({
topic: 'my-topic',
subscription: 'failover-sub',
subscriptionType: 'Failover',
});
Error handling
try {
await producer.send({
data: Buffer.from('message'),
});
} catch (err) {
if (err.message.includes('timeout')) {
console.error('Send timeout');
} else if (err.message.includes('closed')) {
console.error('Producer closed');
} else {
console.error('Error:', err);
}
}
Express.js integration
const express = require('express');
const Pulsar = require('pulsar-client');
const app = express();
app.use(express.json());
const client = new Pulsar.Client({
serviceUrl: 'pulsar://localhost:6650',
});
let producer;
(async () => {
producer = await client.createProducer({
topic: 'events',
});
app.post('/events', async (req, res) => {
try {
const msgId = await producer.send({
data: Buffer.from(JSON.stringify(req.body)),
});
res.json({ messageId: msgId.toString() });
} catch (err) {
res.status(500).json({ error: err.message });
}
});
app.listen(3000, () => {
console.log('Server running on port 3000');
});
})();
Node.js client repository
The Node.js client is maintained in a separate repository:
Next steps
Schema support
Learn about Pulsar schemas
Subscription types
Understanding subscription types
Authentication
Configure authentication
Producers and consumers
Deep dive into messaging