The worker configuration defines how Infinitic workers process tasks and workflows, including transport settings, storage, services, and workflows.
Package
io.infinitic.workers.config.InfiniticWorkerConfig
Configuration Methods
From YAML File
val worker = InfiniticWorker.fromYamlFile("/path/to/worker.yml")
From YAML Resource
val worker = InfiniticWorker.fromYamlResource("worker.yml")
From Builder
val worker = InfiniticWorker.builder()
    .setTransport(transportConfig)
    .setStorage(storageConfig)
    .addService(serviceConfig)
    .addWorkflow(workflowConfig)
    .build()
Configuration Properties
- name: Optional name for the worker (used for logging and monitoring)
- logs: Logging configuration, for example:
    logs:
      beautify: true # Pretty-print CloudEvent logs
- services: List of service configurations (task executors)
- workflows: List of workflow configurations
- eventListener: Optional event listener for monitoring workflow and task events
Service Configuration
- name: Service name (matches the @Name annotation or the class name)
- class: Fully qualified class name of the service implementation
- executor: Executor configuration for running service tasks
  - concurrency: Number of tasks to process concurrently
  - withTimeout: Fully qualified class name of the WithTimeout implementation
  - withRetry: Fully qualified class name of the WithRetry implementation
  - batch: Batch processing configuration
    - maxMessages: Maximum messages per batch
    - maxSeconds: Maximum seconds to wait for a batch
  - Concurrency for the retry handler (defaults to concurrency)
  - Concurrency for the event handler (defaults to concurrency)
- tagEngine: Tag engine configuration for the service
  - concurrency: Concurrency for tag operations
  - storage: Storage for tags (overrides the default)
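The two batch settings read as a size limit and a time limit. The sketch below illustrates one common way such a pair behaves: a batch is released when it reaches maxMessages, or when maxSeconds has passed since its first message, whichever comes first. It is a generic, standalone illustration and not Infinitic's internal implementation; `BatchSettings`, `Batcher`, `add`, and `poll` are hypothetical names introduced here.

```kotlin
// Generic size-or-time batching sketch. All names are hypothetical and
// this is NOT how Infinitic necessarily implements batching internally.
data class BatchSettings(val maxMessages: Int, val maxSeconds: Double)

class Batcher<T>(
    private val settings: BatchSettings,
    // Injectable clock (nanoseconds) so the timing logic can be tested.
    private val nowNanos: () -> Long = { System.nanoTime() },
) {
    private val buffer = mutableListOf<T>()
    private var firstArrivalNanos = 0L

    /** Adds a message; returns a full batch once maxMessages is reached, else null. */
    fun add(message: T): List<T>? {
        if (buffer.isEmpty()) firstArrivalNanos = nowNanos()
        buffer += message
        return if (buffer.size >= settings.maxMessages) drain() else null
    }

    /** Call periodically; returns a partial batch once maxSeconds has elapsed, else null. */
    fun poll(): List<T>? {
        if (buffer.isEmpty()) return null
        val elapsedSeconds = (nowNanos() - firstArrivalNanos) / 1e9
        return if (elapsedSeconds >= settings.maxSeconds) drain() else null
    }

    // Copies the buffered messages out and resets the buffer.
    private fun drain(): List<T> = buffer.toList().also { buffer.clear() }
}
```

Under this reading, maxSeconds bounds the extra delay a message can incur while waiting for its batch to fill, and maxMessages bounds how much work is handed over at once.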
Workflow Configuration
- name: Workflow name (matches the @Name annotation or the class name)
- class: Fully qualified class name of the workflow implementation
- executor: Executor configuration for running workflow tasks. Parameters are the same as for the service executor (concurrency, withTimeout, withRetry, batch, etc.)
- tagEngine: Tag engine configuration for the workflow
  - concurrency: Concurrency for tag operations
  - batch: Batch configuration for tag operations
- stateEngine (workflows[].stateEngine, type WorkflowStateEngineConfig): State engine configuration for workflow persistence
  - concurrency: Concurrency for state operations
  - storage: Storage for workflow state
  - batch: Batch configuration for state operations
  - commandHandlerConcurrency: Concurrency for the command handler (defaults to concurrency)
  - timerHandlerConcurrency: Concurrency for the timer handler (defaults to concurrency)
  - Concurrency for the event handler (defaults to concurrency)
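Several handler settings are described as "defaults to concurrency". A minimal sketch of that fallback rule, using a standalone data class rather than Infinitic's own configuration types: `StateEngineSettings` and the `effective…` properties are hypothetical, and `eventHandlerConcurrency` is an assumed property name that does not appear in this page.

```kotlin
// Hypothetical settings holder, for illustration only (not an Infinitic class).
data class StateEngineSettings(
    val concurrency: Int,
    val commandHandlerConcurrency: Int? = null,
    val timerHandlerConcurrency: Int? = null,
    val eventHandlerConcurrency: Int? = null, // assumed name, not confirmed by this page
) {
    init {
        require(concurrency > 0) { "concurrency must be positive" }
    }

    // Each handler falls back to the main concurrency when left unset.
    val effectiveCommandHandlerConcurrency: Int
        get() = commandHandlerConcurrency ?: concurrency
    val effectiveTimerHandlerConcurrency: Int
        get() = timerHandlerConcurrency ?: concurrency
    val effectiveEventHandlerConcurrency: Int
        get() = eventHandlerConcurrency ?: concurrency
}

fun main() {
    // Mirrors a state engine with concurrency 10 and a timer handler override of 5.
    val settings = StateEngineSettings(concurrency = 10, timerHandlerConcurrency = 5)
    println(settings.effectiveCommandHandlerConcurrency)
    println(settings.effectiveTimerHandlerConcurrency)
    println(settings.effectiveEventHandlerConcurrency)
}
```

With this rule, overriding a single handler leaves the others tied to the main value, so raising concurrency later still scales every handler that was not set explicitly.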
Example YAML Configuration
name: payment-worker
transport:
  pulsar:
    brokerServiceUrl: pulsar://localhost:6650
    webServiceUrl: http://localhost:8080
    tenant: infinitic
    namespace: dev
storage:
  redis:
    host: localhost
    port: 6379
    database: 0
  compression:
    mode: gzip
  cache:
    caffeine:
      maximumSize: 10000
      expireAfterAccess: 3600
logs:
  beautify: true
services:
  - name: PaymentService
    class: com.example.payment.PaymentServiceImpl
    executor:
      concurrency: 20
      withTimeout: com.example.payment.PaymentTimeout
      withRetry: com.example.payment.PaymentRetryPolicy
      batch:
        maxMessages: 50
        maxSeconds: 1.0
    tagEngine:
      concurrency: 5
  - name: EmailService
    class: com.example.notification.EmailServiceImpl
    executor:
      concurrency: 10
      batch:
        maxMessages: 100
        maxSeconds: 0.5
workflows:
  - name: BookingWorkflow
    class: com.example.booking.BookingWorkflowImpl
    executor:
      concurrency: 10
      withTimeout: com.example.booking.WorkflowTimeout
    tagEngine:
      concurrency: 5
      batch:
        maxMessages: 50
        maxSeconds: 1.0
    stateEngine:
      concurrency: 10
      batch:
        maxMessages: 50
        maxSeconds: 1.0
      timerHandlerConcurrency: 5
eventListener:
  class: com.example.monitoring.WorkflowEventListener
  concurrency: 5
  subscriptionName: event-listener-subscription
Complete Kotlin Configuration
import io.infinitic.workers.InfiniticWorker
import io.infinitic.workers.config.*
import io.infinitic.storage.config.RedisStorageConfig
import io.infinitic.storage.compression.CompressionConfig
import io.infinitic.cache.config.CaffeineCacheConfig
// Application classes, using the same packages as the YAML example above
import com.example.payment.PaymentServiceImpl
import com.example.payment.PaymentTimeout
import com.example.payment.PaymentRetryPolicy
import com.example.booking.BookingWorkflowImpl

// Redis storage with gzip compression and an in-memory Caffeine cache
val storageConfig = RedisStorageConfig.builder()
    .setHost("localhost")
    .setPort(6379)
    .setDatabase(0)
    .setCompression(CompressionConfig.gzip())
    .setCache(
        CaffeineCacheConfig.builder()
            .setMaximumSize(10000)
            .setExpireAfterAccess(3600)
            .build()
    )
    .build()

// Service: executor settings plus a tag engine backed by the storage above
val serviceConfig = ServiceConfig(
    name = "PaymentService",
    factory = { PaymentServiceImpl() },
    executor = ServiceExecutorConfig(
        concurrency = 20,
        withTimeout = PaymentTimeout::class.java,
        withRetry = PaymentRetryPolicy::class.java,
        batch = BatchConfig(
            maxMessages = 50,
            maxSeconds = 1.0
        )
    ),
    tagEngine = ServiceTagEngineConfig(
        concurrency = 5,
        storage = storageConfig
    )
)

// Workflow: executor, tag engine, and state engine
val workflowConfig = WorkflowConfig(
    name = "BookingWorkflow",
    factories = listOf({ BookingWorkflowImpl() }),
    executor = WorkflowExecutorConfig(
        concurrency = 10
    ),
    tagEngine = WorkflowTagEngineConfig(
        concurrency = 5,
        storage = storageConfig
    ),
    stateEngine = WorkflowStateEngineConfig(
        concurrency = 10,
        storage = storageConfig
    )
)

// transportConfig is not defined in this example; it must be created separately
val worker = InfiniticWorker(
    InfiniticWorkerConfig(
        name = "payment-worker",
        transport = transportConfig,
        storage = storageConfig,
        services = listOf(serviceConfig),
        workflows = listOf(workflowConfig)
    )
)

worker.start()
Best Practices
- Set appropriate concurrency: Balance throughput and resource usage
- Configure storage: Use persistent storage (Redis, PostgreSQL) for production
- Enable caching: Improve performance with Caffeine cache
- Use compression: Reduce storage size with gzip compression
- Monitor batch sizes: Tune batch settings based on workload
- Separate workers: Use different workers for different services/workflows
- Configure retries: Set retry policies appropriate for your use case
- Set timeouts: Prevent tasks from running indefinitely
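As an illustration of the "Configure retries" item, the sketch below shows one common retry policy: exponential backoff with a delay cap and a retry limit. It is a standalone class with hypothetical names (`ExponentialBackoff`, `secondsBeforeRetry`) and does not implement Infinitic's WithRetry interface, whose exact signature is not shown on this page; it would need adapting before being referenced from withRetry.

```kotlin
import kotlin.math.min
import kotlin.math.pow

/**
 * Hypothetical standalone policy (not Infinitic's WithRetry):
 * returns the delay in seconds before the next attempt, or null to stop retrying.
 */
class ExponentialBackoff(
    private val initialSeconds: Double = 1.0,
    private val factor: Double = 2.0,
    private val maxSeconds: Double = 60.0,
    private val maxRetries: Int = 5,
) {
    /** `retry` is the number of retries already performed (0 for the first retry). */
    fun secondsBeforeRetry(retry: Int): Double? {
        require(retry >= 0) { "retry must not be negative" }
        if (retry >= maxRetries) return null // retry budget exhausted
        // Delay grows geometrically and is capped at maxSeconds.
        return min(initialSeconds * factor.pow(retry), maxSeconds)
    }
}

fun main() {
    val policy = ExponentialBackoff()
    for (retry in 0..5) {
        println("retry $retry -> ${policy.secondsBeforeRetry(retry)}")
    }
}
```

The cap keeps a long outage from producing very long waits, and the retry limit ensures a task that keeps failing eventually surfaces as an error instead of retrying indefinitely.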
High Throughput
services:
  - name: HighThroughputService
    executor:
      concurrency: 50
      batch:
        maxMessages: 200
        maxSeconds: 0.5
Low Latency
services:
  - name: LowLatencyService
    executor:
      concurrency: 20
      batch:
        maxMessages: 10
        maxSeconds: 0.1
Resource Constrained
services:
  - name: LightweightService
    executor:
      concurrency: 5
      batch:
        maxMessages: 20
        maxSeconds: 2.0
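To compare the three profiles above, a rough estimate of how long a batch waits before it is released can help. The sketch assumes a batch is released when it is full or when maxSeconds has elapsed, whichever comes first, and that messages arrive at a steady rate; both are simplifying assumptions, and `BatchProfile`, `estimatedFlushSeconds`, and the rate of 100 messages per second are hypothetical.

```kotlin
import kotlin.math.min

// Hypothetical holder for the batch values of one profile.
data class BatchProfile(val name: String, val maxMessages: Int, val maxSeconds: Double)

/**
 * Estimated seconds until a batch is released at a steady arrival rate,
 * assuming it is released when full or when maxSeconds elapses, whichever is first.
 */
fun estimatedFlushSeconds(profile: BatchProfile, messagesPerSecond: Double): Double {
    require(messagesPerSecond > 0.0) { "arrival rate must be positive" }
    val secondsToFill = profile.maxMessages / messagesPerSecond
    return min(secondsToFill, profile.maxSeconds)
}

fun main() {
    val profiles = listOf(
        BatchProfile("HighThroughputService", maxMessages = 200, maxSeconds = 0.5),
        BatchProfile("LowLatencyService", maxMessages = 10, maxSeconds = 0.1),
        BatchProfile("LightweightService", maxMessages = 20, maxSeconds = 2.0),
    )
    val rate = 100.0 // hypothetical arrival rate, in messages per second
    for (profile in profiles) {
        println("${profile.name}: ${estimatedFlushSeconds(profile, rate)} s")
    }
}
```

When the time to fill a batch exceeds maxSeconds, the time limit dominates and batches go out partially filled; when it is shorter, the size limit dominates. Measuring the real arrival rate of a service shows which of the two settings is worth tuning.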
See Also