Infinitic uses Apache Pulsar as its distributed messaging transport layer. Pulsar provides reliable, scalable message delivery between workers, clients, and the orchestration engine.

Overview

Pulsar acts as the backbone for all communication in Infinitic:
  • Task dispatching and execution
  • Workflow state transitions
  • Event notifications
  • Worker coordination
The transport configuration is defined in your YAML configuration file under the transport.pulsar section.

Basic Configuration

Here’s a minimal Pulsar configuration for local development:
transport:
  pulsar:
    brokerServiceUrl: pulsar://localhost:6650/
    webServiceUrl: http://localhost:8080
    tenant: infinitic
    namespace: dev

Required Parameters

brokerServiceUrl (string, required)
Pulsar broker service URL. Must start with pulsar:// or pulsar+ssl://. Example: pulsar://localhost:6650/

webServiceUrl (string, required)
Pulsar HTTP admin service URL. Must start with http:// or https://. Example: http://localhost:8080

tenant (string, required)
Pulsar tenant name. Cannot be blank. Example: infinitic

namespace (string, required)
Pulsar namespace within the tenant. Cannot be blank. Example: production
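
The rules above are easy to check before a worker starts. The following is a minimal sketch of such a check; `TransportSettings` and `validate` are hypothetical names used for illustration and are not part of the Infinitic API.

```kotlin
// Hypothetical helper (not part of Infinitic): mirrors the rules listed
// for the four required transport parameters.
data class TransportSettings(
    val brokerServiceUrl: String,
    val webServiceUrl: String,
    val tenant: String,
    val namespace: String,
)

// Returns one message per violated rule; an empty list means the settings pass.
fun validate(settings: TransportSettings): List<String> {
    val errors = mutableListOf<String>()
    val broker = settings.brokerServiceUrl
    if (!broker.startsWith("pulsar://") && !broker.startsWith("pulsar+ssl://")) {
        errors += "brokerServiceUrl must start with pulsar:// or pulsar+ssl://"
    }
    val web = settings.webServiceUrl
    if (!web.startsWith("http://") && !web.startsWith("https://")) {
        errors += "webServiceUrl must start with http:// or https://"
    }
    if (settings.tenant.isBlank()) errors += "tenant cannot be blank"
    if (settings.namespace.isBlank()) errors += "namespace cannot be blank"
    return errors
}

fun main() {
    val local = TransportSettings(
        brokerServiceUrl = "pulsar://localhost:6650/",
        webServiceUrl = "http://localhost:8080",
        tenant = "infinitic",
        namespace = "dev",
    )
    println(validate(local))
}
```

Collecting all violations in a list, rather than failing on the first one, lets a single run report every misconfigured field.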

Client Configuration

Configure the Pulsar client connection behavior:
transport:
  pulsar:
    brokerServiceUrl: pulsar://pulsar.example.com:6650/
    webServiceUrl: https://pulsar.example.com:8443
    tenant: infinitic
    namespace: production
    client:
      ioThreads: 10
      listenerThreads: 5
      connectionsPerBroker: 2
      operationTimeoutSeconds: 30
      connectionTimeoutSeconds: 10
      keepAliveIntervalSeconds: 30
      memoryLimitMB: 512

Key Client Parameters

client.ioThreads (integer)
Number of threads for handling connections to brokers. Default: Pulsar client default (1 thread per core)

client.listenerThreads (integer)
Number of threads for message listeners. Default: Pulsar client default

client.connectionsPerBroker (integer)
Number of connections established per broker. Default: 1

client.operationTimeoutSeconds (number)
Timeout for producer/consumer operations in seconds. Default: 30

client.connectionTimeoutSeconds (number)
Timeout for establishing broker connections in seconds. Default: 10

client.memoryLimitMB (integer)
Maximum memory used by the client in megabytes. Default: 64 MB

Consumer Configuration

Configure message consumption behavior:
transport:
  pulsar:
    # ... other config
    consumer:
      receiverQueueSize: 1000
      ackTimeoutSeconds: 30
      negativeAckRedeliveryDelaySeconds: 60
      maxRedeliverCount: 3
      acknowledgmentGroupTimeSeconds: 0.1

Consumer Parameters

consumer.receiverQueueSize (integer)
Size of the consumer receive queue. Higher values increase throughput but use more memory. Default: 1000

consumer.ackTimeoutSeconds (number)
Timeout for message acknowledgment in seconds. Default: 0 (disabled)

consumer.negativeAckRedeliveryDelaySeconds (number)
Delay in seconds before redelivering negatively acknowledged messages. Default: 60

consumer.maxRedeliverCount (integer)
Maximum number of times a message is redelivered before being sent to the dead letter queue. Default: 3
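
To see how negativeAckRedeliveryDelaySeconds and maxRedeliverCount interact, the sketch below models a message that is negatively acknowledged on every attempt. It is a simplified model under stated assumptions, not Pulsar's actual scheduler: it assumes each redelivery happens exactly one delay after the previous attempt, and that the message reaches the dead letter queue once maxRedeliverCount redeliveries have failed. The function name is hypothetical.

```kotlin
// Simplified model (an assumption, not the Pulsar implementation):
// attempt 0 is the first delivery, attempts 1..maxRedeliverCount are
// redeliveries, each scheduled one full delay after the previous attempt.
fun deliveryTimesSeconds(
    negativeAckRedeliveryDelaySeconds: Double,
    maxRedeliverCount: Int,
): List<Double> =
    (0..maxRedeliverCount).map { attempt -> attempt * negativeAckRedeliveryDelaySeconds }

fun main() {
    // Values taken from the consumer example: 60 s delay, 3 redeliveries.
    val times = deliveryTimesSeconds(60.0, 3)
    println("deliveries: ${times.size}, last attempt at ${times.last()} s")
}
```

Under this model, the example settings give four deliveries in total, with the last attempt about three minutes after the first. Real timing will vary, because the client groups negative acknowledgments before redelivering.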

Producer Configuration

Configure message production behavior:
transport:
  pulsar:
    # ... other config
    producer:
      sendTimeoutSeconds: 30
      blockIfQueueFull: true
      enableBatching: true
      batchingMaxMessages: 1000
      batchingMaxPublishDelaySeconds: 1
      compressionType: LZ4

Producer Parameters

producer.sendTimeoutSeconds (number)
Timeout for send operations in seconds. Default: 30

producer.blockIfQueueFull (boolean)
Whether to block send operations when the producer queue is full. Default: true

producer.enableBatching (boolean)
Enable message batching for better throughput. Default: true

producer.compressionType (string)
Compression algorithm for messages. Options: NONE, LZ4, ZLIB, ZSTD, SNAPPY. Default: NONE
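
The producer example above also sets batchingMaxMessages and batchingMaxPublishDelaySeconds, which this list does not describe. In the Pulsar client, a batch is sent when either limit is reached first; the sketch below assumes Infinitic passes both values through unchanged and estimates which limit triggers the flush at a steady message rate. `FlushTrigger` and `flushTrigger` are hypothetical names.

```kotlin
// Which limit closes a batch first at a steady publish rate.
enum class FlushTrigger { SIZE, DELAY }

// Assumes a constant rate; real traffic is bursty, so treat this as an estimate.
fun flushTrigger(
    messagesPerSecond: Double,
    batchingMaxMessages: Int,
    batchingMaxPublishDelaySeconds: Double,
): FlushTrigger {
    // Time needed to collect a full batch at the given rate.
    val secondsToFill = batchingMaxMessages / messagesPerSecond
    return if (secondsToFill <= batchingMaxPublishDelaySeconds) FlushTrigger.SIZE else FlushTrigger.DELAY
}

fun main() {
    // Example config: up to 1000 messages per batch, 1 s maximum delay.
    println(flushTrigger(messagesPerSecond = 200.0, batchingMaxMessages = 1000, batchingMaxPublishDelaySeconds = 1.0))
    println(flushTrigger(messagesPerSecond = 5000.0, batchingMaxMessages = 1000, batchingMaxPublishDelaySeconds = 1.0))
}
```

If the delay is always the trigger, batches leave partly filled and the delay is the latency added to each message; lowering it trades throughput for latency.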

Authentication

Token Authentication

Use JWT tokens for authentication:
transport:
  pulsar:
    # ... other config
    client:
      authentication:
        token: "eyJhbGciOiJIUzI1NiJ9..."
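
When a token is rejected, it can help to confirm that the configured value is a well-formed JWT. A JWT consists of three Base64url segments separated by dots, and the first segment is a JSON header. The sketch below decodes only that header; it does not verify the signature, and `jwtHeaderJson` is a hypothetical helper, not an Infinitic or Pulsar function.

```kotlin
import java.util.Base64

// Decodes the first (header) segment of a JWT. This inspects the token's
// shape only; it does not validate the signature or the claims.
fun jwtHeaderJson(token: String): String {
    val headerSegment = token.substringBefore('.')
    return String(Base64.getUrlDecoder().decode(headerSegment), Charsets.UTF_8)
}

fun main() {
    // The truncated token from the example above; only the header is present.
    println(jwtHeaderJson("eyJhbGciOiJIUzI1NiJ9..."))
}
```

For the example token, the header decodes to a JSON object naming the HS256 signing algorithm. Avoid printing the full token in logs, since it is a credential.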

OAuth2 Authentication

For OAuth2 authentication (e.g., with Auth0, Okta):
transport:
  pulsar:
    # ... other config
    client:
      authentication:
        issuerUrl: https://auth.example.com
        privateKey: file:///path/to/credentials.json
        audience: urn:sn:pulsar:infinitic:production

TLS/SSL Configuration

Enable secure connections:
transport:
  pulsar:
    brokerServiceUrl: pulsar+ssl://pulsar.example.com:6651/
    webServiceUrl: https://pulsar.example.com:8443
    tenant: infinitic
    namespace: production
    client:
      tlsTrustCertsFilePath: /path/to/ca-cert.pem
      tlsKeyFilePath: /path/to/client-key.pem
      tlsCertificateFilePath: /path/to/client-cert.pem
      enableTlsHostnameVerification: true
      allowTlsInsecureConnection: false

TLS Parameters

client.tlsTrustCertsFilePath (string)
Path to the trusted CA certificates file.

client.tlsKeyFilePath (string)
Path to the client private key file.

client.tlsCertificateFilePath (string)
Path to the client certificate file.

client.enableTlsHostnameVerification (boolean)
Enable hostname verification for TLS connections. Default: false

client.allowTlsInsecureConnection (boolean)
Accept untrusted TLS certificates from the broker. Use for testing only. Default: false

Pulsar Policies

Configure namespace policies for topics:
transport:
  pulsar:
    # ... other config
    policies:
      messageTTLSeconds: 86400
      retentionTimeMinutes: 10080
      retentionSizeMB: 1024

Policy Parameters

policies.messageTTLSeconds (integer)
Time-to-live for messages in seconds (24 hours in the example).

policies.retentionTimeMinutes (integer)
Retention time for acknowledged messages in minutes (7 days in the example).

policies.retentionSizeMB (integer)
Maximum retention size in megabytes.
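
Because the TTL is expressed in seconds and the retention time in minutes, it is easy to mix up the units. A small sketch using java.time.Duration shows how the example values can be derived; the two helper names are hypothetical.

```kotlin
import java.time.Duration

// Value for policies.messageTTLSeconds (unit: seconds).
fun messageTtlSeconds(ttl: Duration): Long = ttl.seconds

// Value for policies.retentionTimeMinutes (unit: minutes).
fun retentionTimeMinutes(retention: Duration): Long = retention.toMinutes()

fun main() {
    println("messageTTLSeconds: ${messageTtlSeconds(Duration.ofHours(24))}")
    println("retentionTimeMinutes: ${retentionTimeMinutes(Duration.ofDays(7))}")
}
```

With 24 hours and 7 days as inputs, these produce the 86400 and 10080 used in the policies example.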

Production Configuration Example

Complete production-ready configuration:
transport:
  pulsar:
    brokerServiceUrl: pulsar+ssl://pulsar-prod.example.com:6651/
    webServiceUrl: https://pulsar-prod.example.com:8443
    tenant: infinitic
    namespace: production
    
    client:
      # Authentication
      authentication:
        issuerUrl: https://auth.example.com
        privateKey: file:///etc/infinitic/credentials.json
        audience: urn:sn:pulsar:infinitic:production
      
      # TLS
      tlsTrustCertsFilePath: /etc/ssl/certs/ca-bundle.crt
      enableTlsHostnameVerification: true
      allowTlsInsecureConnection: false
      
      # Performance
      ioThreads: 16
      listenerThreads: 8
      connectionsPerBroker: 3
      memoryLimitMB: 1024
      
      # Timeouts
      operationTimeoutSeconds: 30
      connectionTimeoutSeconds: 10
      keepAliveIntervalSeconds: 30
    
    consumer:
      receiverQueueSize: 1000
      ackTimeoutSeconds: 60
      negativeAckRedeliveryDelaySeconds: 30
      maxRedeliverCount: 3
      acknowledgmentGroupTimeSeconds: 0.1
    
    producer:
      sendTimeoutSeconds: 30
      blockIfQueueFull: true
      enableBatching: true
      batchingMaxMessages: 1000
      batchingMaxPublishDelaySeconds: 1
      compressionType: ZSTD
    
    policies:
      messageTTLSeconds: 86400
      retentionTimeMinutes: 10080
      retentionSizeMB: 2048

Configuration via Code

You can also configure Pulsar programmatically:
import io.infinitic.pulsar.config.PulsarConfig
import io.infinitic.pulsar.config.PulsarClientConfig
import io.infinitic.pulsar.config.auth.AuthenticationTokenConfig

val config = PulsarConfig(
    brokerServiceUrl = "pulsar://localhost:6650/",
    webServiceUrl = "http://localhost:8080",
    tenant = "infinitic",
    namespace = "production",
    client = PulsarClientConfig(
        ioThreads = 10,
        authentication = AuthenticationTokenConfig(
            // Secret is not imported above; add the matching import from your Infinitic version.
            token = Secret("your-token-here")
        )
    )
)

Troubleshooting

Connection Issues

If you’re experiencing connection problems:
  1. Verify that the broker and web service URLs are correct and reachable from the worker host
  2. Check that firewall rules allow connections to the Pulsar ports
  3. Verify that authentication credentials are valid
  4. Check that TLS certificates have not expired
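
The first two checks can be scripted. The sketch below parses a service URL with java.net.URI and attempts a plain TCP connection to the host and port. It is a minimal sketch: `Endpoint`, `parseEndpoint`, and `isReachable` are hypothetical names, the URL must include an explicit port, and a successful TCP connection says nothing about TLS or authentication.

```kotlin
import java.io.IOException
import java.net.InetSocketAddress
import java.net.Socket
import java.net.URI

data class Endpoint(val host: String, val port: Int)

// Extracts host and port from a URL such as pulsar://localhost:6650/.
fun parseEndpoint(serviceUrl: String): Endpoint {
    val uri = URI(serviceUrl)
    require(uri.host != null && uri.port != -1) { "URL must include a host and a port: $serviceUrl" }
    return Endpoint(uri.host, uri.port)
}

// True if a TCP connection can be opened within the timeout.
fun isReachable(endpoint: Endpoint, timeoutMillis: Int = 3000): Boolean =
    try {
        Socket().use { it.connect(InetSocketAddress(endpoint.host, endpoint.port), timeoutMillis) }
        true
    } catch (e: IOException) {
        false
    }

fun main() {
    val broker = parseEndpoint("pulsar://localhost:6650/")
    println("$broker reachable: ${isReachable(broker)}")
}
```

Run it from the same host or container as the worker, since firewall rules often differ between machines.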

Performance Tuning

For high-throughput scenarios:
  • Increase ioThreads and connectionsPerBroker
  • Enable producer batching with appropriate batch sizes
  • Use compression (LZ4 or ZSTD) for large messages
  • Increase memoryLimitMB if memory permits
  • Tune receiverQueueSize based on message size and rate
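
When tuning receiverQueueSize, a rough upper bound on the payload a worker may hold in its receive queues is queue size times average message size times the number of consumers. The sketch below computes that bound; the function name and the sample message size and consumer count are assumptions chosen for illustration.

```kotlin
// Rough upper bound on bytes buffered in receive queues.
// Ignores per-message overhead, so actual usage will be somewhat higher.
fun receiverQueueBytes(receiverQueueSize: Int, averageMessageBytes: Long, consumers: Int): Long =
    receiverQueueSize.toLong() * averageMessageBytes * consumers

fun main() {
    // Assumed workload: 10 KiB messages, 8 consumers, default queue size of 1000.
    val bytes = receiverQueueBytes(receiverQueueSize = 1000, averageMessageBytes = 10L * 1024, consumers = 8)
    println("up to ${bytes / (1024 * 1024)} MiB buffered")
}
```

If the estimate is large relative to the memory available to the worker, lower receiverQueueSize before raising other limits.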

Message Redelivery

If messages are being redelivered excessively:
  • Increase ackTimeoutSeconds for long-running tasks
  • Adjust negativeAckRedeliveryDelaySeconds to give tasks more time
  • Check worker health and processing capacity
  • Review maxRedeliverCount setting

Best Practices

  1. Use SSL/TLS in production - Always encrypt traffic with pulsar+ssl://
  2. Enable authentication - Use token or OAuth2 authentication
  3. Monitor memory usage - Set appropriate memoryLimitMB limits
  4. Use separate namespaces - Isolate environments (dev, staging, production)
  5. Configure dead letter queues - Set maxRedeliverCount appropriately
  6. Enable compression - Use ZSTD for best compression ratio
  7. Tune for your workload - Adjust batch sizes and queue sizes based on message patterns
