Infinitic uses a message transport layer to communicate between clients, workers, and services. Transport configuration is required for both workers and clients.

Available Transports

Infinitic supports two transport implementations:
  • Pulsar - Production-ready distributed messaging (recommended)
  • InMemory - Local development and testing

Pulsar Transport

Apache Pulsar is the recommended transport for production deployments.

YAML Configuration

Basic Configuration

transport:
  pulsar:
    brokerServiceUrl: pulsar://localhost:6650
    webServiceUrl: http://localhost:8080
    tenant: infinitic
    namespace: dev

Complete Configuration

transport:
  pulsar:
    brokerServiceUrl: pulsar://pulsar.example.com:6650
    webServiceUrl: http://pulsar.example.com:8080
    tenant: infinitic
    namespace: production
    shutdownGracePeriodSeconds: 30.0
    
    allowedClusters:
      - cluster-1
      - cluster-2
    
    adminRoles:
      - admin
      - operator
    
    client:
      operationTimeout: 30
      connectionTimeout: 10
      requestTimeout: 60
      keepAliveInterval: 30
      authPluginClassName: org.apache.pulsar.client.impl.auth.AuthenticationToken
      authParams: token:xxxxx
    
    producer:
      compressionType: LZ4
      batchingMaxMessages: 1000
      batchingMaxPublishDelay: 10
      sendTimeout: 30
    
    consumer:
      receiverQueueSize: 1000
      acknowledgmentGroupTime: 100
      subscriptionType: Shared

Builder Pattern

import java.util.Set;

import io.infinitic.transport.config.PulsarTransportConfig;
import io.infinitic.pulsar.config.PulsarClientConfig;
import io.infinitic.pulsar.config.PulsarProducerConfig;
import io.infinitic.pulsar.config.PulsarConsumerConfig;
import io.infinitic.pulsar.config.policies.PoliciesConfig;

PulsarTransportConfig transport = PulsarTransportConfig.builder()
    .setBrokerServiceUrl("pulsar://localhost:6650")
    .setWebServiceUrl("http://localhost:8080")
    .setTenant("infinitic")
    .setNamespace("dev")
    .setShutdownGracePeriodSeconds(30.0)
    .setAllowedClusters(Set.of("cluster-1", "cluster-2"))
    .setAdminRoles(Set.of("admin", "operator"))
    .setClientConfig(
        PulsarClientConfig.builder()
            .setOperationTimeout(30)
            .setConnectionTimeout(10)
            .build()
    )
    .setProducerConfig(
        PulsarProducerConfig.builder()
            .setCompressionType("LZ4")
            .setBatchingMaxMessages(1000)
            .build()
    )
    .setConsumerConfig(
        PulsarConsumerConfig.builder()
            .setReceiverQueueSize(1000)
            .setAcknowledgmentGroupTime(100)
            .build()
    )
    .build();

Configuration Options

Core Settings

| Option | Type | Required | Description |
|---|---|---|---|
| brokerServiceUrl | String | Yes | Pulsar broker URL (e.g., pulsar://localhost:6650) |
| webServiceUrl | String | Yes | Pulsar web service URL (e.g., http://localhost:8080) |
| tenant | String | Yes | Pulsar tenant name |
| namespace | String | Yes | Pulsar namespace name |
| shutdownGracePeriodSeconds | Double | No | Graceful shutdown duration in seconds (default: 30.0) |
| allowedClusters | Set<String> | No | Allowed Pulsar clusters |
| adminRoles | Set<String> | No | Admin role names |
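
A pre-flight check for the four required options can catch a missing value before a worker or client starts. The sketch below is not part of Infinitic: the class `RequiredTransportOptions` and its `missing` method are hypothetical helpers that operate on a plain map of option names to values.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not an Infinitic API: reports which of the four
// required Pulsar transport options are absent or blank in a plain map.
public class RequiredTransportOptions {
    static final List<String> REQUIRED =
        List.of("brokerServiceUrl", "webServiceUrl", "tenant", "namespace");

    static List<String> missing(Map<String, String> options) {
        List<String> result = new ArrayList<>();
        for (String key : REQUIRED) {
            String value = options.get(key);
            if (value == null || value.isBlank()) {
                result.add(key);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Only two of the four required options are supplied here.
        Map<String, String> options = Map.of(
            "brokerServiceUrl", "pulsar://localhost:6650",
            "tenant", "infinitic");
        System.out.println("Missing required options: " + missing(options));
    }
}
```

The result follows the order of the `REQUIRED` list, so error messages stay stable between runs.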

Client Settings

| Option | Type | Default | Description |
|---|---|---|---|
| operationTimeout | Integer | 30 | Operation timeout in seconds |
| connectionTimeout | Integer | 10 | Connection timeout in seconds |
| requestTimeout | Integer | 60 | Request timeout in seconds |
| keepAliveInterval | Integer | 30 | Keep-alive interval in seconds |
| authPluginClassName | String | null | Authentication plugin class |
| authParams | String | null | Authentication parameters |

Producer Settings

| Option | Type | Default | Description |
|---|---|---|---|
| compressionType | String | NONE | Compression type (NONE, LZ4, ZLIB, ZSTD, SNAPPY) |
| batchingMaxMessages | Integer | 1000 | Maximum messages per batch |
| batchingMaxPublishDelay | Integer | 10 | Maximum batch delay in milliseconds |
| sendTimeout | Integer | 30 | Send timeout in seconds |

Consumer Settings

| Option | Type | Default | Description |
|---|---|---|---|
| receiverQueueSize | Integer | 1000 | Receiver queue size |
| acknowledgmentGroupTime | Integer | 100 | Acknowledgment group time in milliseconds |
| subscriptionType | String | Shared | Subscription type (Shared, Exclusive, Failover, Key_Shared) |

Authentication

Token Authentication

transport:
  pulsar:
    brokerServiceUrl: pulsar://pulsar.example.com:6650
    webServiceUrl: http://pulsar.example.com:8080
    tenant: infinitic
    namespace: production
    client:
      authPluginClassName: org.apache.pulsar.client.impl.auth.AuthenticationToken
      authParams: token:${PULSAR_TOKEN}
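
The example above keeps the token out of the file by writing `${PULSAR_TOKEN}`. This page does not say how that placeholder is resolved, so check how your configuration is loaded. If you need to expand it yourself before handing the YAML text to the loader, a minimal sketch could look like the following. The class `EnvPlaceholders` and its `expand` method are hypothetical and not part of Infinitic.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not an Infinitic API: replaces ${NAME} placeholders
// in a text with values from a map (for example System.getenv()).
public class EnvPlaceholders {
    private static final Pattern PLACEHOLDER =
        Pattern.compile("\\$\\{([A-Za-z_][A-Za-z0-9_]*)\\}");

    static String expand(String text, Map<String, String> env) {
        Matcher matcher = PLACEHOLDER.matcher(text);
        StringBuilder out = new StringBuilder();
        while (matcher.find()) {
            String name = matcher.group(1);
            String value = env.get(name);
            if (value == null) {
                // Fail early instead of sending a literal "${...}" as a token.
                throw new IllegalStateException("Variable not set: " + name);
            }
            matcher.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        // A demo map stands in for System.getenv() so the sketch runs anywhere.
        Map<String, String> env = Map.of("PULSAR_TOKEN", "example-token");
        System.out.println(expand("authParams: token:${PULSAR_TOKEN}", env));
    }
}
```

In real use, pass `System.getenv()` as the map and avoid printing the expanded text, since it contains the secret.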

TLS Authentication

transport:
  pulsar:
    brokerServiceUrl: pulsar+ssl://pulsar.example.com:6651
    webServiceUrl: https://pulsar.example.com:8443
    tenant: infinitic
    namespace: production
    client:
      authPluginClassName: org.apache.pulsar.client.impl.auth.AuthenticationTls
      authParams: tlsCertFile:/path/to/cert.pem,tlsKeyFile:/path/to/key.pem

Connection Pooling

Pulsar clients maintain connection pools automatically. The client timeout and keep-alive settings control how those connections are opened and kept alive:

transport:
  pulsar:
    brokerServiceUrl: pulsar://pulsar.example.com:6650
    webServiceUrl: http://pulsar.example.com:8080
    tenant: infinitic
    namespace: production
    client:
      connectionTimeout: 10
      keepAliveInterval: 30
      operationTimeout: 30

Performance Tuning

High Throughput

transport:
  pulsar:
    # ... connection settings ...
    producer:
      compressionType: LZ4
      batchingMaxMessages: 10000
      batchingMaxPublishDelay: 100
      sendTimeout: 60
    consumer:
      receiverQueueSize: 5000
      acknowledgmentGroupTime: 100

Low Latency

transport:
  pulsar:
    # ... connection settings ...
    producer:
      batchingMaxMessages: 100
      batchingMaxPublishDelay: 1
      sendTimeout: 10
    consumer:
      receiverQueueSize: 100
      acknowledgmentGroupTime: 1
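
The two profiles above trade batching delay against batch size. In Pulsar, a producer sends a batch when either the message limit or the publish delay is reached, whichever comes first. As a rough worked example, the sketch below estimates which limit triggers at a steady message rate. The class `BatchingEstimate` is hypothetical, the 50,000 messages per second rate is an assumed figure, and the model ignores other flush triggers such as batch byte size.

```java
// Hypothetical helper for back-of-the-envelope estimates; not an Infinitic
// or Pulsar API. Assumes a steady message rate.
public class BatchingEstimate {
    // Milliseconds needed to accumulate batchingMaxMessages at the given rate.
    static double fillTimeMillis(int batchingMaxMessages, double messagesPerSecond) {
        return batchingMaxMessages * 1000.0 / messagesPerSecond;
    }

    // Longest time a message waits in the batch: the limit reached first.
    static double maxBatchWaitMillis(int batchingMaxMessages,
                                     long batchingMaxPublishDelayMillis,
                                     double messagesPerSecond) {
        return Math.min(fillTimeMillis(batchingMaxMessages, messagesPerSecond),
                        batchingMaxPublishDelayMillis);
    }

    public static void main(String[] args) {
        double rate = 50_000.0; // assumed messages per second

        // High-throughput profile: 10000 messages or 100 ms.
        System.out.println("High throughput max wait (ms): "
            + maxBatchWaitMillis(10_000, 100, rate));

        // Low-latency profile: 100 messages or 1 ms.
        System.out.println("Low latency max wait (ms): "
            + maxBatchWaitMillis(100, 1, rate));
    }
}
```

At this assumed rate the high-throughput profile needs 200 ms to fill a batch, so the 100 ms delay triggers first. The low-latency profile fills in 2 ms, so its 1 ms delay triggers first.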

InMemory Transport

The InMemory transport is ideal for local development, testing, and single-process applications.

YAML Configuration

transport:
  inMemory:
    shutdownGracePeriodSeconds: 5.0

Programmatic Configuration

import io.infinitic.transport.config.InMemoryTransportConfig;

InMemoryTransportConfig transport = new InMemoryTransportConfig(
    5.0  // shutdownGracePeriodSeconds
);

Configuration Options

| Option | Type | Default | Description |
|---|---|---|---|
| shutdownGracePeriodSeconds | Double | 5.0 | Graceful shutdown duration in seconds |

Use Cases

Unit Testing

import io.infinitic.transport.config.InMemoryTransportConfig;
import io.infinitic.workers.InfiniticWorker;
import io.infinitic.clients.InfiniticClient;
import org.junit.jupiter.api.Test;  // assumes JUnit 5; adjust for your test framework

@Test
void testWorkflow() {
    InMemoryTransportConfig transport = new InMemoryTransportConfig();

    // Create the worker and client with the same transport so they exchange messages
    InfiniticWorker worker = InfiniticWorker.builder()
        .setTransport(transport)
        .build();

    InfiniticClient client = InfiniticClient.builder()
        .setTransport(transport)
        .build();

    try {
        worker.start();

        // Run tests
        // ...
    } finally {
        // Release resources even if an assertion fails
        worker.close();
        client.close();
    }
}

Local Development

# dev-config.yml
transport:
  inMemory:
    shutdownGracePeriodSeconds: 5.0

storage:
  inMemory: {}

services:
  - name: MyService
    executor:
      class: com.example.MyServiceImpl
      concurrency: 1

Shutdown Grace Period

The shutdown grace period determines how long the transport waits for in-flight messages to complete before it shuts down.

Configuration

transport:
  pulsar:
    # ... other settings ...
    shutdownGracePeriodSeconds: 30.0

Behavior

  1. During shutdown, the transport:
    • Stops accepting new messages
    • Allows existing messages to complete
    • Waits up to shutdownGracePeriodSeconds
    • Forces shutdown if grace period expires
  2. Recommended values:
    • Development: 5-10 seconds
    • Production: 30-60 seconds
    • Long-running tasks: 60+ seconds
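
The shutdown sequence listed above follows a common pattern that can be illustrated with a plain Java `ExecutorService`. This is only an analogy for the described behavior, not Infinitic's implementation, and the class `GracefulShutdownSketch` is hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Illustration of the stop / wait / force pattern using the JDK only.
public class GracefulShutdownSketch {
    // Returns true if all in-flight work finished within the grace period.
    static boolean shutdown(ExecutorService executor, double gracePeriodSeconds) {
        executor.shutdown(); // stop accepting new work; running tasks continue
        long graceMillis = Math.round(gracePeriodSeconds * 1000);
        try {
            if (executor.awaitTermination(graceMillis, TimeUnit.MILLISECONDS)) {
                return true; // everything completed in time
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
        }
        executor.shutdownNow(); // grace period expired: interrupt remaining tasks
        return false;
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        executor.submit(() -> System.out.println("processing in-flight message"));
        boolean clean = shutdown(executor, 5.0);
        System.out.println("Clean shutdown: " + clean);
    }
}
```

A longer grace period only costs time when work is still running, because `awaitTermination` returns as soon as the last task finishes.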

Best Practices

Production Deployments

  • Use Pulsar transport - InMemory is not suitable for production
  • Enable authentication - Always use authentication in production
  • Configure TLS - Encrypt connections with TLS
  • Set appropriate timeouts - Balance between reliability and latency
  • Monitor connections - Track connection health and pool usage

Development Environments

  • Use InMemory for tests - Simplifies test setup and teardown
  • Use Pulsar for integration - Test with production-like setup
  • Short grace periods - Faster feedback in development

High Availability

transport:
  pulsar:
    brokerServiceUrl: pulsar://broker-1:6650,broker-2:6650,broker-3:6650
    webServiceUrl: http://broker-1:8080,broker-2:8080,broker-3:8080
    tenant: infinitic
    namespace: production
    allowedClusters:
      - primary-cluster
      - backup-cluster
    client:
      connectionTimeout: 10
      operationTimeout: 30
      keepAliveInterval: 30
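
The service URLs above list several hosts after a single scheme. As a minimal sketch, assuming the comma-separated form shown in the example, the hypothetical helper below splits such a URL into its individual host:port entries, which can be useful for logging or per-broker health checks. `MultiHostServiceUrl` is not an Infinitic or Pulsar class.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: splits "scheme://host1:port,host2:port" into entries.
public class MultiHostServiceUrl {
    static List<String> hosts(String serviceUrl) {
        int schemeEnd = serviceUrl.indexOf("://");
        if (schemeEnd < 0) {
            throw new IllegalArgumentException("Missing scheme in: " + serviceUrl);
        }
        // Everything after "://" is the comma-separated host list.
        String hostList = serviceUrl.substring(schemeEnd + 3);
        return Arrays.asList(hostList.split(","));
    }

    public static void main(String[] args) {
        String url = "pulsar://broker-1:6650,broker-2:6650,broker-3:6650";
        for (String host : hosts(url)) {
            System.out.println(host);
        }
    }
}
```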

Troubleshooting

Connection Issues

transport:
  pulsar:
    # ... other settings ...
    client:
      connectionTimeout: 30  # Increase if connections are timing out
      operationTimeout: 60   # Increase for slow networks
      keepAliveInterval: 30  # Adjust for network stability
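
Before raising timeouts, it can help to confirm that the broker port is reachable at all from the machine running the worker. The sketch below uses only the JDK to attempt a TCP connection with a timeout. The class `BrokerReachability` is hypothetical, and the host and port in `main` are the local defaults used elsewhere on this page. A successful TCP connection does not prove that Pulsar is healthy, only that the network path is open.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical diagnostic helper: checks TCP reachability of a host and port.
public class BrokerReachability {
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts, and unknown hosts.
            return false;
        }
    }

    public static void main(String[] args) {
        // 10 seconds mirrors the default connectionTimeout listed on this page.
        boolean reachable = canConnect("localhost", 6650, 10_000);
        System.out.println("Broker port reachable: " + reachable);
    }
}
```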

Performance Issues

transport:
  pulsar:
    # ... other settings ...
    producer:
      compressionType: LZ4           # Enable compression
      batchingMaxMessages: 10000     # Increase batch size
      batchingMaxPublishDelay: 100   # Increase batch delay
    consumer:
      receiverQueueSize: 5000        # Increase queue size

Authentication Errors

transport:
  pulsar:
    # ... other settings ...
    client:
      authPluginClassName: org.apache.pulsar.client.impl.auth.AuthenticationToken
      authParams: token:${PULSAR_TOKEN}  # Ensure token is valid
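
Pulsar token authentication is based on JSON Web Tokens, so one common cause of authentication errors is an expired token. The sketch below reads the optional `exp` claim from a token's payload without verifying the signature. It is a diagnostic aid only: the class `TokenExpiry` is hypothetical, the regular expression is a shortcut rather than a full JSON parser, and tokens issued without an expiry return an empty result.

```java
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Base64;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical diagnostic helper: extracts the "exp" claim from a JWT.
// It does NOT validate the signature and must not be used for authorization.
public class TokenExpiry {
    private static final Pattern EXP = Pattern.compile("\"exp\"\\s*:\\s*(\\d+)");

    static Optional<Instant> expiry(String jwt) {
        // A JWT has three dot-separated parts: header.payload.signature
        String[] parts = jwt.split("\\.", -1);
        if (parts.length != 3) {
            throw new IllegalArgumentException("Expected a three-part JWT");
        }
        byte[] decoded = Base64.getUrlDecoder().decode(parts[1]);
        String payload = new String(decoded, StandardCharsets.UTF_8);
        Matcher matcher = EXP.matcher(payload);
        if (!matcher.find()) {
            return Optional.empty(); // token carries no expiry claim
        }
        // "exp" is expressed in seconds since the Unix epoch.
        return Optional.of(Instant.ofEpochSecond(Long.parseLong(matcher.group(1))));
    }

    public static void main(String[] args) {
        String token = System.getenv("PULSAR_TOKEN");
        if (token == null) {
            System.out.println("PULSAR_TOKEN is not set");
            return;
        }
        Optional<Instant> exp = expiry(token);
        if (exp.isEmpty()) {
            System.out.println("Token has no expiry claim");
        } else {
            System.out.println("Expired: " + exp.get().isBefore(Instant.now()));
        }
    }
}
```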
