Workers are responsible for executing services and workflows. They process messages from the transport layer and persist state to storage.

Basic Structure

name: my-worker              # Optional worker name

transport:                   # Required: Message transport configuration
  pulsar: ...               # or inMemory

storage:                     # Required for workflows: State storage
  postgres: ...             # or redis, inMemory

logs:
  beautify: true            # Optional: Pretty-print logs (default: true)

services: []                # Service configurations
workflows: []               # Workflow configurations

eventListener:              # Optional: Event listener configuration
  class: com.example.EventListener
  concurrency: 1

Service Configuration

Services are the building blocks for tasks. Each service can have:
  • An executor to process service tasks
  • A tag engine to manage service tags

Service Executor

YAML Configuration

services:
  - name: MyService
    executor:
      class: com.example.MyServiceImpl
      concurrency: 10
      timeoutSeconds: 60.0
      retry:
        minimumSeconds: 1.0
        maximumSeconds: 1000.0
        backoffCoefficient: 2.0
        randomFactor: 0.5
        maximumRetries: 11
        ignoredExceptions:
          - java.lang.IllegalArgumentException
      batch:
        maxMessages: 1000
        maxSeconds: 1.0
      eventHandlerConcurrency: 10
      retryHandlerConcurrency: 10

Builder Pattern

import io.infinitic.workers.config.ServiceConfig;
import io.infinitic.workers.config.ServiceExecutorConfig;
import io.infinitic.common.workers.config.WithExponentialBackoffRetry;

ServiceExecutorConfig executor = ServiceExecutorConfig.builder()
    .setServiceName("MyService")
    .setFactory(() -> new MyServiceImpl())
    .setConcurrency(10)
    .setTimeoutSeconds(60.0)
    .withRetry(
        WithExponentialBackoffRetry.builder()
            .setMinimumSeconds(1.0)
            .setMaximumSeconds(1000.0)
            .setBackoffCoefficient(2.0)
            .setRandomFactor(0.5)
            .setMaximumRetries(11)
            .addIgnoredException(IllegalArgumentException.class)
            .build()
    )
    .setBatch(1000, 1.0)
    .setEventHandlerConcurrency(10)
    .setRetryHandlerConcurrency(10)
    .build();

ServiceConfig service = new ServiceConfig(
    "MyService",
    executor,
    null  // tagEngine
);

Configuration Options

| Option | Type | Default | Description |
|---|---|---|---|
| class | String | Required | Fully qualified class name of the service implementation |
| concurrency | Integer | 1 | Number of concurrent task executions |
| timeoutSeconds | Double | null | Task execution timeout in seconds |
| retry | Object | null | Retry policy configuration |
| batch | Object | null | Batch processing configuration |
| eventHandlerConcurrency | Integer | Same as concurrency | Number of concurrent event handlers |
| retryHandlerConcurrency | Integer | Same as concurrency | Number of concurrent retry handlers |
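
The "Same as concurrency" default for the two handler options can be read as a simple fallback. The sketch below is illustrative only: `HandlerConcurrencySketch` and `resolve` are hypothetical names, not part of the Infinitic API, and the actual resolution logic inside the library may differ.

```java
/** Hypothetical helper, not part of Infinitic: shows the "Same as concurrency" fallback. */
final class HandlerConcurrencySketch {

    /** Returns the configured value, or the executor's concurrency when the option is unset. */
    static int resolve(Integer configured, int concurrency) {
        return configured != null ? configured : concurrency;
    }

    public static void main(String[] args) {
        int concurrency = 10;
        // eventHandlerConcurrency omitted from the configuration: falls back to concurrency
        System.out.println("eventHandlerConcurrency = " + resolve(null, concurrency));
        // retryHandlerConcurrency set explicitly: the explicit value is used
        System.out.println("retryHandlerConcurrency = " + resolve(4, concurrency));
    }
}
```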

Service Tag Engine

YAML Configuration

services:
  - name: MyService
    tagEngine:
      concurrency: 5
      storage:  # Optional: Override default storage
        postgres:
          host: localhost
          port: 5432
          username: postgres

Builder Pattern

import io.infinitic.workers.config.ServiceConfig;
import io.infinitic.workers.config.ServiceTagEngineConfig;

ServiceTagEngineConfig tagEngine = ServiceTagEngineConfig.builder()
    .setServiceName("MyService")
    .setConcurrency(5)
    .setStorage(storage)  // Optional: override default (storage is a storage configuration built elsewhere)
    .build();

ServiceConfig service = new ServiceConfig(
    "MyService",
    null,      // executor
    tagEngine
);

Configuration Options

| Option | Type | Default | Description |
|---|---|---|---|
| concurrency | Integer | 1 | Number of concurrent tag operations |
| storage | Object | Worker default | Storage configuration for tags |

Workflow Configuration

Workflows orchestrate services. Each workflow can have:
  • An executor to process workflow tasks
  • A state engine to manage workflow state
  • A tag engine to manage workflow tags

Workflow Executor

YAML Configuration

workflows:
  - name: MyWorkflow
    executor:
      class: com.example.MyWorkflowImpl
      # Or multiple versions:
      # classes:
      #   - com.example.MyWorkflowV1
      #   - com.example.MyWorkflowV2
      concurrency: 5
      timeoutSeconds: 3600.0
      retry:
        minimumSeconds: 1.0
        maximumSeconds: 1000.0
        backoffCoefficient: 2.0
        maximumRetries: 11
      checkMode: simple  # or strict
      batch:
        maxMessages: 1000
        maxSeconds: 1.0
      eventHandlerConcurrency: 5
      retryHandlerConcurrency: 5

Builder Pattern

import io.infinitic.workers.config.WorkflowConfig;
import io.infinitic.workers.config.WorkflowExecutorConfig;
import io.infinitic.common.workers.config.WithExponentialBackoffRetry;
import io.infinitic.workflows.WorkflowCheckMode;

WorkflowExecutorConfig executor = WorkflowExecutorConfig.builder()
    .setWorkflowName("MyWorkflow")
    .addFactory(() -> new MyWorkflowImpl())
    .setConcurrency(5)
    .setTimeoutSeconds(3600.0)
    .withRetry(
        WithExponentialBackoffRetry.builder()
            .setMinimumSeconds(1.0)
            .setMaximumSeconds(1000.0)
            .setBackoffCoefficient(2.0)
            .setMaximumRetries(11)
            .build()
    )
    .setCheckMode(WorkflowCheckMode.simple)
    .setBatch(1000, 1.0)
    .setEventHandlerConcurrency(5)
    .setRetryHandlerConcurrency(5)
    .build();

WorkflowConfig workflow = new WorkflowConfig(
    "MyWorkflow",
    executor,
    null,  // tagEngine
    null   // stateEngine
);

Configuration Options

| Option | Type | Default | Description |
|---|---|---|---|
| class | String | null | Single workflow implementation class |
| classes | List<String> | null | Multiple workflow versions |
| concurrency | Integer | 1 | Number of concurrent workflow executions |
| timeoutSeconds | Double | null | Workflow task execution timeout in seconds |
| retry | Object | null | Retry policy configuration |
| checkMode | String | null | Workflow validation mode (simple, strict) |
| batch | Object | null | Batch processing configuration |
| eventHandlerConcurrency | Integer | Same as concurrency | Number of concurrent event handlers |
| retryHandlerConcurrency | Integer | Same as concurrency | Number of concurrent retry handlers |

Workflow State Engine

YAML Configuration

workflows:
  - name: MyWorkflow
    stateEngine:
      concurrency: 5
      storage:  # Optional: Override default storage
        postgres:
          host: localhost
          port: 5432
          username: postgres
      batch:
        maxMessages: 1000
        maxSeconds: 1.0
      timerHandlerConcurrency: 5
      commandHandlerConcurrency: 5
      eventHandlerConcurrency: 5
      timerHandlerPastDueSeconds: 259200  # 3 days

Builder Pattern

import io.infinitic.workers.config.WorkflowConfig;
import io.infinitic.workers.config.WorkflowStateEngineConfig;

WorkflowStateEngineConfig stateEngine = WorkflowStateEngineConfig.builder()
    .setWorkflowName("MyWorkflow")
    .setConcurrency(5)
    .setStorage(storage)  // Optional: override default (storage is a storage configuration built elsewhere)
    .setBatch(1000, 1.0)
    .setTimerHandlerConcurrency(5)
    .setCommandHandlerConcurrency(5)
    .setEventHandlerConcurrency(5)
    .setTimerHandlerPastDueSeconds(259200L)
    .build();

WorkflowConfig workflow = new WorkflowConfig(
    "MyWorkflow",
    null,        // executor
    null,        // tagEngine
    stateEngine
);

Configuration Options

| Option | Type | Default | Description |
|---|---|---|---|
| concurrency | Integer | 1 | Number of concurrent state operations |
| storage | Object | Worker default | Storage configuration for state |
| batch | Object | null | Batch processing configuration |
| timerHandlerConcurrency | Integer | Same as concurrency | Number of concurrent timer handlers |
| commandHandlerConcurrency | Integer | Same as concurrency | Number of concurrent command handlers |
| eventHandlerConcurrency | Integer | Same as concurrency | Number of concurrent event handlers |
| timerHandlerPastDueSeconds | Long | 259200 (3 days) | Max past-due time, in seconds, before timer expires |

Workflow Tag Engine

YAML Configuration

workflows:
  - name: MyWorkflow
    tagEngine:
      concurrency: 5
      storage:  # Optional: Override default storage
        postgres:
          host: localhost
          port: 5432
          username: postgres
      batch:
        maxMessages: 1000
        maxSeconds: 1.0

Builder Pattern

import io.infinitic.workers.config.WorkflowConfig;
import io.infinitic.workers.config.WorkflowTagEngineConfig;

WorkflowTagEngineConfig tagEngine = WorkflowTagEngineConfig.builder()
    .setWorkflowName("MyWorkflow")
    .setConcurrency(5)
    .setStorage(storage)  // Optional: override default (storage is a storage configuration built elsewhere)
    .setBatch(1000, 1.0)
    .build();

WorkflowConfig workflow = new WorkflowConfig(
    "MyWorkflow",
    null,      // executor
    tagEngine,
    null       // stateEngine
);

Configuration Options

| Option | Type | Default | Description |
|---|---|---|---|
| concurrency | Integer | 1 | Number of concurrent tag operations |
| storage | Object | Worker default | Storage configuration for tags |
| batch | Object | null | Batch processing configuration |

Retry Policy

Retry policies control how failed tasks are retried using exponential backoff.

Configuration Options

| Option | Type | Default | Description |
|---|---|---|---|
| minimumSeconds | Double | 1.0 | Initial retry delay in seconds |
| maximumSeconds | Double | 1000.0 | Maximum retry delay in seconds |
| backoffCoefficient | Double | 2.0 | Exponential backoff multiplier |
| randomFactor | Double | 0.5 | Jitter factor (0.0-1.0) |
| maximumRetries | Integer | 11 | Maximum number of retries |
| ignoredExceptions | List<String> | [] | Exceptions that should not trigger retries |
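
To see how these options interact, the sketch below computes retry delays under one common formulation of exponential backoff with jitter: the base delay is `minimumSeconds * backoffCoefficient^retryIndex`, capped at `maximumSeconds`, then scaled by a random factor in `[1 - randomFactor, 1 + randomFactor)`. This formula is an assumption made for illustration, and `RetryDelaySketch` and its methods are hypothetical names; the exact computation inside Infinitic may differ.

```java
import java.util.Random;

/** Hypothetical helper, not part of Infinitic: illustrates exponential backoff with jitter. */
final class RetryDelaySketch {

    /** Delay before jitter: minimum * coefficient^retryIndex, capped at maximum (assumed formula). */
    static double baseDelaySeconds(double minimumSeconds, double maximumSeconds,
                                   double backoffCoefficient, int retryIndex) {
        return Math.min(maximumSeconds, minimumSeconds * Math.pow(backoffCoefficient, retryIndex));
    }

    /** Applies symmetric jitter: the result lies in [base * (1 - f), base * (1 + f)). */
    static double withJitter(double baseSeconds, double randomFactor, Random random) {
        return baseSeconds * (1 + randomFactor * (2 * random.nextDouble() - 1));
    }

    public static void main(String[] args) {
        Random random = new Random();
        int maximumRetries = 11; // default from the table above
        for (int i = 0; i < maximumRetries; i++) {
            double base = baseDelaySeconds(1.0, 1000.0, 2.0, i);
            System.out.printf("retry %d: base %.1fs, with jitter %.2fs%n",
                    i + 1, base, withJitter(base, 0.5, random));
        }
    }
}
```

Under this assumed formula and the default values, the base delay doubles from 1 second up to 512 seconds over the first ten retries, and the eleventh is capped at 1000 seconds because 2^10 = 1024 exceeds `maximumSeconds`.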

Batch Configuration

Batch processing improves throughput by processing multiple messages together.

Configuration Options

| Option | Type | Default | Description |
|---|---|---|---|
| maxMessages | Integer | 1000 | Maximum messages per batch |
| maxSeconds | Double | 1.0 | Maximum time, in seconds, to wait for a batch |
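
The two limits can be pictured as a buffer that is released when either one is reached. The sketch below is a simplified illustration, not Infinitic's implementation: `BatchSketch` is a hypothetical class, time is passed in explicitly so the behavior is deterministic, and the time limit is only checked when a message arrives (a real batcher would also flush from a timer).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical batcher, not part of Infinitic: releases a batch on a size or time limit. */
final class BatchSketch<T> {
    private final int maxMessages;
    private final long maxNanos;
    private final List<T> buffer = new ArrayList<>();
    private long firstMessageNanos;

    BatchSketch(int maxMessages, double maxSeconds) {
        this.maxMessages = maxMessages;
        this.maxNanos = (long) (maxSeconds * 1_000_000_000L);
    }

    /**
     * Buffers a message. Returns the whole batch once maxMessages is reached or
     * maxSeconds has elapsed since the first buffered message; otherwise an empty list.
     */
    List<T> add(T message, long nowNanos) {
        if (buffer.isEmpty()) {
            firstMessageNanos = nowNanos; // the wait starts with the first message of a batch
        }
        buffer.add(message);
        boolean full = buffer.size() >= maxMessages;
        boolean expired = nowNanos - firstMessageNanos >= maxNanos;
        return (full || expired) ? flush() : List.of();
    }

    /** Returns a copy of the buffered messages and empties the buffer. */
    List<T> flush() {
        List<T> batch = new ArrayList<>(buffer);
        buffer.clear();
        return batch;
    }

    public static void main(String[] args) {
        BatchSketch<String> batcher = new BatchSketch<>(3, 1.0);
        long start = System.nanoTime();
        for (String message : List.of("a", "b", "c", "d")) {
            List<String> batch = batcher.add(message, System.nanoTime());
            if (!batch.isEmpty()) {
                System.out.println("released batch of " + batch.size() + ": " + batch);
            }
        }
        System.out.println("left in buffer: " + batcher.flush()
                + " after " + (System.nanoTime() - start) + " ns");
    }
}
```

Larger `maxMessages` values favor throughput, while smaller `maxSeconds` values bound the extra latency a message can incur while waiting for its batch to fill.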

Complete Example

name: production-worker

transport:
  pulsar:
    brokerServiceUrl: pulsar://pulsar.example.com:6650
    webServiceUrl: http://pulsar.example.com:8080
    tenant: infinitic
    namespace: production
    shutdownGracePeriodSeconds: 30.0

storage:
  postgres:
    host: postgres.example.com
    port: 5432
    username: infinitic
    password: ${DB_PASSWORD}
    database: infinitic
    schema: public
    maximumPoolSize: 10

logs:
  beautify: true

services:
  - name: EmailService
    executor:
      class: com.example.EmailServiceImpl
      concurrency: 20
      timeoutSeconds: 30.0
      retry:
        minimumSeconds: 1.0
        maximumSeconds: 600.0
        backoffCoefficient: 2.0
        maximumRetries: 5
        ignoredExceptions:
          - com.example.InvalidEmailException
    tagEngine:
      concurrency: 10

  - name: PaymentService
    executor:
      class: com.example.PaymentServiceImpl
      concurrency: 10
      timeoutSeconds: 60.0

workflows:
  - name: OrderWorkflow
    executor:
      class: com.example.OrderWorkflowImpl
      concurrency: 10
      checkMode: simple
    stateEngine:
      concurrency: 10
      batch:
        maxMessages: 500
        maxSeconds: 0.5
      timerHandlerConcurrency: 5
    tagEngine:
      concurrency: 10
