The Apache Pulsar Python client is a C++ binding that provides native performance with Python ergonomics.
Installation
Install the Pulsar Python client:
pip install pulsar-client
For development with additional dependencies:
pip install pulsar-client[avro,functions]
The Python client requires Python 3.7 or later. Binary wheels are available for Linux (x86_64, ARM64) and macOS.
Quick start
Here’s a complete example:
import pulsar
# Create client
client = pulsar.Client( 'pulsar://localhost:6650' )
# Create producer
producer = client.create_producer( 'my-topic' )
# Send message
producer.send( 'Hello Pulsar!' .encode( 'utf-8' ))
print ( 'Message published' )
# Create consumer
consumer = client.subscribe(
topic = 'my-topic' ,
subscription_name = 'my-subscription'
)
# Receive message
msg = consumer.receive()
print ( f "Received: { msg.data().decode( 'utf-8' ) } " )
consumer.acknowledge(msg)
# Close resources
producer.close()
consumer.close()
client.close()
Creating a client
Basic client configuration:
import pulsar
client = pulsar.Client(
service_url = 'pulsar://localhost:6650' ,
operation_timeout_seconds = 30 ,
connection_timeout_ms = 5000
)
For TLS connections:
client = pulsar.Client(
service_url = 'pulsar+ssl://localhost:6651' ,
tls_trust_certs_file_path = '/path/to/ca.cert.pem' ,
tls_allow_insecure_connection = False
)
Producing messages
Basic producer
producer = client.create_producer(
topic = 'persistent://public/default/my-topic'
)
# Synchronous send
msg_id = producer.send( 'Hello' .encode( 'utf-8' ))
print ( f 'Published message: { msg_id } ' )
# Asynchronous send
def send_callback ( res , msg_id ):
print ( f 'Message published: { msg_id } ' )
producer.send_async(
'Hello Async' .encode( 'utf-8' ),
callback = send_callback
)
Producer with configuration
producer = client.create_producer(
topic = 'my-topic' ,
producer_name = 'my-producer' ,
send_timeout_millis = 30000 ,
compression_type = pulsar.CompressionType. LZ4 ,
batching_enabled = True ,
batching_max_messages = 100 ,
batching_max_publish_delay_ms = 10
)
Sending with properties
producer.send(
content = 'Message content' .encode( 'utf-8' ),
properties = {
'key1' : 'value1' ,
'key2' : 'value2'
},
partition_key = 'my-key' ,
event_timestamp = int (time.time() * 1000 )
)
Consuming messages
Basic consumer
consumer = client.subscribe(
topic = 'my-topic' ,
subscription_name = 'my-subscription' ,
consumer_type = pulsar.ConsumerType.Shared
)
while True :
msg = consumer.receive()
try :
print ( f "Received: { msg.data().decode( 'utf-8' ) } " )
consumer.acknowledge(msg)
except Exception as e:
consumer.negative_acknowledge(msg)
Consumer with timeout
try :
msg = consumer.receive( timeout_millis = 5000 )
print ( f "Received: { msg.data().decode( 'utf-8' ) } " )
consumer.acknowledge(msg)
except Exception :
print ( "No message received within timeout" )
Consumer with message listener
def message_listener ( consumer , message ):
try :
print ( f "Received: { message.data().decode( 'utf-8' ) } " )
consumer.acknowledge(message)
except Exception :
consumer.negative_acknowledge(message)
consumer = client.subscribe(
topic = 'my-topic' ,
subscription_name = 'my-subscription' ,
message_listener = message_listener
)
# Keep running
import time
while True :
time.sleep( 1 )
Batch receive
consumer = client.subscribe(
topic = 'my-topic' ,
subscription_name = 'my-subscription' ,
batch_receive_policy = pulsar.ConsumerBatchReceivePolicy(
max_num_messages = 100 ,
max_num_bytes = 1024 * 1024 ,
timeout_ms = 200
)
)
messages = consumer.batch_receive()
for msg in messages:
print ( f "Received: { msg.data().decode( 'utf-8' ) } " )
consumer.acknowledge(messages)
Using readers
reader = client.create_reader(
topic = 'my-topic' ,
start_message_id = pulsar.MessageId.earliest
)
while reader.has_message_available():
msg = reader.read_next()
print ( f "Read: { msg.data().decode( 'utf-8' ) } " )
reader.close()
Working with schemas
JSON schema
import pulsar
from pulsar.schema import *
class User ( Record ):
name = String()
age = Integer()
# Producer with schema
producer = client.create_producer(
topic = 'user-topic' ,
schema = JsonSchema(User)
)
user = User( name = 'John' , age = 30 )
producer.send(user)
# Consumer with schema
consumer = client.subscribe(
topic = 'user-topic' ,
subscription_name = 'user-sub' ,
schema = JsonSchema(User)
)
msg = consumer.receive()
user = msg.value()
print ( f "Name: { user.name } , Age: { user.age } " )
consumer.acknowledge(msg)
Avro schema
from pulsar.schema import *
class User ( Record ):
name = String()
age = Integer()
producer = client.create_producer(
topic = 'user-topic-avro' ,
schema = AvroSchema(User)
)
user = User( name = 'Alice' , age = 25 )
producer.send(user)
String and bytes schemas
# String schema
producer = client.create_producer(
topic = 'string-topic' ,
schema = pulsar.schema.StringSchema()
)
producer.send( 'Hello' )
# Bytes schema (default)
producer = client.create_producer(
topic = 'bytes-topic' ,
schema = pulsar.schema.BytesSchema()
)
producer.send( b 'raw bytes' )
Authentication
TLS authentication
client = pulsar.Client(
service_url = 'pulsar+ssl://localhost:6651' ,
tls_trust_certs_file_path = '/path/to/ca.cert.pem' ,
authentication = pulsar.AuthenticationTLS(
certificate_path = '/path/to/client.cert.pem' ,
private_key_path = '/path/to/client.key.pem'
)
)
Token authentication
# Token string
client = pulsar.Client(
service_url = 'pulsar://localhost:6650' ,
authentication = pulsar.AuthenticationToken( 'eyJhbGciOiJIUzI1NiJ9...' )
)
# Token from file
client = pulsar.Client(
service_url = 'pulsar://localhost:6650' ,
authentication = pulsar.AuthenticationToken(
token = lambda : open ( '/path/to/token.txt' ).read().strip()
)
)
OAuth 2.0 authentication
client = pulsar.Client(
service_url = 'pulsar://localhost:6650' ,
authentication = pulsar.AuthenticationOauth2(
issuer_url = 'https://auth.example.com' ,
credentials_url = 'file:///path/to/credentials.json' ,
audience = 'https://pulsar.example.com'
)
)
Subscription types
# Exclusive (default)
consumer = client.subscribe(
topic = 'my-topic' ,
subscription_name = 'exclusive-sub' ,
consumer_type = pulsar.ConsumerType.Exclusive
)
# Shared
consumer = client.subscribe(
topic = 'my-topic' ,
subscription_name = 'shared-sub' ,
consumer_type = pulsar.ConsumerType.Shared
)
# Key_Shared
consumer = client.subscribe(
topic = 'my-topic' ,
subscription_name = 'key-shared-sub' ,
consumer_type = pulsar.ConsumerType.KeyShared
)
# Failover
consumer = client.subscribe(
topic = 'my-topic' ,
subscription_name = 'failover-sub' ,
consumer_type = pulsar.ConsumerType.Failover
)
Error handling
import pulsar
try :
producer.send( 'message' .encode( 'utf-8' ))
except pulsar.Timeout:
print ( 'Send timeout' )
except pulsar.ProducerClosed:
print ( 'Producer is closed' )
except pulsar.TopicTerminated:
print ( 'Topic has been terminated' )
except Exception as e:
print ( f 'Error: { e } ' )
Async/await support
The Python client supports asyncio:
import asyncio
import pulsar
async def main ():
client = pulsar.Client( 'pulsar://localhost:6650' )
producer = client.create_producer( 'my-topic' )
# Async send
future = producer.send_async( 'Hello' .encode( 'utf-8' ))
msg_id = await asyncio.wrap_future(future)
print ( f 'Message sent: { msg_id } ' )
client.close()
asyncio.run(main())
Python client repository
The Python client is maintained in a separate repository:
Next steps
Schema support Learn about Pulsar schemas
Subscription types Understanding subscription types
Authentication Configure authentication
Pulsar Functions Build serverless functions in Python