Installation
Install the DotPulsar NuGet package:dotnet add package DotPulsar
Install-Package DotPulsar
DotPulsar requires .NET Core 3.1+ or .NET Framework 4.6.2+. It’s a pure C# implementation with no native dependencies.
Quick start
Here’s a complete example:using DotPulsar;
using DotPulsar.Extensions;
using System;
using System.Text;
using System.Threading.Tasks;
class Program
{
static async Task Main(string[] args)
{
// Create client
await using var client = PulsarClient.Builder()
.ServiceUrl(new Uri("pulsar://localhost:6650"))
.Build();
// Create producer
await using var producer = client.NewProducer(Schema.String)
.Topic("my-topic")
.Create();
// Send message
var messageId = await producer.Send("Hello Pulsar!");
Console.WriteLine($"Message published: {messageId}");
// Create consumer
await using var consumer = client.NewConsumer(Schema.String)
.Topic("my-topic")
.SubscriptionName("my-subscription")
.Create();
// Receive message
await foreach (var message in consumer.Messages())
{
Console.WriteLine($"Received: {message.Value()}");
await consumer.Acknowledge(message);
break;
}
}
}
Creating a client
Basic client configuration:var client = PulsarClient.Builder()
.ServiceUrl(new Uri("pulsar://localhost:6650"))
.Build();
var client = PulsarClient.Builder()
.ServiceUrl(new Uri("pulsar+ssl://localhost:6651"))
.TrustedCertificateAuthority(new X509Certificate2("/path/to/ca.cert.pem"))
.Build();
Producing messages
Basic producer
await using var producer = client.NewProducer(Schema.String)
.Topic("persistent://public/default/my-topic")
.Create();
// Send message
var messageId = await producer.Send("Hello Pulsar");
Console.WriteLine($"Message sent: {messageId}");
Producer with configuration
await using var producer = client.NewProducer(Schema.String)
.Topic("my-topic")
.ProducerName("my-producer")
.CompressionType(CompressionType.Lz4)
.Create();
Sending with metadata
var data = new MessageMetadata
{
["key1"] = "value1",
["key2"] = "value2"
};
var messageId = await producer.NewMessage()
.Properties(data)
.EventTime((ulong)DateTimeOffset.UtcNow.ToUnixTimeMilliseconds())
.Key("message-key")
.Value("Message content")
.Send();
Sending byte arrays
await using var producer = client.NewProducer(Schema.ByteArray)
.Topic("bytes-topic")
.Create();
var bytes = Encoding.UTF8.GetBytes("Hello");
var messageId = await producer.Send(bytes);
Consuming messages
Basic consumer
await using var consumer = client.NewConsumer(Schema.String)
.Topic("my-topic")
.SubscriptionName("my-subscription")
.SubscriptionType(SubscriptionType.Shared)
.Create();
await foreach (var message in consumer.Messages())
{
try
{
Console.WriteLine($"Received: {message.Value()}");
await consumer.Acknowledge(message);
}
catch (Exception ex)
{
await consumer.NegativeAcknowledge(message);
}
}
Consumer with cancellation
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
await foreach (var message in consumer.Messages(cts.Token))
{
Console.WriteLine($"Received: {message.Value()}");
await consumer.Acknowledge(message);
}
Receive with timeout
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
try
{
var message = await consumer.Receive(cts.Token);
Console.WriteLine($"Received: {message.Value()}");
await consumer.Acknowledge(message);
}
catch (OperationCanceledException)
{
Console.WriteLine("Receive timeout");
}
Batch receive
await using var consumer = client.NewConsumer(Schema.String)
.Topic("my-topic")
.SubscriptionName("my-subscription")
.Create();
var messages = await consumer.Receive(100, cts.Token); // Receive up to 100 messages
foreach (var message in messages)
{
Console.WriteLine($"Received: {message.Value()}");
}
await consumer.Acknowledge(messages);
Using readers
await using var reader = client.NewReader(Schema.String)
.Topic("my-topic")
.StartMessageId(MessageId.Earliest)
.Create();
await foreach (var message in reader.Messages())
{
Console.WriteLine($"Read: {message.Value()}");
}
Working with schemas
JSON schema
public class User
{
public string Name { get; set; }
public int Age { get; set; }
}
// Producer
await using var producer = client.NewProducer(Schema.Json<User>())
.Topic("user-topic")
.Create();
var user = new User { Name = "John", Age = 30 };
var messageId = await producer.Send(user);
// Consumer
await using var consumer = client.NewConsumer(Schema.Json<User>())
.Topic("user-topic")
.SubscriptionName("user-sub")
.Create();
await foreach (var message in consumer.Messages())
{
var user = message.Value();
Console.WriteLine($"Name: {user.Name}, Age: {user.Age}");
await consumer.Acknowledge(message);
}
Primitive schemas
// String schema
var stringProducer = client.NewProducer(Schema.String)
.Topic("string-topic")
.Create();
// Byte array schema
var bytesProducer = client.NewProducer(Schema.ByteArray)
.Topic("bytes-topic")
.Create();
// Int32 schema
var intProducer = client.NewProducer(Schema.Int32)
.Topic("int-topic")
.Create();
// Boolean schema
var boolProducer = client.NewProducer(Schema.Boolean)
.Topic("bool-topic")
.Create();
Authentication
TLS authentication
var clientCertificate = new X509Certificate2("/path/to/client.pfx", "password");
var client = PulsarClient.Builder()
.ServiceUrl(new Uri("pulsar+ssl://localhost:6651"))
.TrustedCertificateAuthority(new X509Certificate2("/path/to/ca.cert.pem"))
.Authentication(new TlsAuthentication(clientCertificate))
.Build();
Token authentication
using System.IO;
// Token string
var client = PulsarClient.Builder()
.ServiceUrl(new Uri("pulsar://localhost:6650"))
.Authentication(new TokenAuthentication("eyJhbGciOiJIUzI1NiJ9..."))
.Build();
// Token from file
var token = await File.ReadAllTextAsync("/path/to/token.txt");
var client = PulsarClient.Builder()
.ServiceUrl(new Uri("pulsar://localhost:6650"))
.Authentication(new TokenAuthentication(token.Trim()))
.Build();
Subscription types
// Exclusive (default)
var consumer = client.NewConsumer(Schema.String)
.Topic("my-topic")
.SubscriptionName("exclusive-sub")
.SubscriptionType(SubscriptionType.Exclusive)
.Create();
// Shared
var consumer = client.NewConsumer(Schema.String)
.Topic("my-topic")
.SubscriptionName("shared-sub")
.SubscriptionType(SubscriptionType.Shared)
.Create();
// Key_Shared
var consumer = client.NewConsumer(Schema.String)
.Topic("my-topic")
.SubscriptionName("key-shared-sub")
.SubscriptionType(SubscriptionType.KeyShared)
.Create();
// Failover
var consumer = client.NewConsumer(Schema.String)
.Topic("my-topic")
.SubscriptionName("failover-sub")
.SubscriptionType(SubscriptionType.Failover)
.Create();
Error handling
try
{
var messageId = await producer.Send("message");
}
catch (ProducerClosedException)
{
Console.WriteLine("Producer is closed");
}
catch (TimeoutException)
{
Console.WriteLine("Send timeout");
}
catch (Exception ex)
{
Console.WriteLine($"Error: {ex.Message}");
}
ASP.NET Core integration
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using DotPulsar;
using DotPulsar.Extensions;
var builder = WebApplication.CreateBuilder(args);
// Register Pulsar client
builder.Services.AddSingleton<IPulsarClient>(sp =>
{
return PulsarClient.Builder()
.ServiceUrl(new Uri("pulsar://localhost:6650"))
.Build();
});
var app = builder.Build();
app.MapPost("/events", async (IPulsarClient client, EventData data) =>
{
await using var producer = client.NewProducer(Schema.Json<EventData>())
.Topic("events")
.Create();
var messageId = await producer.Send(data);
return Results.Ok(new { messageId = messageId.ToString() });
});
app.Run();
public record EventData(string Name, string Value);
Monitoring state
Monitor producer and consumer state:await using var producer = client.NewProducer(Schema.String)
.Topic("my-topic")
.Create();
producer.OnStateChangedFrom(ProducerState.Disconnected, ProducerState.Connected)
.ContinueWith(t => Console.WriteLine("Producer connected"));
producer.OnStateChangedTo(ProducerState.Faulted)
.ContinueWith(t => Console.WriteLine("Producer faulted"));
.NET client repository
The .NET client is maintained in a separate repository:Next steps
Schema support
Learn about Pulsar schemas
Subscription types
Understanding subscription types
Authentication
Configure authentication
Producers and consumers
Deep dive into messaging