This guide covers developing Pulsar Functions, from writing your first function to advanced patterns like state management and windowing.

Function Interface

Every function exposes a single entry point that Pulsar invokes once for each incoming message: a process method in Java and Python, and a plain function passed to pf.Start in Go.

Java Functions

Java functions implement the Function<I, O> interface:
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

public class MyFunction implements Function<String, String> {
    @Override
    public String process(String input, Context context) throws Exception {
        // Your processing logic here, e.g. append an exclamation mark
        return input + "!";
    }
}
The generic types <I, O> define input and output types. Common types include:
  • String
  • byte[]
  • Integer, Long, Double
  • Custom POJOs with schema support
  • Void for functions with no output
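If a function does not need the Context object, Pulsar can also run a class that implements the standard java.util.function.Function interface. This keeps the class free of Pulsar dependencies and easy to test. A minimal sketch (the class name is illustrative):

```java
import java.util.function.Function;

// A "native" Java function: no Pulsar imports, just java.util.function.Function.
// Pulsar calls apply() once per input message and publishes the return value
// to the output topic, if one is configured.
public class ExclamationNativeFunction implements Function<String, String> {
    @Override
    public String apply(String input) {
        // Append an exclamation mark to every incoming string
        return input + "!";
    }
}
```

The trade-off is that a native function has no access to logging, state, user configuration, or custom publishing, all of which go through Context.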

Python Functions

Python functions extend the Function class:
from pulsar import Function

class MyFunction(Function):
    def __init__(self):
        pass

    def process(self, input, context):
        # Your processing logic here, e.g. append an exclamation mark
        return input + '!'

Go Functions

Go functions are plain Go functions, registered with pf.Start, that receive a context and the raw input bytes:
package main

import (
    "context"
    "github.com/apache/pulsar/pulsar-function-go/pf"
)

func MyFunction(ctx context.Context, input []byte) ([]byte, error) {
    // Your processing logic here, e.g. append an exclamation mark
    return []byte(string(input) + "!"), nil
}

func main() {
    pf.Start(MyFunction)
}

Using the Context API

The Context object provides access to function metadata, configuration, logging, and state management.

Accessing Function Metadata

public String process(String input, Context context) {
    // Get function information
    String functionName = context.getFunctionName();
    String namespace = context.getNamespace();
    int instanceId = context.getInstanceId();
    
    // Get input/output topics
    Collection<String> inputTopics = context.getInputTopics();
    String outputTopic = context.getOutputTopic();
    
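    // Tag the message with where it was processed
    return String.format("[%s/%s #%d] %s", namespace, functionName, instanceId, input);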
}

Logging

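Use the SLF4J logger provided by the context:
import org.slf4j.Logger;

public class LoggingFunction implements Function<String, String> {
    @Override
    public String process(String input, Context context) {
        Logger log = context.getLogger();
        log.info("Processing message: {}", input);

        // 1024 is an arbitrary example threshold
        if (input.length() > 1024) {
            log.warn("Large message detected: {} characters", input.length());
        }

        try {
            return String.format("%s!", input);
        } catch (RuntimeException e) {
            // Pass the exception as the last argument so the stack trace is logged
            log.error("Processing failed", e);
            throw e;
        }
    }
}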
Logs can be redirected to a Pulsar topic using the --log-topic deployment option.

User Configuration

Access custom configuration passed during deployment (for example, with --user-config):
import java.util.Map;

public class ConfigBasedFunction implements Function<String, String> {
    @Override
    public String process(String input, Context context) {
        // Get all config
        Map<String, Object> config = context.getUserConfigMap();
        
        // Get specific value with default
        String prefix = (String) context.getUserConfigValueOrDefault(
            "prefix", "default-prefix"
        );
        
        return prefix + ": " + input;
    }
}
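Because getUserConfigMap() returns Map<String, Object>, the concrete type of each value depends on how the configuration was parsed; a number in the --user-config JSON may arrive as a Double (for example 10.0) rather than an Integer. A small helper like the following (hypothetical, not part of the Pulsar API) normalizes numeric settings:

```java
import java.util.Map;

// Hypothetical helper (not part of the Pulsar API) for reading an integer
// setting from the Map<String, Object> returned by context.getUserConfigMap().
final class UserConfigs {
    private UserConfigs() {}

    static int intValue(Map<String, Object> config, String key, int defaultValue) {
        Object raw = config.get(key);
        if (raw == null) {
            return defaultValue;                          // key not supplied at deployment
        }
        if (raw instanceof Number) {
            return ((Number) raw).intValue();             // Integer, Long, or Double such as 10.0
        }
        try {
            return Integer.parseInt(raw.toString().trim()); // e.g. "10" supplied as a string
        } catch (NumberFormatException e) {
            return defaultValue;                          // malformed value: fall back
        }
    }
}
```

Inside process(), a call such as UserConfigs.intValue(context.getUserConfigMap(), "batch-size", 100) would read a hypothetical batch-size setting. Falling back to the default on malformed values keeps the function running; throwing an IllegalArgumentException instead would surface configuration typos sooner.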

State Management

Pulsar Functions provide built-in state storage backed by BookKeeper for fault-tolerant stateful processing.

Counter State

Use counters for simple numeric aggregation:
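import java.util.Arrays;

public class WordCountFunction implements Function<String, Void> {
    @Override
    public Void process(String input, Context context) {
        // trim() avoids an empty first token when input starts with whitespace;
        // the filter skips the single empty token produced by empty input
        Arrays.stream(input.trim().split("\\s+"))
            .filter(word -> !word.isEmpty())
            .forEach(word -> context.incrCounter(word, 1));
        return null;
    }
}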
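Tokenizing deserves some care: String.split("\\s+") returns an empty first token when the input begins with whitespace, and returns [""] for an empty string. The sketch below exercises the counting logic locally, with a HashMap standing in for the counter store (an assumption for illustration; in a deployed function each update would be a context.incrCounter(word, 1) call):

```java
import java.util.HashMap;
import java.util.Map;

// Local stand-in for Pulsar counter state: a HashMap plays the role of the
// state store so the word-counting logic can be tested without a cluster.
final class WordCounter {
    private final Map<String, Long> counters = new HashMap<>();

    void process(String input) {
        String trimmed = input.trim();
        if (trimmed.isEmpty()) {
            return;                                       // nothing to count
        }
        for (String word : trimmed.split("\\s+")) {
            // In a deployed function: context.incrCounter(word, 1);
            counters.merge(word, 1L, Long::sum);
        }
    }

    long count(String word) {
        return counters.getOrDefault(word, 0L);
    }
}
```

In a deployed function, reading a counter back uses context.getCounter(key).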

Key-Value State

Store arbitrary state using key-value pairs:
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class StatefulFunction implements Function<String, String> {
    @Override
    public String process(String input, Context context) {
        String key = "last-message";
        
        // Get previous state
        ByteBuffer previous = context.getState(key);
        String previousMsg = previous != null 
            ? StandardCharsets.UTF_8.decode(previous).toString()
            : "none";
        
        // Update state
        context.putState(key, ByteBuffer.wrap(
            input.getBytes(StandardCharsets.UTF_8)
        ));
        
        return "Previous: " + previousMsg + ", Current: " + input;
    }
}
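State values are raw ByteBuffers, so stateful functions usually need a pair of encode/decode helpers. Note that Charset.decode(buffer) advances the buffer's position; decoding a duplicate() leaves the original readable a second time. A sketch of such helpers (the class name is hypothetical):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helpers for storing UTF-8 strings in Pulsar key-value state.
final class StateCodec {
    private StateCodec() {}

    static ByteBuffer encode(String value) {
        return ByteBuffer.wrap(value.getBytes(StandardCharsets.UTF_8));
    }

    // Returns fallback when the key has no stored value (getState returned null).
    static String decode(ByteBuffer buffer, String fallback) {
        if (buffer == null) {
            return fallback;
        }
        // decode() consumes the buffer it reads, so read from a duplicate
        // to leave the caller's position unchanged.
        return StandardCharsets.UTF_8.decode(buffer.duplicate()).toString();
    }
}
```

With these helpers, the body of StatefulFunction reduces to StateCodec.decode(context.getState(key), "none") and context.putState(key, StateCodec.encode(input)).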

Async State Operations

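The async variants return a CompletableFuture, so the function does not block while waiting on the state store:
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;

public class AsyncStateFunction implements Function<String, CompletableFuture<String>> {
    @Override
    public CompletableFuture<String> process(String input, Context context) {
        return context.getStateAsync("key")
            .thenCompose(previous -> {
                // previous is the stored ByteBuffer (may be null if "key" was never written);
                // here the new value simply replaces it
                ByteBuffer newValue = ByteBuffer.wrap(input.getBytes(StandardCharsets.UTF_8));
                return context.putStateAsync("key", newValue);
            })
            .thenApply(v -> "Processed: " + input);
    }
}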
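The getStateAsync/putStateAsync chain is ordinary CompletableFuture composition. To show the shape without a cluster, this sketch substitutes a hypothetical in-memory store whose two methods mirror the Context calls; the store is an assumption for illustration only:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for the async state methods on Context.
final class InMemoryAsyncState {
    private final Map<String, ByteBuffer> store = new ConcurrentHashMap<>();

    CompletableFuture<ByteBuffer> getStateAsync(String key) {
        ByteBuffer value = store.get(key);
        // Hand out a duplicate so callers cannot move the stored buffer's position
        return CompletableFuture.completedFuture(value == null ? null : value.duplicate());
    }

    CompletableFuture<Void> putStateAsync(String key, ByteBuffer value) {
        store.put(key, value);
        return CompletableFuture.completedFuture(null);
    }
}

final class AsyncLastMessage {
    // Reads the previous value, writes the new one, then builds the result,
    // chaining each step instead of blocking on either state operation.
    static CompletableFuture<String> process(String input, InMemoryAsyncState state) {
        return state.getStateAsync("last-message")
            .thenCompose(previous -> {
                String previousMsg = previous == null
                    ? "none"
                    : StandardCharsets.UTF_8.decode(previous).toString();
                ByteBuffer next = ByteBuffer.wrap(input.getBytes(StandardCharsets.UTF_8));
                return state.putStateAsync("last-message", next)
                    .thenApply(v -> "Previous: " + previousMsg + ", Current: " + input);
            });
    }
}
```

Nesting thenApply inside thenCompose keeps previousMsg in scope for the final result, which is the async counterpart of the synchronous StatefulFunction.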

Publishing Messages

Functions can publish messages to topics beyond the configured output topic.

Publishing to Custom Topics

import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.PulsarClientException;

public class PublishFunction implements Function<String, Void> {
    @Override
    public Void process(String input, Context context) {
        String publishTopic = (String) context.getUserConfigValueOrDefault(
            "publish-topic", "default-topic"
        );
        
        String output = String.format("%s!", input);
        
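        try {
            context.newOutputMessage(publishTopic, Schema.STRING)
                .value(output)
                .sendAsync()
                .exceptionally(e -> {
                    // Send failures complete the future exceptionally; they are not thrown here
                    context.getLogger().error("Failed to publish", e);
                    return null;
                });
        } catch (PulsarClientException e) {
            // Thrown synchronously, e.g. if the producer for publishTopic cannot be created
            context.getLogger().error("Failed to create producer", e);
        }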
        
        return null;
    }
}
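Because sendAsync() returns a CompletableFuture, a send that fails completes that future exceptionally instead of throwing inside the surrounding try block. The stdlib-only sketch below demonstrates the difference; the already-failed future is a hypothetical stand-in for a failed send (in practice it would fail later, on another thread, and the handler would still run):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

final class AsyncErrorDemo {
    // Hypothetical stand-in for sendAsync(): the call itself returns normally.
    static CompletableFuture<String> failingSend() {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.completeExceptionally(new IllegalStateException("broker unavailable"));
        return future;
    }

    static String run() {
        AtomicReference<String> logged = new AtomicReference<>("nothing logged");
        try {
            failingSend()
                // This handler sees the failure; the catch block below never does.
                .exceptionally(e -> {
                    logged.set("async failure: " + e.getMessage());
                    return null;
                });
        } catch (RuntimeException e) {
            logged.set("caught synchronously");
        }
        return logged.get();
    }
}
```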

Publishing with Properties

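Set message properties (string key-value metadata) on the builder before sending. As in the previous example, newOutputMessage() throws PulsarClientException, so call it inside a try block:
context.newOutputMessage(outputTopic, Schema.STRING)
    .value(output)
    .property("source", "function")
    .property("timestamp", String.valueOf(System.currentTimeMillis()))
    .sendAsync();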

Metrics and Monitoring

Record custom metrics to monitor function behavior:
public class UserMetricFunction implements Function<String, Void> {
    @Override
    public Void process(String input, Context context) {
        // Record metric value
        context.recordMetric("LetterCount", input.length());
        
        // Record function-specific metrics
        String metricName = String.format(
            "function-%s-messages-received",
            context.getFunctionName()
        );
        context.recordMetric(metricName, 1);
        
        return null;
    }
}
Custom metrics are exported alongside the function's built-in metrics, so the same monitoring setup (for example, Prometheus) can collect them.

Window Functions

Window functions process batches of messages grouped into time-based or count-based windows.

Implementing Window Functions

import java.util.Collection;
import java.util.function.Function;

public class AddWindowFunction implements Function<Collection<Integer>, Integer> {
    @Override
    public Integer apply(Collection<Integer> integers) {
        // Process entire window at once
        return integers.stream().reduce(0, (x, y) -> x + y);
    }
}
Window configuration is specified during deployment (see Deploying Functions).
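To illustrate what a count-based window means, the sketch below evaluates the most recent windowLength messages every slidingInterval messages and hands each window to the same summing logic as AddWindowFunction (the two parameters loosely correspond to the --window-length-count and --sliding-interval-count deployment options). It is a simplified local model written for illustration, not Pulsar's windowing implementation; in particular, early windows in this model can hold fewer than windowLength messages.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.List;
import java.util.function.Function;

// Simplified local model of count-based windowing (illustration only).
final class CountWindowSimulator {
    static <T, R> List<R> run(List<T> messages, int windowLength, int slidingInterval,
                              Function<Collection<T>, R> windowFunction) {
        Deque<T> buffer = new ArrayDeque<>();
        List<R> results = new ArrayList<>();
        int sinceLastTrigger = 0;
        for (T message : messages) {
            buffer.addLast(message);
            if (buffer.size() > windowLength) {
                buffer.removeFirst();                     // keep only the latest windowLength messages
            }
            sinceLastTrigger++;
            if (sinceLastTrigger == slidingInterval) {
                // Pass a copy so the window function cannot mutate the buffer
                results.add(windowFunction.apply(new ArrayList<>(buffer)));
                sinceLastTrigger = 0;
            }
        }
        return results;                                   // messages after the last trigger stay pending
    }
}
```

When windowLength equals slidingInterval the windows are tumbling (non-overlapping); a smaller slidingInterval produces overlapping, sliding windows.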

Testing Functions Locally

Unit Testing

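Test your function logic without a running cluster by passing a mocked Context:
import org.apache.pulsar.functions.api.Context;
import org.junit.Test;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;

public class ExclamationFunctionTest {
    @Test
    public void testProcess() throws Exception {
        // ExclamationFunction: a Function<String, String> whose process() returns input + "!"
        ExclamationFunction function = new ExclamationFunction();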
        Context context = mock(Context.class);
        
        String result = function.process("hello", context);
        
        assertEquals("hello!", result);
    }
}

Local Mode Testing

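Run a function in a local process with localrun. The process still connects to a Pulsar cluster for its input and output topics. This example runs the ExclamationFunction from the examples jar bundled with the Pulsar distribution:
bin/pulsar-admin functions localrun \
    --classname org.apache.pulsar.functions.api.examples.ExclamationFunction \
    --inputs persistent://public/default/input-topic \
    --output persistent://public/default/output-topic \
    --jar examples/api-examples.jar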

Best Practices

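1. Keep Functions Lightweight

Functions should process messages quickly. For heavy computation, consider offloading the work to an external system, using Pulsar IO connectors to move the data.

2. Handle Errors Gracefully

Throw an exception for transient errors: under at-least-once or effectively-once processing guarantees, the message is not acknowledged and will be redelivered (under at-most-once it is not retried). Return an error indicator for permanent failures, since retrying the same message cannot succeed. The --max-message-retries and --dead-letter-topic deployment options bound how often a failing message is retried.

3. Use Async Operations

Use async state and publish operations for better throughput when processing time is dominated by I/O.

4. Monitor Function Health

Emit custom metrics for business-specific monitoring and set up alerts based on processing rates.

5. Version Your Functions

Use semantic versioning and test thoroughly before updating production functions.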
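The error-handling practice above might look like the following sketch, written as a native java.util.function.Function so it runs without Pulsar. It treats a malformed numeric id as a permanent failure and returns an error marker, while letting a hypothetical TransientStoreException propagate so the framework can redeliver the message. The exception type, the "ERROR:" prefix, and the injected lookup are illustrative assumptions:

```java
import java.util.function.Function;
import java.util.function.IntUnaryOperator;

// Hypothetical unchecked exception marking a failure worth retrying.
class TransientStoreException extends RuntimeException {
    TransientStoreException(String message) {
        super(message);
    }
}

final class PriceLookupFunction implements Function<String, String> {
    private final IntUnaryOperator lookup;                // hypothetical external call; may throw TransientStoreException

    PriceLookupFunction(IntUnaryOperator lookup) {
        this.lookup = lookup;
    }

    @Override
    public String apply(String input) {
        int id;
        try {
            id = Integer.parseInt(input.trim());
        } catch (NumberFormatException e) {
            // Permanent: the same payload will fail again, so emit an error marker
            return "ERROR: not a numeric id: " + input;
        }
        // Transient errors from the lookup propagate as exceptions, so the
        // message is not acknowledged and can be redelivered.
        return "price=" + lookup.applyAsInt(id);
    }
}
```

Pulsar instantiates function classes by reflection through a no-argument constructor, so a deployed version would create its lookup client inside the class (for example, on first use) rather than receive it as a constructor parameter; the parameter here only makes the sketch testable.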

Next Steps

  • Deploying Functions: Learn how to deploy functions to production
  • Runtime Configuration: Configure resources and execution mode
