Overview
This guide explains how to develop custom Pulsar IO connectors that integrate external systems with Apache Pulsar. You can create source connectors to ingest data into Pulsar or sink connectors to export data from Pulsar.
Prerequisites
- Java 11 or later
- Maven 3.6+
- Understanding of Apache Pulsar concepts
- Familiarity with the external system you’re integrating
Connector Interfaces
Source Interface
Source connectors implement the org.apache.pulsar.io.core.Source<T> interface, whose lifecycle methods are open(), read(), and close().
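The shape of a source can be sketched with local stand-ins so that the example compiles without the Pulsar jars. SimpleRecord, SimpleSource, and CounterSource are hypothetical names used only for illustration; the real open() also receives a SourceContext, and the real read() returns an org.apache.pulsar.functions.api.Record<T>.

```java
import java.util.Map;

// Hypothetical stand-in for Pulsar's Record<T>; only the value is modelled.
interface SimpleRecord<T> {
    T getValue();
}

// Hypothetical stand-in mirroring the lifecycle of Source<T>.
// The real open() takes a second SourceContext argument, omitted here.
interface SimpleSource<T> extends AutoCloseable {
    void open(Map<String, Object> config) throws Exception;
    SimpleRecord<T> read() throws Exception;
}

// Emits start, start + 1, start + 2, ... with one value per read() call.
class CounterSource implements SimpleSource<Long> {
    private long next;
    private boolean open;

    @Override
    public void open(Map<String, Object> config) {
        // "start" is an assumed, optional setting; it defaults to 0.
        Object start = config.getOrDefault("start", 0L);
        next = ((Number) start).longValue();
        open = true;
    }

    @Override
    public SimpleRecord<Long> read() {
        if (!open) {
            throw new IllegalStateException("source is not open");
        }
        final long value = next++;
        return () -> value;
    }

    @Override
    public void close() {
        open = false;
    }
}
```

In a real connector the runtime calls read() in a loop, so read() is expected to block until a record is available rather than return null.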
Sink Interface
Sink connectors implement the org.apache.pulsar.io.core.Sink<T> interface, whose lifecycle methods are open(), write(), and close().
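A sink has the mirror-image shape. The sketch below again uses local stand-ins (StubRecord, SimpleSink, and InMemorySink are hypothetical names) so that it runs without the Pulsar jars; the real open() also receives a SinkContext, and the real write() receives a Record<T>.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Pulsar's Record<T>: carries a value and
// remembers whether it was acknowledged.
class StubRecord<T> {
    private final T value;
    boolean acked;

    StubRecord(T value) { this.value = value; }
    T getValue() { return value; }
    void ack() { acked = true; }
}

// Hypothetical stand-in mirroring the lifecycle of Sink<T>.
interface SimpleSink<T> extends AutoCloseable {
    void open(Map<String, Object> config) throws Exception;
    void write(StubRecord<T> record) throws Exception;
}

// Collects written values in memory instead of calling an external system.
class InMemorySink implements SimpleSink<String> {
    private List<String> store;

    @Override
    public void open(Map<String, Object> config) {
        store = new ArrayList<>();
    }

    @Override
    public void write(StubRecord<String> record) {
        if (store == null) {
            throw new IllegalStateException("sink is not open");
        }
        store.add(record.getValue());
        record.ack(); // acknowledge only after the value is stored
    }

    @Override
    public void close() {
        store = null;
    }

    // Snapshot of everything written so far; empty once the sink is closed.
    List<String> contents() {
        return store == null ? List.of() : List.copyOf(store);
    }
}
```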
Creating a Source Connector
Step 1: Define Configuration Class
Create a configuration class using Lombok annotations, with @FieldDoc on each field for documentation.
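As a rough illustration, here is a plain-Java equivalent of such a class. It is a sketch under assumptions: the class name, the field names (endpoint, batchSize, password), and the load() helper are hypothetical, and the Lombok and @FieldDoc annotations appear only as comments so that the example compiles without extra jars.

```java
import java.util.Map;

// Plain-Java sketch of a connector configuration class.
// With Lombok, @Data would generate the getters written out below.
class ExampleSourceConfig {
    // Would carry: @FieldDoc(required = true, defaultValue = "", help = "...")
    private String endpoint;
    // Would carry: @FieldDoc(defaultValue = "100", help = "...")
    private int batchSize = 100;
    // Would carry: @FieldDoc(sensitive = true, help = "...")
    private String password;

    // Builds a config from the map that the runtime passes to open().
    static ExampleSourceConfig load(Map<String, Object> map) {
        ExampleSourceConfig config = new ExampleSourceConfig();
        config.endpoint = asString(map.get("endpoint"));
        config.password = asString(map.get("password"));
        Object batch = map.get("batchSize");
        if (batch != null) {
            // Accepts both numeric and string values.
            config.batchSize = Integer.parseInt(batch.toString());
        }
        return config;
    }

    // Intended to be called from open() so bad settings fail fast.
    void validate() {
        if (endpoint == null || endpoint.isEmpty()) {
            throw new IllegalArgumentException("endpoint is required");
        }
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
    }

    private static String asString(Object value) {
        return value == null ? null : value.toString();
    }

    String getEndpoint() { return endpoint; }
    int getBatchSize() { return batchSize; }
    String getPassword() { return password; }

    // Never print the sensitive field.
    @Override
    public String toString() {
        return "ExampleSourceConfig(endpoint=" + endpoint
                + ", batchSize=" + batchSize + ", password=****)";
    }
}
```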
Step 2: Implement Source Connector
Creating a Sink Connector
Step 1: Define Configuration Class
Step 2: Implement Sink Connector
Connector Metadata
Create a pulsar-io.yaml file in src/main/resources/META-INF/services/. It names the connector and points Pulsar at the connector and configuration classes.
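The metadata file might look like the following sketch. The connector name and the com.example class names are hypothetical placeholders; list only the classes your connector actually provides.

```yaml
# Hypothetical connector; replace the name and class names with your own.
name: example
description: Moves data between Pulsar and an example external system
sourceClass: com.example.pulsar.io.ExampleSource
sourceConfigClass: com.example.pulsar.io.ExampleSourceConfig
sinkClass: com.example.pulsar.io.ExampleSink
sinkConfigClass: com.example.pulsar.io.ExampleSinkConfig
```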
Project Setup
Maven Dependencies
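Add the Pulsar IO dependencies to your pom.xml. At a minimum, a connector needs org.apache.pulsar:pulsar-io-core, which provides the Source and Sink interfaces.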
Build NAR Package
Configure the NAR plugin in your pom.xml. Building the project then produces a .nar file in the target/ directory.
Testing Connectors
Unit Testing
Integration Testing
Test with a local Pulsar cluster.
Best Practices
Configuration
- Use @FieldDoc annotations for all configuration fields
- Mark sensitive fields with sensitive = true
- Provide sensible defaults
- Validate configuration in the open() method
Error Handling
- Always call record.ack() on successful processing
- Call record.fail() on errors to enable retries
- Implement proper cleanup in close()
- Log errors with context information
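The ack/fail pattern from this list can be sketched as follows. AckableRecord is a hypothetical stand-in for Pulsar's Record<T>, and AckingWriter, externalWrite, and connection are illustrative names rather than part of any Pulsar API; in a real sink the same try/catch would sit inside write().

```java
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical stand-in for Pulsar's Record<T>, tracking ack/fail calls.
class AckableRecord<T> {
    private final T value;
    boolean acked;
    boolean failed;

    AckableRecord(T value) { this.value = value; }
    T getValue() { return value; }
    void ack() { acked = true; }
    void fail() { failed = true; }
}

class AckingWriter<T> implements AutoCloseable {
    private static final Logger LOG = Logger.getLogger(AckingWriter.class.getName());

    private final Consumer<T> externalWrite;  // assumed call into the external system
    private final AutoCloseable connection;   // assumed resource to release on close

    AckingWriter(Consumer<T> externalWrite, AutoCloseable connection) {
        this.externalWrite = externalWrite;
        this.connection = connection;
    }

    void write(AckableRecord<T> record) {
        try {
            externalWrite.accept(record.getValue());
            record.ack();   // acknowledge only after the write succeeded
        } catch (RuntimeException e) {
            // Log with enough context to identify the failing record.
            LOG.log(Level.WARNING, "Write failed for value: " + record.getValue(), e);
            record.fail();  // negative acknowledgement so the record can be retried
        }
    }

    @Override
    public void close() throws Exception {
        connection.close();
    }
}
```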
Performance
- Batch operations when possible
- Use connection pooling for external systems
- Implement backpressure handling
- Monitor resource usage
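Batching can be sketched as a small buffer that flushes once it reaches a configured size. This is a minimal illustration, not Pulsar's own mechanism: BatchRecord is a hypothetical stand-in for Record<T>, bulkWrite represents an assumed bulk call into the external system, and the class assumes write() and flush() are called from a single thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for Pulsar's Record<T>, tracking ack/fail calls.
class BatchRecord<T> {
    private final T value;
    boolean acked;
    boolean failed;

    BatchRecord(T value) { this.value = value; }
    T getValue() { return value; }
    void ack() { acked = true; }
    void fail() { failed = true; }
}

class BatchingWriter<T> {
    private final int batchSize;
    private final Consumer<List<T>> bulkWrite;
    private final List<BatchRecord<T>> pending = new ArrayList<>();

    BatchingWriter(int batchSize, Consumer<List<T>> bulkWrite) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.bulkWrite = bulkWrite;
    }

    // Buffers the record and flushes once the batch is full.
    void write(BatchRecord<T> record) {
        pending.add(record);
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    // Sends everything buffered so far, then acks or fails the whole batch.
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        List<BatchRecord<T>> batch = new ArrayList<>(pending);
        pending.clear();
        List<T> values = new ArrayList<>();
        for (BatchRecord<T> record : batch) {
            values.add(record.getValue());
        }
        try {
            bulkWrite.accept(values);
            batch.forEach(BatchRecord::ack);
        } catch (RuntimeException e) {
            batch.forEach(BatchRecord::fail);
        }
    }
}
```

A real sink would also call flush() from close() and on a timer, so a partially filled batch does not wait indefinitely for more records.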
Resource Management
- Close all resources in the close() method
- Handle interruptions gracefully
- Implement proper connection retry logic
- Use try-with-resources where applicable
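One way to combine connection retries with graceful interruption handling is exponential backoff that stops as soon as the thread is interrupted. The sketch below is an assumption-laden illustration: Retry and withBackoff are hypothetical names, and the attempt count and delays would normally come from connector configuration.

```java
import java.util.concurrent.Callable;

class Retry {
    // Runs the action up to maxAttempts times, doubling the delay after
    // each failure. Rethrows the last failure once attempts are exhausted.
    static <T> T withBackoff(Callable<T> action, int maxAttempts, long initialDelayMs)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        long delayMs = initialDelayMs;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (InterruptedException e) {
                // Do not retry when the thread is asked to stop:
                // restore the interrupt flag and propagate.
                Thread.currentThread().interrupt();
                throw e;
            } catch (Exception e) {
                last = e;
                if (attempt == maxAttempts) {
                    break;
                }
                // An interrupt during the sleep propagates out of this method.
                Thread.sleep(delayMs);
                delayMs *= 2;
            }
        }
        throw last;
    }
}
```

A connector could wrap its connect call in open() with something like Retry.withBackoff(() -> connect(), 5, 200), where connect() stands for whatever opens the external connection.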
Monitoring
- Add logging at key points
- Use SinkContext/SourceContext for metrics
- Track processing rates and latencies
- Monitor error rates
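Tracking latencies and error rates can be sketched with a small helper that forwards values to a metric recorder. ConnectorMetrics and the metric names are hypothetical; the recorder is a plain BiConsumer so the example runs without Pulsar, and in a real sink it could be a method reference to the context's recordMetric(String, double).

```java
import java.util.function.BiConsumer;

class ConnectorMetrics {
    private final BiConsumer<String, Double> recorder;
    private long processed;
    private long failed;

    ConnectorMetrics(BiConsumer<String, Double> recorder) {
        this.recorder = recorder;
    }

    // Call after a successful write with the elapsed System.nanoTime() delta.
    void onSuccess(long elapsedNanos) {
        processed++;
        recorder.accept("example_write_latency_ms", elapsedNanos / 1_000_000.0);
    }

    // Call after a failed write.
    void onFailure() {
        failed++;
        recorder.accept("example_write_failures", 1.0);
    }

    // Fraction of records that failed so far; 0.0 before any record is seen.
    double errorRate() {
        long total = processed + failed;
        return total == 0 ? 0.0 : (double) failed / total;
    }
}
```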
Deployment
Package the Connector
Deploy to Pulsar
Advanced Topics
Schema Support
Handle Pulsar schemas in your connector.
State Management
Use SourceContext for stateful sources.
Dead Letter Queue
Configure a dead letter queue (DLQ) for failed messages.
Examples
Reference implementations:
- Kafka Connector: ~/workspace/source/pulsar-io/kafka
- Cassandra Connector: ~/workspace/source/pulsar-io/cassandra
- HTTP Sink: ~/workspace/source/pulsar-io/http
- RabbitMQ Connector: ~/workspace/source/pulsar-io/rabbitmq
Troubleshooting
Common Issues
Connector fails to start:
- Check configuration validation
- Verify external system connectivity
- Review connector logs
Messages are not processed:
- Verify input/output topics
- Check message format compatibility
- Review error logs
Performance is poor:
- Adjust batch sizes
- Check network latency
- Monitor resource utilization
Next Steps
- Review built-in connectors for implementation examples
- Read the Pulsar IO Overview
- Explore advanced connector patterns
- Contribute your connector to Apache Pulsar