Installation
Install the Go client:go get -u github.com/apache/pulsar-client-go/pulsar
The Go client requires Go 1.18 or later.
Quick start
Here’s a complete example:package main
import (
"context"
"fmt"
"log"
"github.com/apache/pulsar-client-go/pulsar"
)
func main() {
// Create client
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
})
if err != nil {
log.Fatal(err)
}
defer client.Close()
// Create producer
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: "my-topic",
})
if err != nil {
log.Fatal(err)
}
defer producer.Close()
// Send message
msgID, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
Payload: []byte("Hello Pulsar!"),
})
if err != nil {
log.Fatal(err)
}
fmt.Printf("Published message: %v\n", msgID)
// Create consumer
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "my-topic",
SubscriptionName: "my-subscription",
Type: pulsar.Shared,
})
if err != nil {
log.Fatal(err)
}
defer consumer.Close()
// Receive message
msg, err := consumer.Receive(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Printf("Received: %s\n", string(msg.Payload()))
consumer.Ack(msg)
}
Creating a client
Basic client configuration:client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
OperationTimeout: 30 * time.Second,
ConnectionTimeout: 5 * time.Second,
})
if err != nil {
log.Fatal(err)
}
defer client.Close()
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar+ssl://localhost:6651",
TLSTrustCertsFilePath: "/path/to/ca.cert.pem",
})
Producing messages
Basic producer
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: "persistent://public/default/my-topic",
})
if err != nil {
log.Fatal(err)
}
defer producer.Close()
// Synchronous send
msgID, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
Payload: []byte("Hello Pulsar"),
})
if err != nil {
log.Fatal(err)
}
fmt.Printf("Message published: %v\n", msgID)
Producer with configuration
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: "my-topic",
Name: "my-producer",
SendTimeout: 30 * time.Second,
DisableBatching: false,
BatchingMaxMessages: 100,
BatchingMaxPublishDelay: 10 * time.Millisecond,
CompressionType: pulsar.LZ4,
})
Sending with properties
msgID, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
Payload: []byte("Message content"),
Properties: map[string]string{
"key1": "value1",
"key2": "value2",
},
Key: "message-key",
EventTime: time.Now(),
})
Asynchronous send
producer.SendAsync(context.Background(), &pulsar.ProducerMessage{
Payload: []byte("Async message"),
}, func(id pulsar.MessageID, message *pulsar.ProducerMessage, err error) {
if err != nil {
log.Printf("Failed to send: %v", err)
} else {
fmt.Printf("Message sent: %v\n", id)
}
})
Consuming messages
Basic consumer
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "my-topic",
SubscriptionName: "my-subscription",
Type: pulsar.Shared,
})
if err != nil {
log.Fatal(err)
}
defer consumer.Close()
for {
msg, err := consumer.Receive(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Printf("Received: %s\n", string(msg.Payload()))
consumer.Ack(msg)
}
Consumer with timeout
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
msg, err := consumer.Receive(ctx)
if err != nil {
if ctx.Err() == context.DeadlineExceeded {
fmt.Println("Receive timeout")
} else {
log.Fatal(err)
}
} else {
fmt.Printf("Received: %s\n", string(msg.Payload()))
consumer.Ack(msg)
}
Consumer with message channel
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "my-topic",
SubscriptionName: "my-subscription",
})
if err != nil {
log.Fatal(err)
}
defer consumer.Close()
for msg := range consumer.Chan() {
fmt.Printf("Received: %s\n", string(msg.Payload()))
consumer.Ack(msg)
}
Negative acknowledgment
msg, err := consumer.Receive(context.Background())
if err != nil {
log.Fatal(err)
}
if err := processMessage(msg); err != nil {
consumer.Nack(msg) // Redeliver message
} else {
consumer.Ack(msg)
}
Using readers
reader, err := client.CreateReader(pulsar.ReaderOptions{
Topic: "my-topic",
StartMessageID: pulsar.EarliestMessageID(),
})
if err != nil {
log.Fatal(err)
}
defer reader.Close()
for reader.HasNext() {
msg, err := reader.Next(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Printf("Read: %s\n", string(msg.Payload()))
}
Working with schemas
JSON schema
type User struct {
Name string `json:"name"`
Age int `json:"age"`
}
jsonSchema := pulsar.NewJSONSchema(User{}, nil)
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: "user-topic",
Schema: jsonSchema,
})
user := User{Name: "John", Age: 30}
producer.Send(context.Background(), &pulsar.ProducerMessage{
Value: user,
})
// Consumer
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "user-topic",
SubscriptionName: "user-sub",
Schema: jsonSchema,
})
msg, _ := consumer.Receive(context.Background())
var receivedUser User
msg.GetValue(&receivedUser)
fmt.Printf("Name: %s, Age: %d\n", receivedUser.Name, receivedUser.Age)
Avro schema
avroSchema := pulsar.NewAvroSchema(avroSchemaJSON, nil)
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: "avro-topic",
Schema: avroSchema,
})
String schema
stringSchema := pulsar.NewStringSchema(nil)
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: "string-topic",
Schema: stringSchema,
})
producer.Send(context.Background(), &pulsar.ProducerMessage{
Value: "Hello Pulsar",
})
Authentication
TLS authentication
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar+ssl://localhost:6651",
TLSTrustCertsFilePath: "/path/to/ca.cert.pem",
Authentication: pulsar.NewAuthenticationTLS(
"/path/to/client.cert.pem",
"/path/to/client.key.pem",
),
})
Token authentication
// Token string
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
Authentication: pulsar.NewAuthenticationToken("eyJhbGciOiJIUzI1NiJ9..."),
})
// Token from file
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
Authentication: pulsar.NewAuthenticationTokenFromFile("/path/to/token.txt"),
})
OAuth 2.0 authentication
oauth := pulsar.NewAuthenticationOAuth2(map[string]string{
"type": "client_credentials",
"issuerUrl": "https://auth.example.com",
"audience": "https://pulsar.example.com",
"privateKey": "/path/to/credentials.json",
})
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
Authentication: oauth,
})
Subscription types
// Exclusive
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "my-topic",
SubscriptionName: "exclusive-sub",
Type: pulsar.Exclusive,
})
// Shared
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "my-topic",
SubscriptionName: "shared-sub",
Type: pulsar.Shared,
})
// Key_Shared
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "my-topic",
SubscriptionName: "key-shared-sub",
Type: pulsar.KeyShared,
})
// Failover
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "my-topic",
SubscriptionName: "failover-sub",
Type: pulsar.Failover,
})
Error handling
import "github.com/apache/pulsar-client-go/pulsar"
msgID, err := producer.Send(ctx, msg)
if err != nil {
switch err {
case context.DeadlineExceeded:
fmt.Println("Send timeout")
case pulsar.ErrProducerClosed:
fmt.Println("Producer is closed")
case pulsar.ErrTopicTerminated:
fmt.Println("Topic terminated")
default:
fmt.Printf("Error: %v\n", err)
}
}
Context support
The Go client fully supports context for cancellation and timeouts:// With timeout
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
msgID, err := producer.Send(ctx, &pulsar.ProducerMessage{
Payload: []byte("Hello"),
})
// With cancellation
ctx, cancel := context.WithCancel(context.Background())
go func() {
time.Sleep(2 * time.Second)
cancel()
}()
msg, err := consumer.Receive(ctx)
Go client repository
The Go client is maintained in a separate repository:Next steps
Schema support
Learn about Pulsar schemas
Subscription types
Understanding subscription types
Authentication
Configure authentication
Pulsar Functions
Build serverless functions in Go