Skip to main content
The Apache Pulsar Python client is a C++ binding that provides native performance with Python ergonomics.

Installation

Install the Pulsar Python client:
pip install pulsar-client
For development with additional dependencies:
pip install pulsar-client[avro,functions]
The Python client requires Python 3.7 or later. Binary wheels are available for Linux (x86_64, ARM64) and macOS.

Quick start

Here’s a complete example:
import pulsar

# Create client
client = pulsar.Client('pulsar://localhost:6650')

# Create producer
producer = client.create_producer('my-topic')

# Send message
producer.send('Hello Pulsar!'.encode('utf-8'))
print('Message published')

# Create consumer
consumer = client.subscribe(
    topic='my-topic',
    subscription_name='my-subscription'
)

# Receive message
msg = consumer.receive()
print(f"Received: {msg.data().decode('utf-8')}")
consumer.acknowledge(msg)

# Close resources
producer.close()
consumer.close()
client.close()

Creating a client

Basic client configuration:
import pulsar

client = pulsar.Client(
    service_url='pulsar://localhost:6650',
    operation_timeout_seconds=30,
    connection_timeout_ms=5000
)
For TLS connections:
client = pulsar.Client(
    service_url='pulsar+ssl://localhost:6651',
    tls_trust_certs_file_path='/path/to/ca.cert.pem',
    tls_allow_insecure_connection=False
)

Producing messages

Basic producer

producer = client.create_producer(
    topic='persistent://public/default/my-topic'
)

# Synchronous send
msg_id = producer.send('Hello'.encode('utf-8'))
print(f'Published message: {msg_id}')

# Asynchronous send
def send_callback(res, msg_id):
    print(f'Message published: {msg_id}')

producer.send_async(
    'Hello Async'.encode('utf-8'),
    callback=send_callback
)

Producer with configuration

producer = client.create_producer(
    topic='my-topic',
    producer_name='my-producer',
    send_timeout_millis=30000,
    compression_type=pulsar.CompressionType.LZ4,
    batching_enabled=True,
    batching_max_messages=100,
    batching_max_publish_delay_ms=10
)

Sending with properties

producer.send(
    content='Message content'.encode('utf-8'),
    properties={
        'key1': 'value1',
        'key2': 'value2'
    },
    partition_key='my-key',
    event_timestamp=int(time.time() * 1000)
)

Consuming messages

Basic consumer

consumer = client.subscribe(
    topic='my-topic',
    subscription_name='my-subscription',
    consumer_type=pulsar.ConsumerType.Shared
)

while True:
    msg = consumer.receive()
    try:
        print(f"Received: {msg.data().decode('utf-8')}")
        consumer.acknowledge(msg)
    except Exception as e:
        consumer.negative_acknowledge(msg)

Consumer with timeout

try:
    msg = consumer.receive(timeout_millis=5000)
    print(f"Received: {msg.data().decode('utf-8')}")
    consumer.acknowledge(msg)
except Exception:
    print("No message received within timeout")

Consumer with message listener

def message_listener(consumer, message):
    try:
        print(f"Received: {message.data().decode('utf-8')}")
        consumer.acknowledge(message)
    except Exception:
        consumer.negative_acknowledge(message)

consumer = client.subscribe(
    topic='my-topic',
    subscription_name='my-subscription',
    message_listener=message_listener
)

# Keep running
import time
while True:
    time.sleep(1)

Batch receive

consumer = client.subscribe(
    topic='my-topic',
    subscription_name='my-subscription',
    batch_receive_policy=pulsar.ConsumerBatchReceivePolicy(
        max_num_messages=100,
        max_num_bytes=1024 * 1024,
        timeout_ms=200
    )
)

messages = consumer.batch_receive()
for msg in messages:
    print(f"Received: {msg.data().decode('utf-8')}")
consumer.acknowledge(messages)

Using readers

reader = client.create_reader(
    topic='my-topic',
    start_message_id=pulsar.MessageId.earliest
)

while reader.has_message_available():
    msg = reader.read_next()
    print(f"Read: {msg.data().decode('utf-8')}")

reader.close()

Working with schemas

JSON schema

import pulsar
from pulsar.schema import *

class User(Record):
    name = String()
    age = Integer()

# Producer with schema
producer = client.create_producer(
    topic='user-topic',
    schema=JsonSchema(User)
)

user = User(name='John', age=30)
producer.send(user)

# Consumer with schema
consumer = client.subscribe(
    topic='user-topic',
    subscription_name='user-sub',
    schema=JsonSchema(User)
)

msg = consumer.receive()
user = msg.value()
print(f"Name: {user.name}, Age: {user.age}")
consumer.acknowledge(msg)

Avro schema

from pulsar.schema import *

class User(Record):
    name = String()
    age = Integer()

producer = client.create_producer(
    topic='user-topic-avro',
    schema=AvroSchema(User)
)

user = User(name='Alice', age=25)
producer.send(user)

String and bytes schemas

# String schema
producer = client.create_producer(
    topic='string-topic',
    schema=pulsar.schema.StringSchema()
)
producer.send('Hello')

# Bytes schema (default)
producer = client.create_producer(
    topic='bytes-topic',
    schema=pulsar.schema.BytesSchema()
)
producer.send(b'raw bytes')

Authentication

TLS authentication

client = pulsar.Client(
    service_url='pulsar+ssl://localhost:6651',
    tls_trust_certs_file_path='/path/to/ca.cert.pem',
    authentication=pulsar.AuthenticationTLS(
        certificate_path='/path/to/client.cert.pem',
        private_key_path='/path/to/client.key.pem'
    )
)

Token authentication

# Token string
client = pulsar.Client(
    service_url='pulsar://localhost:6650',
    authentication=pulsar.AuthenticationToken('eyJhbGciOiJIUzI1NiJ9...')
)

# Token from file
client = pulsar.Client(
    service_url='pulsar://localhost:6650',
    authentication=pulsar.AuthenticationToken(
        token=lambda: open('/path/to/token.txt').read().strip()
    )
)

OAuth 2.0 authentication

client = pulsar.Client(
    service_url='pulsar://localhost:6650',
    authentication=pulsar.AuthenticationOauth2(
        issuer_url='https://auth.example.com',
        credentials_url='file:///path/to/credentials.json',
        audience='https://pulsar.example.com'
    )
)

Subscription types

# Exclusive (default)
consumer = client.subscribe(
    topic='my-topic',
    subscription_name='exclusive-sub',
    consumer_type=pulsar.ConsumerType.Exclusive
)

# Shared
consumer = client.subscribe(
    topic='my-topic',
    subscription_name='shared-sub',
    consumer_type=pulsar.ConsumerType.Shared
)

# Key_Shared
consumer = client.subscribe(
    topic='my-topic',
    subscription_name='key-shared-sub',
    consumer_type=pulsar.ConsumerType.KeyShared
)

# Failover
consumer = client.subscribe(
    topic='my-topic',
    subscription_name='failover-sub',
    consumer_type=pulsar.ConsumerType.Failover
)

Error handling

import pulsar

try:
    producer.send('message'.encode('utf-8'))
except pulsar.Timeout:
    print('Send timeout')
except pulsar.ProducerClosed:
    print('Producer is closed')
except pulsar.TopicTerminated:
    print('Topic has been terminated')
except Exception as e:
    print(f'Error: {e}')

Async/await support

The Python client supports asyncio:
import asyncio
import pulsar

async def main():
    client = pulsar.Client('pulsar://localhost:6650')
    
    producer = client.create_producer('my-topic')
    
    # Async send
    future = producer.send_async('Hello'.encode('utf-8'))
    msg_id = await asyncio.wrap_future(future)
    print(f'Message sent: {msg_id}')
    
    client.close()

asyncio.run(main())

Python client repository

The Python client is maintained in a separate repository:

Next steps

Schema support

Learn about Pulsar schemas

Subscription types

Understanding subscription types

Authentication

Configure authentication

Pulsar Functions

Build serverless functions in Python

Build docs developers (and LLMs) love