Skip to main content

Overview

The ClientBuilder interface is used to configure and construct a PulsarClient instance. It provides a fluent API for setting connection parameters, authentication, TLS configuration, threading, and various client behaviors.

Creating a ClientBuilder

ClientBuilder builder = PulsarClient.builder()
    .serviceUrl("pulsar://localhost:6650")
    .build();

Core Methods

build()

Construct the final PulsarClient instance.
PulsarClient build() throws PulsarClientException
Returns
PulsarClient
The new PulsarClient instance
Example:
PulsarClient client = PulsarClient.builder()
    .serviceUrl("pulsar://localhost:6650")
    .build();

clone()

Create a copy of the current client builder.
ClientBuilder clone()
Cloning the builder can be used to share an incomplete configuration and specialize it multiple times. Example:
ClientBuilder builder = PulsarClient.builder()
    .ioThreads(8)
    .listenerThreads(4);

PulsarClient client1 = builder.clone()
    .serviceUrl("pulsar://localhost:6650").build();
PulsarClient client2 = builder.clone()
    .serviceUrl("pulsar://other-host:6650").build();

loadConf()

Load the configuration from provided config map.
ClientBuilder loadConf(Map<String, Object> config)
config
Map<String, Object>
required
Configuration map to load
Example:
Map<String, Object> config = new HashMap<>();
config.put("serviceUrl", "pulsar://localhost:6650");
config.put("numIoThreads", 20);

ClientBuilder builder = PulsarClient.builder().loadConf(config);
PulsarClient client = builder.build();

Connection Configuration

serviceUrl()

Configure the service URL for the Pulsar service. This parameter is required.
ClientBuilder serviceUrl(String serviceUrl)
serviceUrl
String
required
The URL of the Pulsar service that the client should connect to
Examples:
  • pulsar://my-broker:6650 for regular endpoint
  • pulsar+ssl://my-broker:6651 for TLS encrypted endpoint

serviceUrlProvider()

Configure the service URL provider for Pulsar service.
ClientBuilder serviceUrlProvider(ServiceUrlProvider serviceUrlProvider)
serviceUrlProvider
ServiceUrlProvider
The provider instance that dynamically provides a service URL

listenerName()

Configure the listenerName that the broker will return the corresponding advertisedListener.
ClientBuilder listenerName(String name)
name
String
The listener name

connectionMaxIdleSeconds()

Release the connection if it is not used for more than specified seconds. Defaults to 25 seconds.
ClientBuilder connectionMaxIdleSeconds(int connectionMaxIdleSeconds)
connectionMaxIdleSeconds
int
Maximum idle time in seconds

connectionTimeout()

Set the duration of time to wait for a connection to a broker to be established.
ClientBuilder connectionTimeout(int duration, TimeUnit unit)
duration
int
The duration to wait
unit
TimeUnit
The time unit in which the duration is defined

connectionsPerBroker()

Sets the max number of connections that the client library will open to a single broker.
ClientBuilder connectionsPerBroker(int connectionsPerBroker)
connectionsPerBroker
int
Max number of connections per broker (needs to be greater than or equal to 0)
By default, the connection pool will use a single connection for all the producers and consumers. Increasing this parameter may improve throughput when using many producers over a high latency connection.

Authentication

authentication(Authentication)

Set the authentication provider to use in the Pulsar client instance.
ClientBuilder authentication(Authentication authentication)
authentication
Authentication
An instance of the Authentication provider already constructed
Example with TLS:
PulsarClient client = PulsarClient.builder()
    .serviceUrl("pulsar+ssl://broker.example.com:6651/")
    .authentication(
        AuthenticationFactory.TLS("/my/cert/file", "/my/key/file")
    )
    .build();
Example with Token:
Authentication auth = AuthenticationFactory.token(
    "eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJKb2UifQ.ipevRNuRP6HflG8cFKnmUPtypruRC4fb1DWtoLL62SY"
);

authentication(String, String)

Configure the authentication provider to use in the Pulsar client instance.
ClientBuilder authentication(String authPluginClassName, String authParamsString) 
    throws UnsupportedAuthenticationException
authPluginClassName
String
Name of the Authentication-Plugin you want to use
authParamsString
String
String which represents parameters for the Authentication-Plugin, e.g., “key1:val1,key2:val2”
Example:
PulsarClient client = PulsarClient.builder()
    .serviceUrl("pulsar+ssl://broker.example.com:6651/")
    .authentication(
        "org.apache.pulsar.client.impl.auth.AuthenticationTls",
        "tlsCertFile:/my/cert/file,tlsKeyFile:/my/key/file"
    )
    .build();

authentication(String, Map)

Configure the authentication provider using a config map.
ClientBuilder authentication(String authPluginClassName, Map<String, String> authParams)
    throws UnsupportedAuthenticationException
authPluginClassName
String
Name of the Authentication-Plugin you want to use
authParams
Map<String, String>
Map which represents parameters for the Authentication-Plugin
Example:
Map<String, String> conf = new TreeMap<>();
conf.put("tlsCertFile", "/my/cert/file");
conf.put("tlsKeyFile", "/my/key/file");

PulsarClient client = PulsarClient.builder()
    .serviceUrl("pulsar+ssl://broker.example.com:6651/")
    .authentication(
        "org.apache.pulsar.client.impl.auth.AuthenticationTls", conf
    )
    .build();

TLS Configuration

enableTls()

Configure whether to use TLS encryption on the connection.
@Deprecated
ClientBuilder enableTls(boolean enableTls)
Deprecated: use “pulsar+ssl://” in serviceUrl to enable TLS instead.

tlsKeyFilePath()

Set the path to the TLS key file.
ClientBuilder tlsKeyFilePath(String tlsKeyFilePath)
tlsKeyFilePath
String
Path to the TLS key file

tlsCertificateFilePath()

Set the path to the TLS certificate file.
ClientBuilder tlsCertificateFilePath(String tlsCertificateFilePath)
tlsCertificateFilePath
String
Path to the TLS certificate file

tlsTrustCertsFilePath()

Set the path to the trusted TLS certificate file.
ClientBuilder tlsTrustCertsFilePath(String tlsTrustCertsFilePath)
tlsTrustCertsFilePath
String
Path to the trusted TLS certificate file

allowTlsInsecureConnection()

Configure whether the Pulsar client accepts untrusted TLS certificate from broker. Default is false.
ClientBuilder allowTlsInsecureConnection(boolean allowTlsInsecureConnection)
allowTlsInsecureConnection
boolean
Whether to accept an untrusted TLS certificate

enableTlsHostnameVerification()

It allows to validate hostname verification when client connects to broker over TLS.
ClientBuilder enableTlsHostnameVerification(boolean enableTlsHostnameVerification)
enableTlsHostnameVerification
boolean
Whether to enable TLS hostname verification

useKeyStoreTls()

If TLS is enabled, whether use KeyStore type as TLS configuration parameter. False means use default pem type configuration.
ClientBuilder useKeyStoreTls(boolean useKeyStoreTls)
useKeyStoreTls
boolean
Whether to use KeyStore type

sslProvider()

The name of the security provider used for SSL connections. Default value is the default security provider of the JVM.
ClientBuilder sslProvider(String sslProvider)
sslProvider
String
The SSL provider name

tlsKeyStoreType()

The file format of the key store file.
ClientBuilder tlsKeyStoreType(String tlsKeyStoreType)
tlsKeyStoreType
String
The key store type

tlsKeyStorePath()

The location of the key store file.
ClientBuilder tlsKeyStorePath(String tlsTrustStorePath)
tlsTrustStorePath
String
The key store path

tlsKeyStorePassword()

The store password for the key store file.
ClientBuilder tlsKeyStorePassword(String tlsKeyStorePassword)
tlsKeyStorePassword
String
The key store password

tlsTrustStoreType()

The file format of the trust store file.
ClientBuilder tlsTrustStoreType(String tlsTrustStoreType)
tlsTrustStoreType
String
The trust store type

tlsTrustStorePath()

The location of the trust store file.
ClientBuilder tlsTrustStorePath(String tlsTrustStorePath)
tlsTrustStorePath
String
The trust store path

tlsTrustStorePassword()

The store password for the trust store file.
ClientBuilder tlsTrustStorePassword(String tlsTrustStorePassword)
tlsTrustStorePassword
String
The trust store password

tlsCiphers()

A list of cipher suites.
ClientBuilder tlsCiphers(Set<String> tlsCiphers)
tlsCiphers
Set<String>
The cipher suites to use

tlsProtocols()

The SSL protocol used to generate the SSLContext. Default setting is TLS.
ClientBuilder tlsProtocols(Set<String> tlsProtocols)
tlsProtocols
Set<String>
Allowed values in recent JVMs are TLS, TLSv1.3, TLSv1.2 and TLSv1.1

Threading Configuration

ioThreads()

Set the number of threads to be used for handling connections to brokers. Default is Runtime.getRuntime().availableProcessors().
ClientBuilder ioThreads(int numIoThreads)
numIoThreads
int
The number of IO threads

listenerThreads()

Set the number of threads to be used for message listeners. Default is Runtime.getRuntime().availableProcessors().
ClientBuilder listenerThreads(int numListenerThreads)
numListenerThreads
int
The number of listener threads
The listener thread pool is shared across all the consumers and readers that are using a “listener” model to get messages. For a given consumer, the listener will always be invoked from the same thread, to ensure ordering.

Timeout Configuration

operationTimeout()

Set the operation timeout. Default is 30 seconds.
ClientBuilder operationTimeout(int operationTimeout, TimeUnit unit)
operationTimeout
int
Operation timeout
unit
TimeUnit
Time unit for operationTimeout
Producer-create, subscribe and unsubscribe operations will be retried until this interval, after which the operation will be marked as failed.

lookupTimeout()

Set lookup timeout. Default matches operation timeout.
ClientBuilder lookupTimeout(int lookupTimeout, TimeUnit unit)
lookupTimeout
int
Lookup timeout
unit
TimeUnit
Time unit for lookupTimeout

Network Configuration

enableTcpNoDelay()

Configure whether to use TCP no-delay flag on the connection. Default value is true.
ClientBuilder enableTcpNoDelay(boolean enableTcpNoDelay)
enableTcpNoDelay
boolean
Whether to enable TCP no-delay feature
No-delay features make sure packets are sent out on the network as soon as possible, and it’s critical to achieve low latency publishes.

keepAliveInterval()

Set keep alive interval for each client-broker-connection. Default is 30 seconds.
ClientBuilder keepAliveInterval(int keepAliveInterval, TimeUnit unit)
keepAliveInterval
int
Keep alive interval
unit
TimeUnit
The time unit in which the keepAliveInterval is defined

Memory and Resource Configuration

memoryLimit()

Configure a limit on the amount of direct memory that will be allocated by this client instance. Default is 64 MB.
ClientBuilder memoryLimit(long memoryLimit, SizeUnit unit)
memoryLimit
long
The limit (setting to 0 will disable the limit)
unit
SizeUnit
The memory limit size unit

maxConcurrentLookupRequests()

Number of concurrent lookup-requests allowed to send on each broker-connection to prevent overload on broker. Default is 5000.
ClientBuilder maxConcurrentLookupRequests(int maxConcurrentLookupRequests)
maxConcurrentLookupRequests
int
Max concurrent lookup requests

maxLookupRequests()

Number of max lookup-requests allowed on each broker-connection to prevent overload on broker. Default is 50000.
ClientBuilder maxLookupRequests(int maxLookupRequests)
maxLookupRequests
int
Max lookup requests

maxLookupRedirects()

Set the maximum number of times a lookup-request to a broker will be redirected.
ClientBuilder maxLookupRedirects(int maxLookupRedirects)
maxLookupRedirects
int
The maximum number of redirects

maxNumberOfRejectedRequestPerConnection()

Set max number of broker-rejected requests in a certain time-frame (60 seconds). Default is 50.
ClientBuilder maxNumberOfRejectedRequestPerConnection(int maxNumberOfRejectedRequestPerConnection)
maxNumberOfRejectedRequestPerConnection
int
Max number of rejected requests

Backoff Configuration

startingBackoffInterval()

Set the duration of time for a backoff interval.
ClientBuilder startingBackoffInterval(long duration, TimeUnit unit)
duration
long
The duration of the interval
unit
TimeUnit
The time unit in which the duration is defined

maxBackoffInterval()

Set the maximum duration of time for a backoff interval.
ClientBuilder maxBackoffInterval(long duration, TimeUnit unit)
duration
long
The duration of the interval
unit
TimeUnit
The time unit in which the duration is defined

Telemetry and Monitoring

openTelemetry()

Configure OpenTelemetry for Pulsar Client.
ClientBuilder openTelemetry(io.opentelemetry.api.OpenTelemetry openTelemetry)
openTelemetry
OpenTelemetry
The OpenTelemetry instance
When you pass an OpenTelemetry instance, Pulsar client will emit metrics that can be exported in a variety of different methods.

enableTracing()

Enable OpenTelemetry distributed tracing.
ClientBuilder enableTracing(boolean tracingEnabled)
tracingEnabled
boolean
Whether to enable tracing (default: false)
When enabled, interceptors are automatically added to all producers and consumers to create spans for message publishing and consumption. Example with Java Agent:
// When using -javaagent:opentelemetry-javaagent.jar
PulsarClient client = PulsarClient.builder()
    .serviceUrl("pulsar://localhost:6650")
    .enableTracing(true)  // Use GlobalOpenTelemetry
    .build();

statsInterval()

Set the interval between each stat info. Default is 60 seconds.
@Deprecated
ClientBuilder statsInterval(long statsInterval, TimeUnit unit)
statsInterval
long
The interval between each stat info
unit
TimeUnit
Time unit for statsInterval
Deprecated: See openTelemetry() instead.

Advanced Configuration

clock()

The clock used by the pulsar client.
ClientBuilder clock(Clock clock)
clock
Clock
The clock used by the pulsar client to retrieve time information
The clock is currently used by producer for setting publish timestamps. The default clock is a system default zone clock.
The clock is used for TTL enforcement and timestamp based seeks, so be aware of the impacts if you are going to use a different clock.

enableBusyWait()

Option to enable busy-wait settings. Default is false.
ClientBuilder enableBusyWait(boolean enableBusyWait)
enableBusyWait
boolean
Whether to enable busy wait
This option will enable spin-waiting on executors and IO threads in order to reduce latency during context switches. The spinning will consume 100% CPU even when the broker is not doing any work.

proxyServiceUrl()

Proxy-service url when client would like to connect to broker via proxy.
ClientBuilder proxyServiceUrl(String proxyServiceUrl, ProxyProtocol proxyProtocol)
proxyServiceUrl
String
Proxy service url
proxyProtocol
ProxyProtocol
Protocol to decide type of proxy routing eg: SNI-routing

enableTransaction()

If enable transaction, start the transactionCoordinatorClient with pulsar client.
ClientBuilder enableTransaction(boolean enableTransaction)
enableTransaction
boolean
Whether enable transaction feature

dnsLookupBind()

Set dns lookup bind address and port.
ClientBuilder dnsLookupBind(String address, int port)
address
String
DNS bind address
port
int
DNS bind port

dnsServerAddresses()

Set dns lookup server addresses.
ClientBuilder dnsServerAddresses(List<InetSocketAddress> addresses)
addresses
List<InetSocketAddress>
DNS server addresses

socks5ProxyAddress()

Set socks5 proxy address.
ClientBuilder socks5ProxyAddress(InetSocketAddress socks5ProxyAddress)
socks5ProxyAddress
InetSocketAddress
SOCKS5 proxy address

socks5ProxyUsername()

Set socks5 proxy username.
ClientBuilder socks5ProxyUsername(String socks5ProxyUsername)
socks5ProxyUsername
String
SOCKS5 proxy username

socks5ProxyPassword()

Set socks5 proxy password.
ClientBuilder socks5ProxyPassword(String socks5ProxyPassword)
socks5ProxyPassword
String
SOCKS5 proxy password

description()

Set the description.
ClientBuilder description(String description)
description
String
The description of the current PulsarClient instance
By default, when the client connects to the broker, a version string like Pulsar-Java-v<x.y.z> will be carried and saved by the broker. This method provides a way to add more description to a specific PulsarClient instance.

sharedResources()

Provide a set of shared client resources to be reused by this client.
ClientBuilder sharedResources(PulsarClientSharedResources sharedResources)
sharedResources
PulsarClientSharedResources
The shared resources instance created with PulsarClientSharedResources.builder()
Providing a shared resource instance allows PulsarClient instances to share resources (such as IO/event loops, timers, executors, DNS resolver/cache) with other PulsarClient instances.

lookupProperties()

Set the properties used for topic lookup.
ClientBuilder lookupProperties(Map<String, String> properties)
properties
Map<String, String>
Lookup properties map
When the broker performs topic lookup, these lookup properties will be taken into consideration in a customized load manager.

serviceUrlQuarantineInitDuration()

Configure the service URL init quarantine duration. Default is 60 seconds.
ClientBuilder serviceUrlQuarantineInitDuration(long serviceUrlQuarantineInitDuration, TimeUnit unit)
serviceUrlQuarantineInitDuration
long
The initial quarantine duration for unavailable endpoint
unit
TimeUnit
The time unit for the quarantine duration

serviceUrlQuarantineMaxDuration()

Configure the service URL max quarantine duration. Default is 1 day.
ClientBuilder serviceUrlQuarantineMaxDuration(long serviceUrlQuarantineMaxDuration, TimeUnit unit)
serviceUrlQuarantineMaxDuration
long
The maximum quarantine duration for unavailable endpoint
unit
TimeUnit
The time unit for the quarantine duration

sslFactoryPlugin()

Set the SSL Factory Plugin for custom implementation to create SSL Context and SSLEngine.
ClientBuilder sslFactoryPlugin(String sslFactoryPlugin)
sslFactoryPlugin
String
SSL factory class name

sslFactoryPluginParams()

Set the SSL Factory Plugin params for the ssl factory plugin to use.
ClientBuilder sslFactoryPluginParams(String sslFactoryPluginParams)
sslFactoryPluginParams
String
Params in String format that will be inputted to the SSL Factory Plugin

autoCertRefreshSeconds()

Set Cert Refresh interval in seconds.
ClientBuilder autoCertRefreshSeconds(int autoCertRefreshSeconds)
autoCertRefreshSeconds
int
Auto certificate refresh interval in seconds

Build docs developers (and LLMs) love