The transport configuration defines how Infinitic communicates between clients, workers, and workflow engines using a message broker.

Package

io.infinitic.transport.config.TransportConfig

Supported Transports

Infinitic currently supports two transport implementations:
  1. Pulsar - Apache Pulsar message broker (recommended for production)
  2. InMemory - In-memory transport (for testing and development)

Common Properties

shutdownGracePeriodSeconds (Double, default: 30.0)
Duration in seconds to wait for ongoing messages to complete during shutdown
Example: shutdownGracePeriodSeconds: 30.0

cloudEventSourcePrefix (String, default: "")
Prefix for CloudEvent sources (used in event logging)
Example: cloudEventSourcePrefix: "infinitic.prod"

Pulsar Transport

Configuration Properties

pulsar.brokerServiceUrl (String, required)
Pulsar broker service URL
Example: brokerServiceUrl: pulsar://localhost:6650

pulsar.webServiceUrl (String, required)
Pulsar web service URL (for admin operations)
Example: webServiceUrl: http://localhost:8080

pulsar.tenant (String, required)
Pulsar tenant name
Example: tenant: infinitic

pulsar.namespace (String, required)
Pulsar namespace name
Example: namespace: prod

pulsar.allowedClusters (Set<String>)
Set of allowed Pulsar clusters (for geo-replication)
Example:
allowedClusters:
  - pulsar-cluster-1
  - pulsar-cluster-2

pulsar.policies (PoliciesConfig)
Pulsar policies configuration, with the following fields:
  retentionPolicyInMinutes (Int): Message retention time in minutes
  retentionStorageInMB (Int): Message retention storage size in MB
  maxProducersPerTopic (Int): Maximum producers per topic
  maxConsumersPerTopic (Int): Maximum consumers per topic
  maxConsumersPerSubscription (Int): Maximum consumers per subscription

pulsar.authentication (AuthenticationConfig)
Pulsar authentication configuration, with the following fields:
  token (String): JWT token for authentication
  oauth2 (OAuth2Config): OAuth2 authentication configuration

Example Pulsar YAML Configuration

transport:
  pulsar:
    brokerServiceUrl: pulsar://pulsar-broker:6650
    webServiceUrl: http://pulsar-broker:8080
    tenant: infinitic
    namespace: production
    allowedClusters:
      - us-east-1
      - us-west-2
    shutdownGracePeriodSeconds: 30.0
    cloudEventSourcePrefix: infinitic.prod
    policies:
      retentionPolicyInMinutes: 1440  # 1 day
      retentionStorageInMB: 1024      # 1 GB
      maxProducersPerTopic: 100
      maxConsumersPerTopic: 100
      maxConsumersPerSubscription: 50
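
The retention values in the example above are plain integers expressed in minutes and megabytes. As a minimal sketch, assuming only the Kotlin/Java standard library, they can be derived from more readable units. The helper names retentionMinutes and retentionMegabytes are hypothetical and not part of Infinitic:

```kotlin
import java.time.Duration

// Hypothetical helpers (not part of Infinitic): derive the integer values
// expected by retentionPolicyInMinutes and retentionStorageInMB.
// Both throw ArithmeticException if the result does not fit in an Int.
fun retentionMinutes(duration: Duration): Int = Math.toIntExact(duration.toMinutes())

fun retentionMegabytes(gigabytes: Int): Int = Math.multiplyExact(gigabytes, 1024)

fun main() {
    println(retentionMinutes(Duration.ofDays(1))) // 1440, the "1 day" value used above
    println(retentionMegabytes(1))                // 1024, the "1 GB" value used above
}
```

Computing the values this way keeps the comment and the number from drifting apart when the retention window is changed.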

Kotlin Pulsar Configuration

import io.infinitic.transport.pulsar.config.PulsarTransportConfig
import io.infinitic.transport.pulsar.config.PulsarPoliciesConfig

val transportConfig = PulsarTransportConfig.builder()
  .setBrokerServiceUrl("pulsar://localhost:6650")
  .setWebServiceUrl("http://localhost:8080")
  .setTenant("infinitic")
  .setNamespace("prod")
  .setAllowedClusters(setOf("cluster-1", "cluster-2"))
  .setShutdownGracePeriodSeconds(30.0)
  .setPolicies(
    PulsarPoliciesConfig(
      retentionPolicyInMinutes = 1440,
      retentionStorageInMB = 1024,
      maxProducersPerTopic = 100,
      maxConsumersPerTopic = 100
    )
  )
  .build()

InMemory Transport

The in-memory transport is useful for testing and local development. It does not persist messages, and all communication happens within the same JVM.

Example InMemory YAML Configuration

transport:
  inMemory:
    shutdownGracePeriodSeconds: 5.0
    cloudEventSourcePrefix: infinitic.test

Kotlin InMemory Configuration

import io.infinitic.transport.inMemory.InMemoryTransportConfig

val transportConfig = InMemoryTransportConfig(
  shutdownGracePeriodSeconds = 5.0,
  cloudEventSourcePrefix = "infinitic.test"
)

Authentication

JWT Token Authentication

transport:
  pulsar:
    brokerServiceUrl: pulsar://pulsar:6650
    webServiceUrl: http://pulsar:8080
    tenant: infinitic
    namespace: prod
    authentication:
      token: "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."

OAuth2 Authentication

transport:
  pulsar:
    brokerServiceUrl: pulsar://pulsar:6650
    webServiceUrl: http://pulsar:8080
    tenant: infinitic
    namespace: prod
    authentication:
      oauth2:
        issuerUrl: https://auth.example.com
        privateKey: /path/to/private-key.pem
        audience: infinitic-cluster

SSL/TLS Configuration

transport:
  pulsar:
    brokerServiceUrl: pulsar+ssl://pulsar:6651
    webServiceUrl: https://pulsar:8443
    tenant: infinitic
    namespace: prod
    useTls: true
    tlsTrustCertsFilePath: /path/to/ca-cert.pem
    tlsAllowInsecureConnection: false
    tlsEnableHostnameVerification: true
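
In the TLS example above, the broker URL uses the pulsar+ssl scheme and the web service URL uses https. The following is a minimal sketch of a pre-flight check that the URL schemes agree with the useTls flag. The function tlsProblems is hypothetical and not part of Infinitic, and it assumes that useTls is always expected to match the schemes as it does in the example:

```kotlin
import java.net.URI

// Hypothetical pre-flight check (not part of Infinitic): returns a list of
// problems found when the URL schemes do not match the useTls flag.
fun tlsProblems(brokerServiceUrl: String, webServiceUrl: String, useTls: Boolean): List<String> {
    val brokerScheme = URI(brokerServiceUrl).scheme
    val webScheme = URI(webServiceUrl).scheme
    val expectedBroker = if (useTls) "pulsar+ssl" else "pulsar"
    val expectedWeb = if (useTls) "https" else "http"
    return buildList {
        if (brokerScheme != expectedBroker) {
            add("brokerServiceUrl should use $expectedBroker://, found $brokerScheme://")
        }
        if (webScheme != expectedWeb) {
            add("webServiceUrl should use $expectedWeb://, found $webScheme://")
        }
    }
}

fun main() {
    // Matches the TLS example above, so no problems are reported.
    println(tlsProblems("pulsar+ssl://pulsar:6651", "https://pulsar:8443", useTls = true))
    // A plain broker URL combined with useTls = true is reported.
    println(tlsProblems("pulsar://pulsar:6650", "https://pulsar:8443", useTls = true))
}
```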

Topics Structure

Infinitic creates the following topic structure:
persistent://{tenant}/{namespace}/infinitic-service-executor-{serviceName}
persistent://{tenant}/{namespace}/infinitic-service-executor-retry-{serviceName}
persistent://{tenant}/{namespace}/infinitic-service-executor-event-{serviceName}
persistent://{tenant}/{namespace}/infinitic-service-tag-{serviceName}
persistent://{tenant}/{namespace}/infinitic-workflow-executor-{workflowName}
persistent://{tenant}/{namespace}/infinitic-workflow-executor-retry-{workflowName}
persistent://{tenant}/{namespace}/infinitic-workflow-executor-event-{workflowName}
persistent://{tenant}/{namespace}/infinitic-workflow-state-engine-{workflowName}
persistent://{tenant}/{namespace}/infinitic-workflow-state-cmd-{workflowName}
persistent://{tenant}/{namespace}/infinitic-workflow-state-timer-{workflowName}
persistent://{tenant}/{namespace}/infinitic-workflow-state-event-{workflowName}
persistent://{tenant}/{namespace}/infinitic-workflow-tag-{workflowName}
persistent://{tenant}/{namespace}/infinitic-client-{clientName}
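
The pattern listed above can be sketched as a small string-building function, which may be handy when scripting checks against a namespace. The helper topicName and the service name EmailService are hypothetical. This only mirrors the pattern shown on this page and is not how Infinitic itself computes names:

```kotlin
// Hypothetical helper (not part of Infinitic): builds a fully qualified topic
// name following the pattern listed above.
// `kind` is the middle segment, e.g. "service-executor" or "workflow-tag".
fun topicName(tenant: String, namespace: String, kind: String, name: String): String =
    "persistent://$tenant/$namespace/infinitic-$kind-$name"

fun main() {
    // EmailService is an illustrative service name.
    println(topicName("infinitic", "prod", "service-executor", "EmailService"))
    // persistent://infinitic/prod/infinitic-service-executor-EmailService
}
```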

Best Practices

Production Configuration

transport:
  pulsar:
    brokerServiceUrl: pulsar+ssl://pulsar:6651
    webServiceUrl: https://pulsar:8443
    tenant: infinitic
    namespace: prod
    allowedClusters:
      - prod-cluster-1
      - prod-cluster-2
      - prod-cluster-3
    shutdownGracePeriodSeconds: 30.0
    useTls: true
    tlsAllowInsecureConnection: false
    authentication:
      token: ${PULSAR_TOKEN}
    policies:
      retentionPolicyInMinutes: 2880  # 2 days
      retentionStorageInMB: 2048      # 2 GB
      maxProducersPerTopic: 100
      maxConsumersPerTopic: 100

Development Configuration

transport:
  inMemory:
    shutdownGracePeriodSeconds: 5.0

Testing Configuration

transport:
  pulsar:
    brokerServiceUrl: pulsar://localhost:6650
    webServiceUrl: http://localhost:8080
    tenant: infinitic
    namespace: test
    shutdownGracePeriodSeconds: 5.0
    policies:
      retentionPolicyInMinutes: 60  # 1 hour
      retentionStorageInMB: 100

Monitoring

Pulsar Metrics

Monitor these Pulsar metrics:
  • Topic throughput (messages/sec)
  • Consumer lag
  • Message backlog
  • Producer/consumer counts
  • Replication lag (if using geo-replication)

Health Checks

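import io.infinitic.clients.InfiniticClient
import org.slf4j.LoggerFactory

// Assumes SLF4J is on the classpath; any logger can be used instead.
private val logger = LoggerFactory.getLogger("TransportHealthCheck")

// HealthCheckWorkflow is a placeholder for a workflow interface defined in your application.
fun checkTransportHealth(client: InfiniticClient): Boolean {
  return try {
    // Attempt to create a workflow stub
    client.newWorkflow(HealthCheckWorkflow::class.java)
    true
  } catch (e: Exception) {
    logger.error("Transport health check failed", e)
    false
  }
}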

Troubleshooting

Connection Issues

# Enable connection logging
logging:
  level:
    io.infinitic.transport: DEBUG
    org.apache.pulsar: DEBUG

Message Backlog

If messages are backing up:
  1. Increase worker concurrency
  2. Add more worker instances
  3. Check for slow tasks
  4. Review retention policies
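
To judge whether added concurrency or instances are enough, a back-of-the-envelope estimate of the time to drain a backlog can help. The sketch below is an assumption for illustration, not an Infinitic or Pulsar API. It assumes steady produce and consume rates, which real workloads rarely have:

```kotlin
// Rough estimate (an assumption, not an Infinitic API): seconds needed to
// drain a backlog given steady produce and consume rates in messages/sec.
// Returns null when consumers are not faster than producers, in which case
// the backlog never shrinks.
fun drainSeconds(backlog: Long, producePerSec: Double, consumePerSec: Double): Double? {
    val net = consumePerSec - producePerSec
    return if (net > 0.0) backlog / net else null
}

fun main() {
    // 60,000 queued messages, 100 msg/s arriving, 300 msg/s processed.
    println(drainSeconds(60_000, 100.0, 300.0)) // 300.0
    // Consumers slower than producers: the backlog keeps growing.
    println(drainSeconds(60_000, 300.0, 100.0)) // null
}
```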

Topic Deletion

To clean up topics:
# Delete all Infinitic topics in the infinitic/prod namespace (destructive)
# Note: `topics list` takes a namespace in tenant/namespace form
pulsar-admin topics list infinitic/prod \
  | grep infinitic- \
  | xargs -I {} pulsar-admin topics delete {}

Environment Variables

Use environment variables for sensitive configuration:
transport:
  pulsar:
    brokerServiceUrl: ${PULSAR_BROKER_URL}
    webServiceUrl: ${PULSAR_WEB_URL}
    tenant: ${PULSAR_TENANT}
    namespace: ${PULSAR_NAMESPACE}
    authentication:
      token: ${PULSAR_TOKEN}
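
To make the placeholder idea concrete, here is a minimal sketch of how ${NAME} placeholders can be expanded from a map of variables. It is an illustration only and does not describe Infinitic's configuration loader. The function expand is hypothetical:

```kotlin
// Illustration only: expands ${NAME} placeholders from a map of variables.
// This is not Infinitic's loader; it just shows the substitution idea.
private val placeholder = Regex("""\$\{([A-Za-z_][A-Za-z0-9_]*)\}""")

fun expand(text: String, env: Map<String, String> = System.getenv()): String =
    placeholder.replace(text) { match ->
        val name = match.groupValues[1]
        // Fail fast instead of silently leaving a secret unset.
        env[name] ?: error("Missing environment variable: $name")
    }

fun main() {
    val line = "token: \${PULSAR_TOKEN}"
    println(expand(line, mapOf("PULSAR_TOKEN" to "secret"))) // token: secret
}
```

Failing on a missing variable, rather than substituting an empty string, surfaces a misconfigured deployment at startup instead of at the first authentication error.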

Multi-Cluster Configuration

For geo-distributed deployments:
transport:
  pulsar:
    brokerServiceUrl: pulsar://pulsar-us-east:6650
    webServiceUrl: http://pulsar-us-east:8080
    tenant: infinitic
    namespace: global
    allowedClusters:
      - us-east-1
      - eu-west-1
      - ap-southeast-1
    policies:
      replicationClusters:
        - us-east-1
        - eu-west-1
        - ap-southeast-1
