Skip to main content
The Apache Pulsar Go client is a pure Go implementation optimized for building cloud-native microservices.

Installation

Install the Go client:
go get -u github.com/apache/pulsar-client-go/pulsar
The Go client requires Go 1.18 or later.

Quick start

Here’s a complete example:
package main

import (
    "context"
    "fmt"
    "log"

    "github.com/apache/pulsar-client-go/pulsar"
)

func main() {
    // Create client
    client, err := pulsar.NewClient(pulsar.ClientOptions{
        URL: "pulsar://localhost:6650",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // Create producer
    producer, err := client.CreateProducer(pulsar.ProducerOptions{
        Topic: "my-topic",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer producer.Close()

    // Send message
    msgID, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
        Payload: []byte("Hello Pulsar!"),
    })
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("Published message: %v\n", msgID)

    // Create consumer
    consumer, err := client.Subscribe(pulsar.ConsumerOptions{
        Topic:            "my-topic",
        SubscriptionName: "my-subscription",
        Type:             pulsar.Shared,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()

    // Receive message
    msg, err := consumer.Receive(context.Background())
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("Received: %s\n", string(msg.Payload()))
    consumer.Ack(msg)
}

Creating a client

Basic client configuration:
client, err := pulsar.NewClient(pulsar.ClientOptions{
    URL:                     "pulsar://localhost:6650",
    OperationTimeout:        30 * time.Second,
    ConnectionTimeout:       5 * time.Second,
})
if err != nil {
    log.Fatal(err)
}
defer client.Close()
For TLS connections:
client, err := pulsar.NewClient(pulsar.ClientOptions{
    URL:                   "pulsar+ssl://localhost:6651",
    TLSTrustCertsFilePath: "/path/to/ca.cert.pem",
})

Producing messages

Basic producer

producer, err := client.CreateProducer(pulsar.ProducerOptions{
    Topic: "persistent://public/default/my-topic",
})
if err != nil {
    log.Fatal(err)
}
defer producer.Close()

// Synchronous send
msgID, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
    Payload: []byte("Hello Pulsar"),
})
if err != nil {
    log.Fatal(err)
}
fmt.Printf("Message published: %v\n", msgID)

Producer with configuration

producer, err := client.CreateProducer(pulsar.ProducerOptions{
    Topic:                   "my-topic",
    Name:                    "my-producer",
    SendTimeout:             30 * time.Second,
    DisableBatching:         false,
    BatchingMaxMessages:     100,
    BatchingMaxPublishDelay: 10 * time.Millisecond,
    CompressionType:         pulsar.LZ4,
})

Sending with properties

msgID, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
    Payload: []byte("Message content"),
    Properties: map[string]string{
        "key1": "value1",
        "key2": "value2",
    },
    Key:          "message-key",
    EventTime:    time.Now(),
})

Asynchronous send

producer.SendAsync(context.Background(), &pulsar.ProducerMessage{
    Payload: []byte("Async message"),
}, func(id pulsar.MessageID, message *pulsar.ProducerMessage, err error) {
    if err != nil {
        log.Printf("Failed to send: %v", err)
    } else {
        fmt.Printf("Message sent: %v\n", id)
    }
})

Consuming messages

Basic consumer

consumer, err := client.Subscribe(pulsar.ConsumerOptions{
    Topic:            "my-topic",
    SubscriptionName: "my-subscription",
    Type:             pulsar.Shared,
})
if err != nil {
    log.Fatal(err)
}
defer consumer.Close()

for {
    msg, err := consumer.Receive(context.Background())
    if err != nil {
        log.Fatal(err)
    }
    
    fmt.Printf("Received: %s\n", string(msg.Payload()))
    consumer.Ack(msg)
}

Consumer with timeout

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

msg, err := consumer.Receive(ctx)
if err != nil {
    if ctx.Err() == context.DeadlineExceeded {
        fmt.Println("Receive timeout")
    } else {
        log.Fatal(err)
    }
} else {
    fmt.Printf("Received: %s\n", string(msg.Payload()))
    consumer.Ack(msg)
}

Consumer with message channel

consumer, err := client.Subscribe(pulsar.ConsumerOptions{
    Topic:            "my-topic",
    SubscriptionName: "my-subscription",
})
if err != nil {
    log.Fatal(err)
}
defer consumer.Close()

for msg := range consumer.Chan() {
    fmt.Printf("Received: %s\n", string(msg.Payload()))
    consumer.Ack(msg)
}

Negative acknowledgment

msg, err := consumer.Receive(context.Background())
if err != nil {
    log.Fatal(err)
}

if err := processMessage(msg); err != nil {
    consumer.Nack(msg) // Redeliver message
} else {
    consumer.Ack(msg)
}

Using readers

reader, err := client.CreateReader(pulsar.ReaderOptions{
    Topic:          "my-topic",
    StartMessageID: pulsar.EarliestMessageID(),
})
if err != nil {
    log.Fatal(err)
}
defer reader.Close()

for reader.HasNext() {
    msg, err := reader.Next(context.Background())
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("Read: %s\n", string(msg.Payload()))
}

Working with schemas

JSON schema

type User struct {
    Name string `json:"name"`
    Age  int    `json:"age"`
}

jsonSchema := pulsar.NewJSONSchema(User{}, nil)

producer, err := client.CreateProducer(pulsar.ProducerOptions{
    Topic:  "user-topic",
    Schema: jsonSchema,
})

user := User{Name: "John", Age: 30}
producer.Send(context.Background(), &pulsar.ProducerMessage{
    Value: user,
})

// Consumer
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
    Topic:            "user-topic",
    SubscriptionName: "user-sub",
    Schema:           jsonSchema,
})

msg, _ := consumer.Receive(context.Background())
var receivedUser User
msg.GetValue(&receivedUser)
fmt.Printf("Name: %s, Age: %d\n", receivedUser.Name, receivedUser.Age)

Avro schema

avroSchema := pulsar.NewAvroSchema(avroSchemaJSON, nil)

producer, err := client.CreateProducer(pulsar.ProducerOptions{
    Topic:  "avro-topic",
    Schema: avroSchema,
})

String schema

stringSchema := pulsar.NewStringSchema(nil)

producer, err := client.CreateProducer(pulsar.ProducerOptions{
    Topic:  "string-topic",
    Schema: stringSchema,
})

producer.Send(context.Background(), &pulsar.ProducerMessage{
    Value: "Hello Pulsar",
})

Authentication

TLS authentication

client, err := pulsar.NewClient(pulsar.ClientOptions{
    URL:                   "pulsar+ssl://localhost:6651",
    TLSTrustCertsFilePath: "/path/to/ca.cert.pem",
    Authentication: pulsar.NewAuthenticationTLS(
        "/path/to/client.cert.pem",
        "/path/to/client.key.pem",
    ),
})

Token authentication

// Token string
client, err := pulsar.NewClient(pulsar.ClientOptions{
    URL: "pulsar://localhost:6650",
    Authentication: pulsar.NewAuthenticationToken("eyJhbGciOiJIUzI1NiJ9..."),
})

// Token from file
client, err := pulsar.NewClient(pulsar.ClientOptions{
    URL: "pulsar://localhost:6650",
    Authentication: pulsar.NewAuthenticationTokenFromFile("/path/to/token.txt"),
})

OAuth 2.0 authentication

oauth := pulsar.NewAuthenticationOAuth2(map[string]string{
    "type":         "client_credentials",
    "issuerUrl":    "https://auth.example.com",
    "audience":     "https://pulsar.example.com",
    "privateKey":   "/path/to/credentials.json",
})

client, err := pulsar.NewClient(pulsar.ClientOptions{
    URL:            "pulsar://localhost:6650",
    Authentication: oauth,
})

Subscription types

// Exclusive
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
    Topic:            "my-topic",
    SubscriptionName: "exclusive-sub",
    Type:             pulsar.Exclusive,
})

// Shared
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
    Topic:            "my-topic",
    SubscriptionName: "shared-sub",
    Type:             pulsar.Shared,
})

// Key_Shared
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
    Topic:            "my-topic",
    SubscriptionName: "key-shared-sub",
    Type:             pulsar.KeyShared,
})

// Failover
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
    Topic:            "my-topic",
    SubscriptionName: "failover-sub",
    Type:             pulsar.Failover,
})

Error handling

import "github.com/apache/pulsar-client-go/pulsar"

msgID, err := producer.Send(ctx, msg)
if err != nil {
    switch err {
    case context.DeadlineExceeded:
        fmt.Println("Send timeout")
    case pulsar.ErrProducerClosed:
        fmt.Println("Producer is closed")
    case pulsar.ErrTopicTerminated:
        fmt.Println("Topic terminated")
    default:
        fmt.Printf("Error: %v\n", err)
    }
}

Context support

The Go client fully supports context for cancellation and timeouts:
// With timeout
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

msgID, err := producer.Send(ctx, &pulsar.ProducerMessage{
    Payload: []byte("Hello"),
})

// With cancellation
ctx, cancel := context.WithCancel(context.Background())
go func() {
    time.Sleep(2 * time.Second)
    cancel()
}()

msg, err := consumer.Receive(ctx)

Go client repository

The Go client is maintained in a separate repository:

Next steps

Schema support

Learn about Pulsar schemas

Subscription types

Understanding subscription types

Authentication

Configure authentication

Pulsar Functions

Build serverless functions in Go

Build docs developers (and LLMs) love