Skip to main content
The Apache Pulsar .NET client (DotPulsar) is a modern, fully-managed C# implementation for .NET Core and .NET Framework.

Installation

Install the DotPulsar NuGet package:
dotnet add package DotPulsar
Or via Package Manager:
Install-Package DotPulsar
DotPulsar requires .NET Core 3.1+ or .NET Framework 4.6.2+. It’s a pure C# implementation with no native dependencies.

Quick start

Here’s a complete example:
using DotPulsar;
using DotPulsar.Extensions;
using System;
using System.Text;
using System.Threading.Tasks;

class Program
{
    static async Task Main(string[] args)
    {
        // Create client
        await using var client = PulsarClient.Builder()
            .ServiceUrl(new Uri("pulsar://localhost:6650"))
            .Build();

        // Create producer
        await using var producer = client.NewProducer(Schema.String)
            .Topic("my-topic")
            .Create();

        // Send message
        var messageId = await producer.Send("Hello Pulsar!");
        Console.WriteLine($"Message published: {messageId}");

        // Create consumer
        await using var consumer = client.NewConsumer(Schema.String)
            .Topic("my-topic")
            .SubscriptionName("my-subscription")
            .Create();

        // Receive message
        await foreach (var message in consumer.Messages())
        {
            Console.WriteLine($"Received: {message.Value()}");
            await consumer.Acknowledge(message);
            break;
        }
    }
}

Creating a client

Basic client configuration:
var client = PulsarClient.Builder()
    .ServiceUrl(new Uri("pulsar://localhost:6650"))
    .Build();
For TLS connections:
var client = PulsarClient.Builder()
    .ServiceUrl(new Uri("pulsar+ssl://localhost:6651"))
    .TrustedCertificateAuthority(new X509Certificate2("/path/to/ca.cert.pem"))
    .Build();

Producing messages

Basic producer

await using var producer = client.NewProducer(Schema.String)
    .Topic("persistent://public/default/my-topic")
    .Create();

// Send message
var messageId = await producer.Send("Hello Pulsar");
Console.WriteLine($"Message sent: {messageId}");

Producer with configuration

await using var producer = client.NewProducer(Schema.String)
    .Topic("my-topic")
    .ProducerName("my-producer")
    .CompressionType(CompressionType.Lz4)
    .Create();

Sending with metadata

var data = new MessageMetadata
{
    ["key1"] = "value1",
    ["key2"] = "value2"
};

var messageId = await producer.NewMessage()
    .Properties(data)
    .EventTime((ulong)DateTimeOffset.UtcNow.ToUnixTimeMilliseconds())
    .Key("message-key")
    .Value("Message content")
    .Send();

Sending byte arrays

await using var producer = client.NewProducer(Schema.ByteArray)
    .Topic("bytes-topic")
    .Create();

var bytes = Encoding.UTF8.GetBytes("Hello");
var messageId = await producer.Send(bytes);

Consuming messages

Basic consumer

await using var consumer = client.NewConsumer(Schema.String)
    .Topic("my-topic")
    .SubscriptionName("my-subscription")
    .SubscriptionType(SubscriptionType.Shared)
    .Create();

await foreach (var message in consumer.Messages())
{
    try
    {
        Console.WriteLine($"Received: {message.Value()}");
        await consumer.Acknowledge(message);
    }
    catch (Exception ex)
    {
        await consumer.NegativeAcknowledge(message);
    }
}

Consumer with cancellation

using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));

await foreach (var message in consumer.Messages(cts.Token))
{
    Console.WriteLine($"Received: {message.Value()}");
    await consumer.Acknowledge(message);
}

Receive with timeout

using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));

try
{
    var message = await consumer.Receive(cts.Token);
    Console.WriteLine($"Received: {message.Value()}");
    await consumer.Acknowledge(message);
}
catch (OperationCanceledException)
{
    Console.WriteLine("Receive timeout");
}

Batch receive

await using var consumer = client.NewConsumer(Schema.String)
    .Topic("my-topic")
    .SubscriptionName("my-subscription")
    .Create();

var messages = await consumer.Receive(100, cts.Token); // Receive up to 100 messages

foreach (var message in messages)
{
    Console.WriteLine($"Received: {message.Value()}");
}

await consumer.Acknowledge(messages);

Using readers

await using var reader = client.NewReader(Schema.String)
    .Topic("my-topic")
    .StartMessageId(MessageId.Earliest)
    .Create();

await foreach (var message in reader.Messages())
{
    Console.WriteLine($"Read: {message.Value()}");
}

Working with schemas

JSON schema

public class User
{
    public string Name { get; set; }
    public int Age { get; set; }
}

// Producer
await using var producer = client.NewProducer(Schema.Json<User>())
    .Topic("user-topic")
    .Create();

var user = new User { Name = "John", Age = 30 };
var messageId = await producer.Send(user);

// Consumer
await using var consumer = client.NewConsumer(Schema.Json<User>())
    .Topic("user-topic")
    .SubscriptionName("user-sub")
    .Create();

await foreach (var message in consumer.Messages())
{
    var user = message.Value();
    Console.WriteLine($"Name: {user.Name}, Age: {user.Age}");
    await consumer.Acknowledge(message);
}

Primitive schemas

// String schema
var stringProducer = client.NewProducer(Schema.String)
    .Topic("string-topic")
    .Create();

// Byte array schema
var bytesProducer = client.NewProducer(Schema.ByteArray)
    .Topic("bytes-topic")
    .Create();

// Int32 schema
var intProducer = client.NewProducer(Schema.Int32)
    .Topic("int-topic")
    .Create();

// Boolean schema
var boolProducer = client.NewProducer(Schema.Boolean)
    .Topic("bool-topic")
    .Create();

Authentication

TLS authentication

var clientCertificate = new X509Certificate2("/path/to/client.pfx", "password");

var client = PulsarClient.Builder()
    .ServiceUrl(new Uri("pulsar+ssl://localhost:6651"))
    .TrustedCertificateAuthority(new X509Certificate2("/path/to/ca.cert.pem"))
    .Authentication(new TlsAuthentication(clientCertificate))
    .Build();

Token authentication

using System.IO;

// Token string
var client = PulsarClient.Builder()
    .ServiceUrl(new Uri("pulsar://localhost:6650"))
    .Authentication(new TokenAuthentication("eyJhbGciOiJIUzI1NiJ9..."))
    .Build();

// Token from file
var token = await File.ReadAllTextAsync("/path/to/token.txt");
var client = PulsarClient.Builder()
    .ServiceUrl(new Uri("pulsar://localhost:6650"))
    .Authentication(new TokenAuthentication(token.Trim()))
    .Build();

Subscription types

// Exclusive (default)
var consumer = client.NewConsumer(Schema.String)
    .Topic("my-topic")
    .SubscriptionName("exclusive-sub")
    .SubscriptionType(SubscriptionType.Exclusive)
    .Create();

// Shared
var consumer = client.NewConsumer(Schema.String)
    .Topic("my-topic")
    .SubscriptionName("shared-sub")
    .SubscriptionType(SubscriptionType.Shared)
    .Create();

// Key_Shared
var consumer = client.NewConsumer(Schema.String)
    .Topic("my-topic")
    .SubscriptionName("key-shared-sub")
    .SubscriptionType(SubscriptionType.KeyShared)
    .Create();

// Failover
var consumer = client.NewConsumer(Schema.String)
    .Topic("my-topic")
    .SubscriptionName("failover-sub")
    .SubscriptionType(SubscriptionType.Failover)
    .Create();

Error handling

try
{
    var messageId = await producer.Send("message");
}
catch (ProducerClosedException)
{
    Console.WriteLine("Producer is closed");
}
catch (TimeoutException)
{
    Console.WriteLine("Send timeout");
}
catch (Exception ex)
{
    Console.WriteLine($"Error: {ex.Message}");
}

ASP.NET Core integration

using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using DotPulsar;
using DotPulsar.Extensions;

var builder = WebApplication.CreateBuilder(args);

// Register Pulsar client
builder.Services.AddSingleton<IPulsarClient>(sp =>
{
    return PulsarClient.Builder()
        .ServiceUrl(new Uri("pulsar://localhost:6650"))
        .Build();
});

var app = builder.Build();

app.MapPost("/events", async (IPulsarClient client, EventData data) =>
{
    await using var producer = client.NewProducer(Schema.Json<EventData>())
        .Topic("events")
        .Create();

    var messageId = await producer.Send(data);
    return Results.Ok(new { messageId = messageId.ToString() });
});

app.Run();

public record EventData(string Name, string Value);

Monitoring state

Monitor producer and consumer state:
await using var producer = client.NewProducer(Schema.String)
    .Topic("my-topic")
    .Create();

producer.OnStateChangedFrom(ProducerState.Disconnected, ProducerState.Connected)
    .ContinueWith(t => Console.WriteLine("Producer connected"));

producer.OnStateChangedTo(ProducerState.Faulted)
    .ContinueWith(t => Console.WriteLine("Producer faulted"));

.NET client repository

The .NET client is maintained in a separate repository:

Next steps

Schema support

Learn about Pulsar schemas

Subscription types

Understanding subscription types

Authentication

Configure authentication

Producers and consumers

Deep dive into messaging

Build docs developers (and LLMs) love