Skip to main content
The Apache Pulsar Node.js client enables JavaScript and TypeScript applications to produce and consume messages from Pulsar.

Installation

Install the Node.js client:
npm install pulsar-client
The Node.js client requires Node.js 12.x or later. It’s a native addon built on the C++ client.

Quick start

Here’s a complete example in JavaScript:
const Pulsar = require('pulsar-client');

(async () => {
  // Create client
  const client = new Pulsar.Client({
    serviceUrl: 'pulsar://localhost:6650',
  });

  // Create producer
  const producer = await client.createProducer({
    topic: 'my-topic',
  });

  // Send message
  const msgId = await producer.send({
    data: Buffer.from('Hello Pulsar!'),
  });
  console.log('Message published:', msgId.toString());

  // Create consumer
  const consumer = await client.subscribe({
    topic: 'my-topic',
    subscription: 'my-subscription',
  });

  // Receive message
  const msg = await consumer.receive();
  console.log('Received:', msg.getData().toString());
  await consumer.acknowledge(msg);

  // Close resources
  await producer.close();
  await consumer.close();
  await client.close();
})();

TypeScript support

The client includes TypeScript definitions:
import Pulsar from 'pulsar-client';

interface User {
  name: string;
  age: number;
}

(async () => {
  const client = new Pulsar.Client({
    serviceUrl: 'pulsar://localhost:6650',
  });

  const producer = await client.createProducer({
    topic: 'user-topic',
  });

  const user: User = { name: 'John', age: 30 };
  await producer.send({
    data: Buffer.from(JSON.stringify(user)),
  });

  await producer.close();
  await client.close();
})();

Creating a client

Basic client configuration:
const client = new Pulsar.Client({
  serviceUrl: 'pulsar://localhost:6650',
  operationTimeoutSeconds: 30,
});
For TLS connections:
const client = new Pulsar.Client({
  serviceUrl: 'pulsar+ssl://localhost:6651',
  tlsTrustCertsFilePath: '/path/to/ca.cert.pem',
  tlsAllowInsecureConnection: false,
});

Producing messages

Basic producer

const producer = await client.createProducer({
  topic: 'persistent://public/default/my-topic',
});

// Send message
const msgId = await producer.send({
  data: Buffer.from('Hello Pulsar'),
});
console.log('Message sent:', msgId.toString());

Producer with configuration

const producer = await client.createProducer({
  topic: 'my-topic',
  producerName: 'my-producer',
  sendTimeoutMs: 30000,
  batchingEnabled: true,
  batchingMaxMessages: 100,
  batchingMaxPublishDelayMs: 10,
  compressionType: Pulsar.CompressionType.LZ4,
});

Sending with properties

await producer.send({
  data: Buffer.from('Message content'),
  properties: {
    key1: 'value1',
    key2: 'value2',
  },
  partitionKey: 'my-key',
  eventTimestamp: Date.now(),
});

Asynchronous send without await

producer.send({
  data: Buffer.from('Async message'),
}).then(msgId => {
  console.log('Message sent:', msgId.toString());
}).catch(err => {
  console.error('Send failed:', err);
});

Consuming messages

Basic consumer

const consumer = await client.subscribe({
  topic: 'my-topic',
  subscription: 'my-subscription',
  subscriptionType: 'Shared',
});

while (true) {
  const msg = await consumer.receive();
  console.log('Received:', msg.getData().toString());
  await consumer.acknowledge(msg);
}

Consumer with timeout

try {
  const msg = await consumer.receive(5000); // 5 second timeout
  console.log('Received:', msg.getData().toString());
  await consumer.acknowledge(msg);
} catch (err) {
  console.log('Receive timeout');
}

Consumer with message listener

const consumer = await client.subscribe({
  topic: 'my-topic',
  subscription: 'my-subscription',
  listener: (msg, msgConsumer) => {
    console.log('Received:', msg.getData().toString());
    msgConsumer.acknowledge(msg);
  },
});

// Keep running
await new Promise(() => {});

Negative acknowledgment

const msg = await consumer.receive();
try {
  processMessage(msg);
  await consumer.acknowledge(msg);
} catch (err) {
  await consumer.negativeAcknowledge(msg); // Redeliver
}

Batch receive

const consumer = await client.subscribe({
  topic: 'my-topic',
  subscription: 'my-subscription',
  batchReceivePolicy: {
    maxNumMessages: 100,
    maxNumBytes: 1024 * 1024,
    timeoutMs: 200,
  },
});

const messages = await consumer.batchReceive();
for (const msg of messages) {
  console.log('Received:', msg.getData().toString());
}
await consumer.acknowledgeAll(messages);

Using readers

const reader = await client.createReader({
  topic: 'my-topic',
  startMessageId: Pulsar.MessageId.earliest(),
});

while (await reader.hasNext()) {
  const msg = await reader.readNext();
  console.log('Read:', msg.getData().toString());
}

await reader.close();

Working with schemas

JSON messages

// Producer
const user = { name: 'John', age: 30 };
await producer.send({
  data: Buffer.from(JSON.stringify(user)),
});

// Consumer
const msg = await consumer.receive();
const user = JSON.parse(msg.getData().toString());
console.log('Name:', user.name, 'Age:', user.age);

String messages

// Send string
await producer.send({
  data: Buffer.from('Hello Pulsar'),
});

// Receive string
const msg = await consumer.receive();
const text = msg.getData().toString('utf8');
console.log('Received:', text);

Authentication

TLS authentication

const client = new Pulsar.Client({
  serviceUrl: 'pulsar+ssl://localhost:6651',
  tlsTrustCertsFilePath: '/path/to/ca.cert.pem',
  authentication: new Pulsar.AuthenticationTls({
    certificatePath: '/path/to/client.cert.pem',
    privateKeyPath: '/path/to/client.key.pem',
  }),
});

Token authentication

const fs = require('fs');

// Token string
const client = new Pulsar.Client({
  serviceUrl: 'pulsar://localhost:6650',
  authentication: new Pulsar.AuthenticationToken({
    token: 'eyJhbGciOiJIUzI1NiJ9...',
  }),
});

// Token from file
const client = new Pulsar.Client({
  serviceUrl: 'pulsar://localhost:6650',
  authentication: new Pulsar.AuthenticationToken({
    token: () => fs.readFileSync('/path/to/token.txt', 'utf8').trim(),
  }),
});

OAuth 2.0 authentication

const client = new Pulsar.Client({
  serviceUrl: 'pulsar://localhost:6650',
  authentication: new Pulsar.AuthenticationOauth2({
    type: 'client_credentials',
    issuer_url: 'https://auth.example.com',
    client_id: 'my-client-id',
    client_secret: 'my-client-secret',
    audience: 'https://pulsar.example.com',
  }),
});

Subscription types

// Exclusive (default)
const consumer = await client.subscribe({
  topic: 'my-topic',
  subscription: 'exclusive-sub',
  subscriptionType: 'Exclusive',
});

// Shared
const consumer = await client.subscribe({
  topic: 'my-topic',
  subscription: 'shared-sub',
  subscriptionType: 'Shared',
});

// Key_Shared
const consumer = await client.subscribe({
  topic: 'my-topic',
  subscription: 'key-shared-sub',
  subscriptionType: 'KeyShared',
});

// Failover
const consumer = await client.subscribe({
  topic: 'my-topic',
  subscription: 'failover-sub',
  subscriptionType: 'Failover',
});

Error handling

try {
  await producer.send({
    data: Buffer.from('message'),
  });
} catch (err) {
  if (err.message.includes('timeout')) {
    console.error('Send timeout');
  } else if (err.message.includes('closed')) {
    console.error('Producer closed');
  } else {
    console.error('Error:', err);
  }
}

Express.js integration

const express = require('express');
const Pulsar = require('pulsar-client');

const app = express();
app.use(express.json());

const client = new Pulsar.Client({
  serviceUrl: 'pulsar://localhost:6650',
});

let producer;

(async () => {
  producer = await client.createProducer({
    topic: 'events',
  });

  app.post('/events', async (req, res) => {
    try {
      const msgId = await producer.send({
        data: Buffer.from(JSON.stringify(req.body)),
      });
      res.json({ messageId: msgId.toString() });
    } catch (err) {
      res.status(500).json({ error: err.message });
    }
  });

  app.listen(3000, () => {
    console.log('Server running on port 3000');
  });
})();

Node.js client repository

The Node.js client is maintained in a separate repository:

Next steps

Schema support

Learn about Pulsar schemas

Subscription types

Understanding subscription types

Authentication

Configure authentication

Producers and consumers

Deep dive into messaging

Build docs developers (and LLMs) love