The worker configuration defines how an Infinitic worker processes tasks and workflows. It covers transport, storage, logging, services, workflows, and an optional event listener.

Class

io.infinitic.workers.config.InfiniticWorkerConfig

Configuration Methods

From YAML File

val worker = InfiniticWorker.fromYamlFile("/path/to/worker.yml")

From YAML Resource

val worker = InfiniticWorker.fromYamlResource("worker.yml")

From Builder

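// transportConfig, storageConfig, serviceConfig, and workflowConfig are assumed
// to be defined beforehand; Complete Kotlin Configuration below shows the
// storage, service, and workflow configs.
val worker = InfiniticWorker.builder()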
  .setTransport(transportConfig)
  .setStorage(storageConfig)
  .addService(serviceConfig)
  .addWorkflow(workflowConfig)
  .build()

Configuration Properties

name
String
Optional name for the worker (used for logging and monitoring)
name: worker-1
transport
TransportConfig
required
Transport configuration (message broker settings). See Transport Configuration.
storage
StorageConfig
Default storage configuration for state persistence. See Storage Configuration.
logs
LogsConfig
Logging configuration
logs:
  beautify: true  # Pretty-print CloudEvent logs
services
List<ServiceConfig>
List of service configurations (task executors)
workflows
List<WorkflowConfig>
List of workflow configurations
eventListener
EventListenerConfig
Optional event listener for monitoring workflow and task events
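
Since only transport is marked as required above, a minimal worker file could look like the sketch below. The Pulsar keys are copied from the example configuration further down this page. The worker name, service name, and class (minimal-worker, GreetingService, com.example.GreetingServiceImpl) are hypothetical placeholders. Whether the executor block can be left out entirely is not stated here, so it is spelled out with its documented default.

```yaml
# Minimal sketch: `transport` is the only top-level property marked required.
name: minimal-worker                 # optional; hypothetical worker name

transport:
  pulsar:
    brokerServiceUrl: pulsar://localhost:6650
    webServiceUrl: http://localhost:8080
    tenant: infinitic
    namespace: dev

services:
  - name: GreetingService                     # hypothetical service name
    class: com.example.GreetingServiceImpl    # hypothetical implementation class
    executor:
      concurrency: 10                         # documented default, written out explicitly
```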

Service Configuration

services[].name
String
required
Service name (matches @Name annotation or class name)
services[].class
String
required
Fully qualified class name of the service implementation
services[].executor
ServiceExecutorConfig
Executor configuration for running service tasks
services[].executor.concurrency
Int
default: 10
Number of tasks to process concurrently
services[].executor.withTimeout
String
Fully qualified class name of WithTimeout implementation
services[].executor.withRetry
String
Fully qualified class name of WithRetry implementation
services[].executor.batch
BatchConfig
Batch processing configuration
services[].executor.batch.maxMessages
Int
Maximum messages per batch
services[].executor.batch.maxSeconds
Double
Maximum seconds to wait for batch
services[].executor.retryHandlerConcurrency
Int
Concurrency for retry handler (defaults to concurrency)
services[].executor.eventHandlerConcurrency
Int
Concurrency for event handler (defaults to concurrency)
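
The two handler settings fall back to the executor's concurrency unless they are set. As an illustration, the fragment below lowers both for the PaymentService used elsewhere on this page. Placing them directly under executor, next to batch, is an assumption based on how timerHandlerConcurrency sits under stateEngine in the example configuration.

```yaml
services:
  - name: PaymentService
    class: com.example.payment.PaymentServiceImpl
    executor:
      concurrency: 20                # tasks processed concurrently
      batch:
        maxMessages: 50              # upper bound on messages per batch
        maxSeconds: 1.0              # upper bound on time spent waiting for a batch
      retryHandlerConcurrency: 5     # would default to 20 (the executor concurrency) if omitted
      eventHandlerConcurrency: 5     # would default to 20 (the executor concurrency) if omitted
```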
services[].tagEngine
ServiceTagEngineConfig
Tag engine configuration for service
services[].tagEngine.concurrency
Int
default: 5
Concurrency for tag operations
services[].tagEngine.storage
StorageConfig
Storage for tags (overrides default)
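
As a sketch of the override, the fragment below gives one service's tag engine its own storage block while the top-level storage stays the default for everything else. The Redis keys (host, port, database) are the ones used in the example configuration on this page. Using database 1 for tags is a hypothetical choice for illustration only.

```yaml
storage:                      # default storage for the worker
  redis:
    host: localhost
    port: 6379
    database: 0

services:
  - name: PaymentService
    class: com.example.payment.PaymentServiceImpl
    tagEngine:
      concurrency: 5
      storage:                # overrides the default storage for this tag engine only
        redis:
          host: localhost
          port: 6379
          database: 1         # hypothetical: separate Redis database for tags
```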

Workflow Configuration

workflows[].name
String
required
Workflow name (matches @Name annotation or class name)
workflows[].class
String
required
Fully qualified class name of the workflow implementation
workflows[].executor
WorkflowExecutorConfig
Executor configuration for running workflow tasks. Accepts the same parameters as the service executor (concurrency, withTimeout, withRetry, batch, etc.).
workflows[].tagEngine
WorkflowTagEngineConfig
Tag engine configuration for workflow
workflows[].tagEngine.concurrency
Int
default: 5
Concurrency for tag operations
workflows[].tagEngine.storage
StorageConfig
Storage for tags
workflows[].tagEngine.batch
BatchConfig
Batch configuration for tag operations
workflows[].stateEngine
WorkflowStateEngineConfig
State engine configuration for workflow persistence
workflows[].stateEngine.concurrency
Int
default: 10
Concurrency for state operations
workflows[].stateEngine.storage
StorageConfig
required
Storage for workflow state
workflows[].stateEngine.batch
BatchConfig
Batch configuration for state operations
workflows[].stateEngine.commandHandlerConcurrency
Int
Concurrency for command handler (defaults to concurrency)
workflows[].stateEngine.timerHandlerConcurrency
Int
Concurrency for timer handler (defaults to concurrency)
workflows[].stateEngine.eventHandlerConcurrency
Int
Concurrency for event handler (defaults to concurrency)
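
To show how the state engine properties fit together, here is a fragment with an explicit storage block and all three handler concurrencies set. It reuses the BookingWorkflow names from the example configuration. The specific numbers are illustrative, not recommendations.

```yaml
workflows:
  - name: BookingWorkflow
    class: com.example.booking.BookingWorkflowImpl
    stateEngine:
      concurrency: 10                  # concurrency for state operations
      storage:                         # storage for workflow state
        redis:
          host: localhost
          port: 6379
          database: 0
      batch:
        maxMessages: 50
        maxSeconds: 1.0
      commandHandlerConcurrency: 10    # same value as the fallback (concurrency)
      timerHandlerConcurrency: 5       # lower than the fallback of 10
      eventHandlerConcurrency: 5       # lower than the fallback of 10
```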

Example YAML Configuration

name: payment-worker

transport:
  pulsar:
    brokerServiceUrl: pulsar://localhost:6650
    webServiceUrl: http://localhost:8080
    tenant: infinitic
    namespace: dev

storage:
  redis:
    host: localhost
    port: 6379
    database: 0
  compression:
    mode: gzip
  cache:
    caffeine:
      maximumSize: 10000
      expireAfterAccess: 3600

logs:
  beautify: true

services:
  - name: PaymentService
    class: com.example.payment.PaymentServiceImpl
    executor:
      concurrency: 20
      withTimeout: com.example.payment.PaymentTimeout
      withRetry: com.example.payment.PaymentRetryPolicy
      batch:
        maxMessages: 50
        maxSeconds: 1.0
    tagEngine:
      concurrency: 5

  - name: EmailService
    class: com.example.notification.EmailServiceImpl
    executor:
      concurrency: 10
      batch:
        maxMessages: 100
        maxSeconds: 0.5

workflows:
  - name: BookingWorkflow
    class: com.example.booking.BookingWorkflowImpl
    executor:
      concurrency: 10
      withTimeout: com.example.booking.WorkflowTimeout
    tagEngine:
      concurrency: 5
      batch:
        maxMessages: 50
        maxSeconds: 1.0
    stateEngine:
      concurrency: 10
      batch:
        maxMessages: 50
        maxSeconds: 1.0
      timerHandlerConcurrency: 5

eventListener:
  class: com.example.monitoring.WorkflowEventListener
  concurrency: 5
  subscriptionName: event-listener-subscription

Complete Kotlin Configuration

import io.infinitic.workers.InfiniticWorker
import io.infinitic.workers.config.*
import io.infinitic.storage.config.RedisStorageConfig
import io.infinitic.storage.compression.CompressionConfig
import io.infinitic.cache.config.CaffeineCacheConfig

val storageConfig = RedisStorageConfig.builder()
  .setHost("localhost")
  .setPort(6379)
  .setDatabase(0)
  .setCompression(CompressionConfig.gzip())
  .setCache(
    CaffeineCacheConfig.builder()
      .setMaximumSize(10000)
      .setExpireAfterAccess(3600)
      .build()
  )
  .build()

val serviceConfig = ServiceConfig(
  name = "PaymentService",
  factory = { PaymentServiceImpl() },
  executor = ServiceExecutorConfig(
    concurrency = 20,
    withTimeout = PaymentTimeout::class.java,
    withRetry = PaymentRetryPolicy::class.java,
    batch = BatchConfig(
      maxMessages = 50,
      maxSeconds = 1.0
    )
  ),
  tagEngine = ServiceTagEngineConfig(
    concurrency = 5,
    storage = storageConfig
  )
)

val workflowConfig = WorkflowConfig(
  name = "BookingWorkflow",
  factories = listOf({ BookingWorkflowImpl() }),
  executor = WorkflowExecutorConfig(
    concurrency = 10
  ),
  tagEngine = WorkflowTagEngineConfig(
    concurrency = 5,
    storage = storageConfig
  ),
  stateEngine = WorkflowStateEngineConfig(
    concurrency = 10,
    storage = storageConfig
  )
)

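// transportConfig is not defined in this snippet; it is assumed to be a
// TransportConfig built beforehand (see Transport Configuration).
val worker = InfiniticWorker(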
  InfiniticWorkerConfig(
    name = "payment-worker",
    transport = transportConfig,
    storage = storageConfig,
    services = listOf(serviceConfig),
    workflows = listOf(workflowConfig)
  )
)

worker.start()

Best Practices

  1. Set appropriate concurrency: Balance throughput and resource usage
  2. Configure storage: Use persistent storage (Redis, PostgreSQL) for production
  3. Enable caching: Improve performance with Caffeine cache
  4. Use compression: Reduce storage size with gzip compression
  5. Monitor batch sizes: Tune batch settings based on workload
  6. Separate workers: Use different workers for different services/workflows
  7. Configure retries: Set retry policies appropriate for your use case
  8. Set timeouts: Prevent tasks from running indefinitely
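
One way to read practice 6 is to give each worker process its own configuration file, so services with different load profiles can be scaled and tuned independently. The sketch below shows two such files as two YAML documents separated by `---`. The file names in the comments are hypothetical, and each document would be saved as its own file before being passed to InfiniticWorker.fromYamlFile.

```yaml
# payment-worker.yml (hypothetical file name): runs only PaymentService
name: payment-worker
transport:
  pulsar:
    brokerServiceUrl: pulsar://localhost:6650
    webServiceUrl: http://localhost:8080
    tenant: infinitic
    namespace: dev
services:
  - name: PaymentService
    class: com.example.payment.PaymentServiceImpl
    executor:
      concurrency: 20
---
# email-worker.yml (hypothetical file name): runs only EmailService
name: email-worker
transport:
  pulsar:
    brokerServiceUrl: pulsar://localhost:6650
    webServiceUrl: http://localhost:8080
    tenant: infinitic
    namespace: dev
services:
  - name: EmailService
    class: com.example.notification.EmailServiceImpl
    executor:
      concurrency: 10
```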

Performance Tuning

High Throughput

services:
  - name: HighThroughputService
    class: com.example.HighThroughputServiceImpl  # hypothetical class name; class is required
    executor:
      concurrency: 50
      batch:
        maxMessages: 200
        maxSeconds: 0.5

Low Latency

services:
  - name: LowLatencyService
    class: com.example.LowLatencyServiceImpl  # hypothetical class name; class is required
    executor:
      concurrency: 20
      batch:
        maxMessages: 10
        maxSeconds: 0.1

Resource Constrained

services:
  - name: LightweightService
    class: com.example.LightweightServiceImpl  # hypothetical class name; class is required
    executor:
      concurrency: 5
      batch:
        maxMessages: 20
        maxSeconds: 2.0
