Skip to main content

Overview

This guide explains how to develop custom Pulsar IO connectors for integrating external systems with Apache Pulsar. You can create source connectors to ingest data into Pulsar or sink connectors to export data from Pulsar.

Prerequisites

  • Java 11 or later
  • Maven 3.6+
  • Understanding of Apache Pulsar concepts
  • Familiarity with the external system you’re integrating

Connector Interfaces

Source Interface

Source connectors implement the org.apache.pulsar.io.core.Source<T> interface:
package org.apache.pulsar.io.core;

import java.util.Map;
import org.apache.pulsar.functions.api.Record;

/**
 * A connector that reads data from an external system and publishes it into Pulsar.
 * Inherits {@code close()} from {@link AutoCloseable}; release connections and other
 * resources acquired in {@code open()} there.
 *
 * @param <T> type of the record value produced by this source
 */
public interface Source<T> extends AutoCloseable {
    /**
     * Open connector with configuration.
     *
     * @param config initialization config
     * @param sourceContext environment where the source connector is running
     * @throws Exception IO type exceptions when opening a connector
     */
    void open(Map<String, Object> config, SourceContext sourceContext) throws Exception;

    /**
     * Reads the next message from source.
     * If source does not have any new messages, this call should block.
     * @return next message from source. The return result should never be null
     * @throws Exception if the next record cannot be read from the external system
     */
    Record<T> read() throws Exception;
}

Sink Interface

Sink connectors implement the org.apache.pulsar.io.core.Sink<T> interface:
package org.apache.pulsar.io.core;

import java.util.Map;
import org.apache.pulsar.functions.api.Record;

/**
 * A connector that exports records from Pulsar to an external system.
 * Inherits {@code close()} from {@link AutoCloseable}; flush pending work and release
 * resources acquired in {@code open()} there.
 *
 * @param <T> type of the record value this sink receives
 */
public interface Sink<T> extends AutoCloseable {
    /**
     * Open connector with configuration.
     *
     * @param config initialization config
     * @param sinkContext environment where the sink connector is running
     * @throws Exception IO type exceptions when opening a connector
     */
    void open(Map<String, Object> config, SinkContext sinkContext) throws Exception;

    /**
     * Write a message to Sink.
     *
     * @param record record to write to sink
     * @throws Exception if the record cannot be written to the external system
     */
    void write(Record<T> record) throws Exception;
}

Creating a Source Connector

Step 1: Define Configuration Class

Create a configuration class using Lombok annotations and @FieldDoc for documentation:
package com.example.pulsar.io;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.Map;
import lombok.Data;
import lombok.experimental.Accessors;
import org.apache.pulsar.io.core.annotations.FieldDoc;

@Data
@Accessors(chain = true)
public class MySourceConfig implements Serializable {

    @FieldDoc(
        required = true,
        defaultValue = "",
        help = "The hostname of the external system"
    )
    private String hostname;

    @FieldDoc(
        required = false,
        defaultValue = "8080",
        help = "The port number"
    )
    private int port = 8080;

    @FieldDoc(
        required = false,
        defaultValue = "1000",
        help = "Polling interval in milliseconds"
    )
    private long pollingInterval = 1000L;

    @FieldDoc(
        required = false,
        defaultValue = "",
        sensitive = true,
        help = "API key for authentication"
    )
    private String apiKey;

    public static MySourceConfig load(String yamlFile) throws IOException {
        ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
        return mapper.readValue(new File(yamlFile), MySourceConfig.class);
    }

    public static MySourceConfig load(Map<String, Object> map) throws IOException {
        ObjectMapper mapper = new ObjectMapper();
        return mapper.readValue(mapper.writeValueAsString(map), MySourceConfig.class);
    }
}

Step 2: Implement Source Connector

package com.example.pulsar.io;

import java.util.Map;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Source;
import org.apache.pulsar.io.core.SourceContext;

/**
 * Example source that polls an external system and emits each message as a byte[] record.
 */
@Slf4j
public class MySource implements Source<byte[]> {

    private MySourceConfig config;
    private ExternalClient client;

    /**
     * Parses and validates the configuration, then connects to the external system.
     *
     * @param config raw connector configuration
     * @param sourceContext runtime context (not used by this example)
     * @throws IllegalArgumentException if a setting is missing or out of range
     * @throws Exception if the configuration cannot be parsed or the connection fails
     */
    @Override
    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
        this.config = MySourceConfig.load(config);
        validateConfig(this.config);

        // Initialize connection to external system
        this.client = new ExternalClient(
            this.config.getHostname(),
            this.config.getPort(),
            this.config.getApiKey()
        );

        this.client.connect();
        log.info("Opened MySource connector");
    }

    /** Fails fast on settings that would otherwise surface later as confusing runtime errors. */
    private static void validateConfig(MySourceConfig cfg) {
        if (cfg.getHostname() == null || cfg.getHostname().isBlank()) {
            throw new IllegalArgumentException("hostname is required");
        }
        if (cfg.getPort() < 1 || cfg.getPort() > 65535) {
            throw new IllegalArgumentException("port must be in [1, 65535], got " + cfg.getPort());
        }
        // Zero would busy-spin in read(); negative values make Thread.sleep throw.
        if (cfg.getPollingInterval() <= 0) {
            throw new IllegalArgumentException(
                "pollingInterval must be positive, got " + cfg.getPollingInterval());
        }
    }

    /**
     * Blocks until the external system returns a message, then wraps it as a record.
     *
     * @return the next record; never {@code null}, as the {@code Source} contract requires
     * @throws InterruptedException if the thread is interrupted while waiting (e.g. on shutdown)
     * @throws Exception if the external client fails
     */
    @Override
    public Record<byte[]> read() throws Exception {
        // Returning null would violate Source.read(); keep polling, backing off between
        // empty polls for the configured interval.
        Message message = client.readMessage();
        while (message == null) {
            Thread.sleep(config.getPollingInterval());
            message = client.readMessage();
        }

        // The loop variable is reassigned, so capture an effectively-final copy.
        final Message received = message;

        // Create a Pulsar record
        return new Record<byte[]>() {
            @Override
            public Optional<String> getKey() {
                return Optional.ofNullable(received.getKey());
            }

            @Override
            public byte[] getValue() {
                return received.getData();
            }

            @Override
            public Optional<Long> getEventTime() {
                // ofNullable tolerates a missing (null) timestamp instead of throwing NPE.
                return Optional.ofNullable(received.getTimestamp());
            }
        };
    }

    /** Disconnects from the external system; safe to call more than once. */
    @Override
    public void close() throws Exception {
        if (client != null) {
            client.disconnect();
            client = null;
        }
        log.info("Closed MySource connector");
    }
}

Creating a Sink Connector

Step 1: Define Configuration Class

package com.example.pulsar.io;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.Serializable;
import java.util.Map;
import lombok.Data;
import lombok.experimental.Accessors;
import org.apache.pulsar.io.core.annotations.FieldDoc;

/**
 * Configuration for {@code MySink}, bound by Jackson from the connector's config map.
 */
@Data
@Accessors(chain = true)
public class MySinkConfig implements Serializable {

    private static final long serialVersionUID = 1L;

    // ObjectMapper is thread-safe once configured: build it once, not per load() call.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @FieldDoc(
        required = true,
        defaultValue = "",
        help = "The URL of the external system"
    )
    private String url;

    @FieldDoc(
        required = false,
        defaultValue = "100",
        help = "Batch size for writes"
    )
    private int batchSize = 100;

    @FieldDoc(
        required = false,
        defaultValue = "5000",
        help = "Timeout in milliseconds"
    )
    private long timeout = 5000L;

    /**
     * Loads the configuration from the map passed to {@code Sink.open()}.
     *
     * @param map raw connector configuration
     * @return the bound configuration
     * @throws IOException if the map cannot be bound to this class
     */
    public static MySinkConfig load(Map<String, Object> map) throws IOException {
        // Round-trip through JSON so the map is bound with the same rules as a config file.
        return MAPPER.readValue(MAPPER.writeValueAsString(map), MySinkConfig.class);
    }
}

Step 2: Implement Sink Connector

package com.example.pulsar.io;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;

/**
 * Example sink that buffers records and writes them to an external system in batches.
 * Each record is acked after its batch is written, or failed if the batch write throws.
 */
@Slf4j
public class MySink implements Sink<byte[]> {

    private MySinkConfig config;
    private ExternalWriter writer;
    private List<Record<byte[]>> buffer;

    /**
     * Parses and validates the configuration, then connects to the external system.
     *
     * @param config raw connector configuration
     * @param sinkContext runtime context (not used by this example)
     * @throws IllegalArgumentException if a setting is missing or out of range
     * @throws Exception if the configuration cannot be parsed or the connection fails
     */
    @Override
    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
        this.config = MySinkConfig.load(config);
        if (this.config.getUrl() == null || this.config.getUrl().isBlank()) {
            throw new IllegalArgumentException("url is required");
        }
        if (this.config.getBatchSize() < 1) {
            throw new IllegalArgumentException(
                "batchSize must be at least 1, got " + this.config.getBatchSize());
        }
        this.buffer = new ArrayList<>();

        // Initialize connection to external system
        this.writer = new ExternalWriter(this.config.getUrl());
        this.writer.connect();

        log.info("Opened MySink connector");
    }

    /**
     * Buffers the record and writes the batch once it reaches the configured size.
     *
     * @param record record to write
     * @throws Exception if the batch write fails (the batch's records have been failed)
     */
    @Override
    public void write(Record<byte[]> record) throws Exception {
        buffer.add(record);

        // Batch writes for efficiency
        if (buffer.size() >= config.getBatchSize()) {
            flush();
        }
    }

    /** Writes the buffered batch, then acks every record on success or fails every record on error. */
    private void flush() throws Exception {
        // buffer is null when open() failed before initializing it.
        if (buffer == null || buffer.isEmpty()) {
            return;
        }

        // Hand the writer its own list and start a fresh buffer, so the batch is never
        // cleared out from under a writer that keeps a reference to it.
        List<Record<byte[]>> batch = buffer;
        buffer = new ArrayList<>();

        try {
            // Write batch to external system
            writer.writeBatch(batch);
        } catch (Exception e) {
            // Fail all records on error
            batch.forEach(Record::fail);
            log.error("Failed to write batch", e);
            throw e;
        }

        // Acknowledge only after the write succeeded; kept outside the try so an ack
        // problem is never misreported as a write failure.
        batch.forEach(Record::ack);
        log.debug("Successfully wrote {} records", batch.size());
    }

    /** Flushes remaining records and disconnects, even when the final flush fails. */
    @Override
    public void close() throws Exception {
        try {
            // Flush remaining records
            flush();
        } finally {
            // Always release the connection, otherwise a failed flush would leak it.
            if (writer != null) {
                writer.disconnect();
                writer = null;
            }
            log.info("Closed MySink connector");
        }
    }
}

Connector Metadata

Create a pulsar-io.yaml file in src/main/resources/META-INF/services/:
name: my-connector
description: My custom source and sink connector
sourceClass: com.example.pulsar.io.MySource
sinkClass: com.example.pulsar.io.MySink
sourceConfigClass: com.example.pulsar.io.MySourceConfig
sinkConfigClass: com.example.pulsar.io.MySinkConfig

Project Setup

Maven Dependencies

Add these dependencies to your pom.xml:
<dependencies>
    <!-- Define pulsar.version in <properties> to match your cluster's Pulsar version. -->
    <dependency>
        <groupId>org.apache.pulsar</groupId>
        <artifactId>pulsar-io-core</artifactId>
        <version>${pulsar.version}</version>
        <scope>provided</scope>
    </dependency>
    
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.24</version>
        <scope>provided</scope>
    </dependency>
    
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.14.0</version>
    </dependency>
    
    <dependency>
        <groupId>com.fasterxml.jackson.dataformat</groupId>
        <artifactId>jackson-dataformat-yaml</artifactId>
        <version>2.14.0</version>
    </dependency>

    <!-- Test-only: JUnit and Mockito, used by the unit test example. -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.13.2</version>
        <scope>test</scope>
    </dependency>

    <dependency>
        <groupId>org.mockito</groupId>
        <artifactId>mockito-core</artifactId>
        <version>4.11.0</version>
        <scope>test</scope>
    </dependency>
</dependencies>

Build NAR Package

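Configure the NAR plugin. Also either set <packaging>nar</packaging> on the project or bind the plugin's nar goal to the package phase; with neither, mvn package produces only a JAR: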
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-nar-maven-plugin</artifactId>
            <version>1.3.5</version>
            <extensions>true</extensions>
            <!-- Bind the nar goal to the package phase. Without this (or
                 <packaging>nar</packaging> on the project), mvn package
                 produces only a JAR and no .nar file. -->
            <executions>
                <execution>
                    <id>default-nar</id>
                    <phase>package</phase>
                    <goals>
                        <goal>nar</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
Build the connector:
mvn clean package
This generates a .nar file in the target/ directory.

Testing Connectors

Unit Testing

import org.junit.Test;
import org.mockito.Mockito;
import static org.junit.Assert.*;

public class MySinkTest {

    @Test
    public void testSinkWrite() throws Exception {
        Map<String, Object> config = new HashMap<>();
        config.put("url", "http://localhost:8080");
        config.put("batchSize", 10);
        
        SinkContext context = Mockito.mock(SinkContext.class);
        MySink sink = new MySink();
        sink.open(config, context);
        
        // Test writing records
        Record<byte[]> record = createTestRecord();
        sink.write(record);
        
        sink.close();
    }
}

Integration Testing

Test with a local Pulsar cluster:
# Start Pulsar standalone
bin/pulsar standalone

# Create connector
bin/pulsar-admin sinks create \
  --tenant public \
  --namespace default \
  --name my-sink \
  --archive /path/to/my-connector.nar \
  --inputs test-topic \
  --sink-config '{"url":"http://localhost:8080","batchSize":100}'

# Send test messages
bin/pulsar-client produce test-topic --messages "test message"

# Check connector status
bin/pulsar-admin sinks status --name my-sink

Best Practices

Configuration

  • Use @FieldDoc annotations for all configuration fields
  • Mark sensitive fields with sensitive = true
  • Provide sensible defaults
  • Validate configuration in the open() method

Error Handling

  • Call record.ack() only after the record has been successfully processed or written to the external system
  • Call record.fail() on errors to enable retries
  • Implement proper cleanup in close()
  • Log errors with context information

Performance

  • Batch operations when possible
  • Use connection pooling for external systems
  • Apply backpressure: block in read() or write() rather than buffering records without bound
  • Monitor resource usage

Resource Management

  • Close all resources in close() method
  • Handle interruptions gracefully
  • Implement proper connection retry logic
  • Use try-with-resources where applicable

Monitoring

  • Add logging at key points
  • Report custom metrics through SinkContext/SourceContext (for example, recordMetric())
  • Track processing rates and latencies
  • Monitor error rates

Deployment

Package the Connector

mvn clean package
cp target/my-connector-1.0.nar /path/to/pulsar/connectors/

Deploy to Pulsar

# For sinks
pulsar-admin sinks create \
  --tenant public \
  --namespace default \
  --name my-sink \
  --archive connectors/my-connector-1.0.nar \
  --inputs input-topic \
  --sink-config-file sink-config.yaml

# For sources
pulsar-admin sources create \
  --tenant public \
  --namespace default \
  --name my-source \
  --archive connectors/my-connector-1.0.nar \
  --destination-topic-name output-topic \
  --source-config-file source-config.yaml

Advanced Topics

Schema Support

Handle Pulsar schemas in your connector:
/**
 * Writes one record, branching on its Pulsar schema when one is attached.
 * The record is acked after processing succeeds and failed otherwise, so this
 * skeleton never leaves a record unacknowledged.
 */
@Override
public void write(Record<byte[]> record) throws Exception {
    try {
        Schema<?> schema = record.getSchema();

        if (schema != null && schema.getSchemaInfo() != null) {
            // Handle schema-aware processing
            SchemaType schemaType = schema.getSchemaInfo().getType();
            // Process based on schema type (e.g. switch over AVRO, JSON, STRING, BYTES)
        } else {
            // No schema attached: treat record.getValue() as raw bytes
        }

        record.ack();
    } catch (Exception e) {
        // Negative-acknowledge so the record can be retried
        record.fail();
        throw e;
    }
}

State Management

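Use SourceContext to persist state, such as a read offset, across restarts. State values are ByteBuffers, and function state storage must be enabled in the cluster: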
// Fields used by the methods below.
private SourceContext sourceContext;
private long offset;

/**
 * Restores the last persisted offset, if any, and keeps the context for later state writes.
 */
@Override
public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
    // Keep the context: read() needs it to persist state.
    this.sourceContext = sourceContext;

    // Read state. State values are ByteBuffers; null is treated as "no offset stored yet".
    ByteBuffer state = sourceContext.getState("offset");
    if (state != null) {
        this.offset = state.getLong();
    }
}

/**
 * Reads the next record and persists the advanced offset.
 */
@Override
public Record<byte[]> read() throws Exception {
    // Connector-specific fetch (not shown). Like any read(), it must block until data is
    // available and never return null.
    Record<byte[]> record = fetchRecordAt(this.offset);
    this.offset++;

    // Update state. flip() rewinds the buffer so the 8 bytes just written are readable;
    // without it, an empty value would be stored.
    ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
    buffer.putLong(this.offset);
    buffer.flip();
    sourceContext.putState("offset", buffer);

    // NOTE: the offset is saved when the record is handed to Pulsar, not when it is
    // acknowledged. For at-least-once delivery, persist it from the record's ack() instead.
    return record;
}

Dead Letter Queue

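Configure a dead-letter topic (DLQ) for failed messages. These keys belong at the top level of the sink's deployment config (the file passed with --sink-config-file), not inside its configs section: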
deadLetterTopic: "my-sink-dlq"
maxMessageRetries: 3

Examples

Reference implementations in the apache/pulsar repository (https://github.com/apache/pulsar):
  • Kafka Connector: pulsar-io/kafka
  • Cassandra Connector: pulsar-io/cassandra
  • HTTP Sink: pulsar-io/http
  • RabbitMQ Connector: pulsar-io/rabbitmq

Troubleshooting

Common Issues

Connector fails to start:
  • Check configuration validation
  • Verify external system connectivity
  • Review connector logs
Messages not processed:
  • Verify input/output topics
  • Check message format compatibility
  • Review error logs
Performance issues:
  • Adjust batch sizes
  • Check network latency
  • Monitor resource utilization

Next Steps

Build docs developers (and LLMs) love