The InfiniticWorker class manages the execution of workflows and tasks in Infinitic. Workers process messages from the message broker, execute tasks and workflow logic, and manage workflow state.

Package

io.infinitic.workers.InfiniticWorker

Constructor

config
InfiniticWorkerConfigInterface
required
The worker configuration including transport, storage, and service/workflow definitions

Creation Methods

From Builder

val worker = InfiniticWorker.builder()
  .setTransport(transportConfig)
  .setStorage(storageConfig)
  .addService(serviceConfig)
  .addWorkflow(workflowConfig)
  .build()

From YAML

fromYamlResource
InfiniticWorker
Creates a worker from YAML files in the resources directory.
Parameters:
  • resources: String... - Resource paths
Example:
val worker = InfiniticWorker.fromYamlResource(
  "infinitic.yml"
)
fromYamlFile
InfiniticWorker
Creates a worker from YAML files in the file system.
Parameters:
  • files: String... - File paths
Example:
val worker = InfiniticWorker.fromYamlFile(
  "/etc/infinitic/worker.yml"
)
fromYamlString
InfiniticWorker
Creates a worker from YAML strings.
Parameters:
  • yamls: String... - YAML content strings

Instance Methods

Lifecycle Management

start
Unit
Starts the worker synchronously (blocks the current thread).
This method starts all configured consumers and begins processing messages. It blocks until the worker is closed or an error occurs.
Example:
val worker = InfiniticWorker.fromYamlResource("infinitic.yml")
worker.start() // Blocks here
startAsync
CompletableFuture<Unit>
Starts the worker asynchronously.
Returns: A CompletableFuture that completes when the worker is fully started.
Example:
val worker = InfiniticWorker.fromYamlResource("infinitic.yml")
val future = worker.startAsync()
// Worker is starting in background
// Do other initialization
future.join() // Wait for startup to complete
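Because startAsync() returns a standard CompletableFuture, the wait for startup can be bounded with the JDK's timed get instead of an unbounded join(). The sketch below is a minimal illustration: awaitStartup is a hypothetical helper, not part of Infinitic, and it accepts any CompletableFuture<Unit>.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException

// Hypothetical helper (not an Infinitic API): waits for a startup future.
// Returns true if it completed within the timeout, false if the timeout elapsed.
// A failed startup is rethrown by get() as an ExecutionException.
fun awaitStartup(startup: CompletableFuture<Unit>, timeoutMillis: Long): Boolean =
    try {
        startup.get(timeoutMillis, TimeUnit.MILLISECONDS)
        true
    } catch (e: TimeoutException) {
        false
    }
```

With a real worker, the first argument would be the future returned by worker.startAsync(); the caller can then decide whether to keep waiting or call close() when false comes back.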
close
Unit
Stops the worker gracefully.
Cancels all message consumers and waits for in-flight messages to finish processing within the configured grace period. Called automatically on JVM shutdown.
Example:
InfiniticWorker.fromYamlResource("infinitic.yml").use { worker ->
  worker.start() // Blocks until the worker stops
} // close() is called automatically when the block exits

Properties

client
InfiniticClient
Built-in Infinitic client for dispatching workflows and tasks.
Workers have access to a client instance that shares the same transport configuration, which is useful for starting child workflows or dispatching tasks.
Example:
class MyServiceImpl : MyService {
  override fun process(data: String) {
    // Access worker's client if needed
    // Note: Typically use Workflow.newService() within workflows
  }
}
config
InfiniticWorkerConfigInterface
The worker configuration

Worker Architecture

The worker manages several types of components:

Service Executors

Execute task implementations:
  • Process ExecuteTask messages
  • Run user-defined service methods
  • Handle retries and timeouts
  • Support batch processing

Workflow Executors

Execute workflow logic:
  • Process workflow tasks
  • Run workflow methods
  • Manage workflow state transitions
  • Handle deterministic execution

Tag Engines

Manage workflow and task tags:
  • Map tags to IDs
  • Handle tag-based queries
  • Maintain tag indexes in storage
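The tag-to-ID mapping can be pictured as a small index from each tag to the set of IDs that carry it. The sketch below is an in-memory illustration only: TagIndex and its methods are hypothetical names, and the real tag engines keep this index in the configured storage rather than in a map.

```kotlin
// Illustrative in-memory tag index (hypothetical, not Infinitic's implementation).
class TagIndex {
    private val idsByTag = mutableMapOf<String, MutableSet<String>>()

    // Associate a workflow or task ID with a tag.
    fun add(tag: String, id: String) {
        idsByTag.getOrPut(tag) { mutableSetOf<String>() }.add(id)
    }

    // Remove the association, dropping the tag entry once no IDs remain.
    fun remove(tag: String, id: String) {
        val ids = idsByTag[tag] ?: return
        ids.remove(id)
        if (ids.isEmpty()) idsByTag.remove(tag)
    }

    // Tag-based query: a snapshot of all IDs currently carrying the tag.
    fun idsFor(tag: String): Set<String> = idsByTag[tag]?.toSet() ?: emptySet()
}
```

Returning a copied set from idsFor keeps callers from mutating the index by accident.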

State Engines

Manage workflow state:
  • Store and retrieve workflow state
  • Process state transitions
  • Handle workflow commands
  • Manage timers and signals

Configuration

Workers are configured with:
  • Transport: Message broker configuration (Pulsar, In-Memory)
  • Storage: State storage configuration (Redis, PostgreSQL, MySQL, In-Memory)
  • Services: Task service definitions with concurrency and retry policies
  • Workflows: Workflow definitions with state engine and tag engine settings
  • Event Listeners: Optional event listeners for monitoring

Usage Example

import io.infinitic.workers.InfiniticWorker

fun main() {
  // Create worker from YAML configuration
  val worker = InfiniticWorker.fromYamlResource("worker.yml")
  
  // Optional: close() is already called automatically on JVM shutdown,
  // so this extra hook only adds a log line before closing
  Runtime.getRuntime().addShutdownHook(Thread {
    println("Shutting down worker...")
    worker.close()
  })
  
  // Start worker (blocks)
  println("Starting worker...")
  worker.start()
}

Example with Async Start

import io.infinitic.workers.InfiniticWorker

fun main() {
  val worker = InfiniticWorker.fromYamlResource("worker.yml")

  try {
    // Start asynchronously
    val startFuture = worker.startAsync()
    println("Worker starting...")

    // Block until startup has completed
    startFuture.join()
    println("Worker started successfully")

    // Keep the main thread alive until it is interrupted
    Thread.currentThread().join()
  } finally {
    worker.close()
  }
}

Graceful Shutdown

When close() is called, the worker:
  1. Stops accepting new messages
  2. Waits for ongoing messages to complete (up to shutdownGracePeriodSeconds)
  3. Logs statistics about in-flight messages
  4. Closes executors and transport connections
  5. Deletes temporary resources
worker.close() // Waits for graceful shutdown
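The first steps above follow a common JVM pattern: stop intake, wait up to a grace period, then force-stop whatever is left. The sketch below shows that pattern on a plain ExecutorService. It is not the worker's actual shutdown code, and shutdownGracefully and gracePeriodMillis are hypothetical names.

```kotlin
import java.util.concurrent.ExecutorService
import java.util.concurrent.TimeUnit

// Hypothetical helper illustrating a grace-period shutdown on a JDK executor.
// Returns the number of queued tasks that never started (0 on a clean shutdown).
fun shutdownGracefully(executor: ExecutorService, gracePeriodMillis: Long): Int {
    // Stop accepting new tasks; tasks already submitted keep running.
    executor.shutdown()

    // Wait for ongoing tasks, but no longer than the grace period.
    val finished = try {
        executor.awaitTermination(gracePeriodMillis, TimeUnit.MILLISECONDS)
    } catch (e: InterruptedException) {
        Thread.currentThread().interrupt() // preserve the interrupt flag
        false
    }
    if (finished) return 0

    // Grace period elapsed: interrupt running tasks and count the ones left in the queue.
    return executor.shutdownNow().size
}
```

The returned count is the kind of figure a shutdown log could report for work that did not complete in time.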

Monitoring

The worker logs detailed information about:
  • Startup configuration (concurrency, storage, timeouts)
  • Message processing (received, in-flight, completed)
  • Errors and retries
  • Shutdown statistics
Example log output:
Service PaymentService:
* Service Executor      : (concurrency: 10, class: PaymentServiceImpl, timeout: 30.00s)
* Service Tag Engine    : (concurrency: 5, storage: redis, cache: caffeine)

Workflow BookingWorkflow:
* Workflow Executor     : (concurrency: 10, classes: BookingWorkflowImpl, timeout: 60.00s)
* Workflow Tag Engine   : (concurrency: 5, storage: redis, cache: caffeine)
* Workflow State Engine : (concurrency: 10, storage: redis, cache: caffeine)

Worker 'worker-1' ready (shutdownGracePeriodSeconds=30s)

Thread Safety

The InfiniticWorker manages internal thread pools for:
  • Message consumption (coroutines)
  • Task execution (executor services)
  • Workflow execution (executor services)
Concurrency is controlled via configuration settings.
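To make the effect of a concurrency setting concrete, the sketch below runs tasks on a fixed-size JDK thread pool and measures how many ran at once. It is a generic illustration, assuming a concurrency value maps to a pool size; peakParallelism is a hypothetical function, and the worker's internal pools may be organized differently.

```kotlin
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

// Runs taskCount short tasks on a pool of `concurrency` threads and
// returns the highest number of tasks observed running at the same time.
fun peakParallelism(concurrency: Int, taskCount: Int): Int {
    val pool = Executors.newFixedThreadPool(concurrency)
    val running = AtomicInteger(0)
    val peak = AtomicInteger(0)

    repeat(taskCount) {
        pool.execute {
            val now = running.incrementAndGet()
            peak.accumulateAndGet(now) { a, b -> maxOf(a, b) } // track the maximum seen
            Thread.sleep(10) // simulate work
            running.decrementAndGet()
        }
    }

    pool.shutdown()
    pool.awaitTermination(10, TimeUnit.SECONDS)
    return peak.get()
}
```

However many tasks are submitted, the peak never exceeds the pool size, which is the guarantee a concurrency limit is meant to give.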

Error Handling

The worker handles errors through:
  • Automatic retries: Based on retry policies
  • Dead letter queues: For messages that cannot be processed
  • Graceful degradation: Failed components don’t affect others
  • Logging: Comprehensive error logging with context
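The interplay of retries and dead-lettering can be sketched in a few lines. The class below is a simplified illustration, not Infinitic's implementation: RetryingProcessor, maxAttempts, and deadLetters are hypothetical names, and it retries immediately rather than applying a configurable retry policy.

```kotlin
// Illustrative retry loop with a dead-letter list (hypothetical, not an Infinitic API).
class RetryingProcessor<T>(
    private val maxAttempts: Int,
    private val handler: (T) -> Unit
) {
    init {
        require(maxAttempts >= 1) { "maxAttempts must be at least 1" }
    }

    // Messages that failed every attempt, kept together with the last error.
    val deadLetters = mutableListOf<Pair<T, Throwable>>()

    // Returns true if the message was processed, false if it was dead-lettered.
    fun process(message: T): Boolean {
        var lastError: Throwable? = null
        repeat(maxAttempts) { attempt ->
            try {
                handler(message)
                return true // success: stop retrying
            } catch (e: Exception) {
                lastError = e
                println("attempt ${attempt + 1}/$maxAttempts failed: ${e.message}")
            }
        }
        // Every attempt failed: park the message instead of losing it.
        deadLetters += message to lastError!!
        return false
    }
}
```

Keeping the failed message alongside its last error preserves the context needed to inspect or replay it later.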
