
Available Connectors

Apache Pulsar includes a comprehensive set of built-in connectors for integrating with popular external systems. Connectors are available as both sources (reading from external systems) and sinks (writing to external systems).

Messaging Systems

Kafka

Type: Source and Sink
Description: Connect Pulsar with Apache Kafka clusters for bidirectional data flow.
Configuration Example:
configs:
  bootstrapServers: "localhost:9092"
  groupId: "pulsar-kafka-source"
  topic: "my-kafka-topic"
  autoCommitEnabled: true
  sessionTimeoutMs: 30000
Key Features:
  • Consumer group management
  • SASL/SSL authentication support
  • Custom deserializers
  • Header copying support
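A built-in connector like this one is typically deployed with the `pulsar-admin` CLI. The sketch below saves the source configuration from above to a file and shows the matching `sources create` call; the tenant, namespace, and topic names are placeholders, and the exact flags and built-in type name may vary slightly between Pulsar releases.

```shell
# Save the Kafka source configuration shown above to a file.
cat > kafka-source.yaml <<'EOF'
configs:
  bootstrapServers: "localhost:9092"
  groupId: "pulsar-kafka-source"
  topic: "my-kafka-topic"
  autoCommitEnabled: true
  sessionTimeoutMs: 30000
EOF

# Deploy it as a built-in source (requires a running Pulsar cluster;
# the destination topic below is a placeholder):
#
#   pulsar-admin sources create \
#     --source-type kafka \
#     --name kafka-source \
#     --destination-topic-name persistent://public/default/kafka-in \
#     --source-config-file kafka-source.yaml
```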

RabbitMQ

Type: Source and Sink
Description: Integrate with the RabbitMQ message broker for AMQP-based messaging.
Configuration Example:
configs:
  host: "localhost"
  port: 5672
  virtualHost: "/"
  username: "guest"
  password: "guest"
  queueName: "my-queue"

NSQ

Type: Source
Description: Read messages from the NSQ distributed messaging platform.
Configuration Example:
configs:
  lookupd: "localhost:4161"
  topic: "my-nsq-topic"
  channel: "pulsar-channel"

Database Connectors

Cassandra

Type: Sink
Description: Write data into Apache Cassandra databases.
Configuration Example:
configs:
  roots: "localhost:9042"
  keyspace: "pulsar_keyspace"
  keyname: "key"
  columnFamily: "pulsar_table"
  columnName: "value"
Use Cases:
  • Time-series data storage
  • Wide-column data models
  • High-volume writes

JDBC Connectors

Pulsar provides JDBC connectors for multiple databases:

PostgreSQL

Type: Sink
Description: Write data to PostgreSQL databases with automatic schema management.
Configuration Example:
configs:
  jdbcUrl: "jdbc:postgresql://localhost:5432/pulsar"
  userName: "postgres"
  password: "password"
  tableName: "pulsar_table"
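The JDBC sinks share this generic configuration; typically only `jdbcUrl` (and the sink type) changes per database. A hedged deployment sketch, where the `jdbc-postgres` type name, sink name, and input topic are assumptions to check against your release:

```shell
# Save the generic JDBC sink configuration to a file.
cat > jdbc-sink.yaml <<'EOF'
configs:
  jdbcUrl: "jdbc:postgresql://localhost:5432/pulsar"
  userName: "postgres"
  password: "password"
  tableName: "pulsar_table"
EOF

# Create the sink (requires a running Pulsar cluster):
#
#   pulsar-admin sinks create \
#     --sink-type jdbc-postgres \
#     --name pg-sink \
#     --inputs persistent://public/default/pg-in \
#     --sink-config-file jdbc-sink.yaml
```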

ClickHouse

Type: Sink
Description: High-performance sink for ClickHouse analytical database.

MariaDB

Type: Sink
Description: Write data to MariaDB/MySQL databases.

SQLite

Type: Sink
Description: Local database storage with SQLite.

OpenMLDB

Type: Sink
Description: Integration with OpenMLDB for machine learning workloads.

MongoDB

Type: Source and Sink
Description: Connect with MongoDB document databases.
Configuration Example:
configs:
  mongoUri: "mongodb://localhost:27017"
  database: "pulsar_db"
  collection: "pulsar_collection"

HBase

Type: Sink
Description: Write data to Apache HBase distributed database.

DynamoDB

Type: Sink
Description: Write data to AWS DynamoDB tables.
Configuration Example:
configs:
  awsEndpoint: "https://dynamodb.us-west-2.amazonaws.com"
  awsRegion: "us-west-2"
  tableName: "pulsar-table"

Search and Analytics

Elasticsearch

Type: Sink
Description: Index Pulsar messages into Elasticsearch for full-text search and analytics.
Configuration Example:
configs:
  elasticSearchUrl: "http://localhost:9200"
  indexName: "pulsar-index"
  typeName: "_doc"
  username: "elastic"
  password: "password"
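Sinks are created symmetrically with `pulsar-admin sinks create`. A minimal sketch for the Elasticsearch sink above; the `elastic_search` built-in type name and the topic are assumptions to adapt to your cluster:

```shell
# Save a minimal Elasticsearch sink configuration.
cat > es-sink.yaml <<'EOF'
configs:
  elasticSearchUrl: "http://localhost:9200"
  indexName: "pulsar-index"
EOF

# Create the sink (requires a running Pulsar cluster):
#
#   pulsar-admin sinks create \
#     --sink-type elastic_search \
#     --name es-sink \
#     --inputs persistent://public/default/es-in \
#     --sink-config-file es-sink.yaml
```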

Solr

Type: Sink
Description: Index data into Apache Solr search platform.
Configuration Example:
configs:
  solrUrl: "http://localhost:8983/solr"
  solrCollection: "pulsar-collection"
  solrCommitWithinMs: 1000

Azure Data Explorer

Type: Sink
Description: Write data to Azure Data Explorer (Kusto) for analytics.

Cloud Storage

AWS Kinesis

Type: Source and Sink
Description: Integrate with AWS Kinesis data streams.
Configuration Example:
configs:
  awsEndpoint: "https://kinesis.us-west-2.amazonaws.com"
  awsRegion: "us-west-2"
  streamName: "my-kinesis-stream"
  awsCredentialPluginParam: '{"accessKey":"xxx","secretKey":"yyy"}'

AWS S3 (via AWS connector)

Type: Source and Sink
Description: Read from and write to AWS S3 buckets.

Alluxio

Type: Sink
Description: Write data to Alluxio distributed file system.

Time Series Databases

InfluxDB

Type: Sink
Description: Write time-series data to InfluxDB.
Configuration Example:
configs:
  influxdbUrl: "http://localhost:8086"
  database: "pulsar"
  measurement: "pulsar_measurement"
  username: "admin"
  password: "password"

Aerospike

Type: Sink
Description: Write data to Aerospike NoSQL database optimized for real-time operations.

File Systems

HDFS3

Type: Sink
Description: Write data to Hadoop Distributed File System (HDFS) version 3.
Configuration Example:
configs:
  hdfsConfigResources: "core-site.xml,hdfs-site.xml"
  directory: "/pulsar/data"
  filenamePrefix: "pulsar-"
  compression: "SNAPPY"

File

Type: Source and Sink
Description: Read from and write to the local file system.

CDC Connectors

Debezium

Type: Source
Description: Change Data Capture (CDC) connector supporting multiple databases.
Supported Databases:
  • MySQL
  • PostgreSQL
  • MongoDB
  • SQL Server
  • Oracle
Configuration Example:
configs:
  database.hostname: "localhost"
  database.port: "3306"
  database.user: "debezium"
  database.password: "password"
  database.server.id: "184054"
  database.server.name: "pulsar-mysql"
  database.include.list: "inventory"
  database.history.pulsar.topic: "schema-changes.inventory"
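A Debezium source is deployed like any other built-in source. The sketch below reuses the MySQL configuration from above; the `debezium-mysql` type name and the destination topic are assumptions to verify against your Pulsar release:

```shell
# Save the Debezium MySQL source configuration shown above.
cat > debezium-mysql.yaml <<'EOF'
configs:
  database.hostname: "localhost"
  database.port: "3306"
  database.user: "debezium"
  database.password: "password"
  database.server.id: "184054"
  database.server.name: "pulsar-mysql"
  database.include.list: "inventory"
EOF

# Create the CDC source (requires a running Pulsar cluster
# and a reachable MySQL instance with binlog enabled):
#
#   pulsar-admin sources create \
#     --source-type debezium-mysql \
#     --name mysql-cdc \
#     --destination-topic-name persistent://public/default/mysql-cdc \
#     --source-config-file debezium-mysql.yaml
```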

Canal

Type: Source
Description: MySQL binlog-based CDC connector.

Network Protocols

HTTP

Type: Sink
Description: Send data to HTTP endpoints.
Configuration Example:
configs:
  url: "http://localhost:8080/api/events"
  headers: '{"Content-Type":"application/json"}'
  method: "POST"

Netty

Type: Source
Description: Network server connector supporting TCP, UDP, and HTTP protocols.
Configuration Example:
configs:
  type: "tcp"
  host: "0.0.0.0"
  port: 10999
  numberOfThreads: 1

Testing and Development

Data Generator

Type: Source
Description: Generate test data for development and testing.
Configuration Example:
configs:
  sleepBetweenMessages: 1000
  numRecords: 100
  schema: "INT32"
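For local development, `pulsar-admin sources localrun` runs a source in-process without deploying it to a function worker, which pairs well with the data generator. A sketch under the assumption that the built-in type is named `data-generator`; the topic is a placeholder:

```shell
# Save a minimal data generator configuration.
cat > datagen.yaml <<'EOF'
configs:
  sleepBetweenMessages: 1000
EOF

# Run it locally against a cluster (Ctrl-C to stop):
#
#   pulsar-admin sources localrun \
#     --source-type data-generator \
#     --name dev-datagen \
#     --destination-topic-name persistent://public/default/dev-topic \
#     --source-config-file datagen.yaml
```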

Batch Data Generator

Type: Source
Description: Generate batches of test data.

Connector Adapters

Kafka Connect Adaptor

Type: Adapter
Description: Run Kafka Connect connectors within Pulsar IO framework.
This allows you to use any existing Kafka Connect connector with Pulsar.

Cache Systems

Redis

Type: Sink
Description: Write data to the Redis key-value store.
Configuration Example:
configs:
  redisHosts: "localhost:6379"
  redisPassword: "password"
  redisDatabase: 0
  clientMode: "Standalone"
  operationTimeout: 2000

Connector Matrix

Connector       Source   Sink   CDC Support
Kafka           ✓        ✓      -
RabbitMQ        ✓        ✓      -
Cassandra       -        ✓      -
PostgreSQL      -        ✓      ✓ (via Debezium)
MySQL           -        ✓      ✓ (via Debezium/Canal)
MongoDB         ✓        ✓      ✓ (via Debezium)
Elasticsearch   -        ✓      -
Kinesis         ✓        ✓      -
Redis           -        ✓      -
HDFS3           -        ✓      -
HTTP            -        ✓      -
