Infinitic emits CloudEvents for all workflow and task lifecycle events. This enables monitoring, auditing, integration with external systems, and building event-driven architectures.

What are CloudEvents?

CloudEvents is a specification for describing event data in a common format. Infinitic converts internal messages to CloudEvents, making it easy to:
  • Monitor workflow and task execution
  • Build real-time dashboards
  • Integrate with event streaming platforms
  • Implement custom alerting and notifications
  • Create audit logs

Event Structure

Every CloudEvent from Infinitic follows this structure:
{
  "specversion": "1.0",
  "type": "infinitic.workflow.completed",
  "source": "infinitic/workflows/stateEngine/MyWorkflow",
  "subject": "550e8400-e29b-41d4-a716-446655440000",
  "id": "unique-message-id",
  "time": "2024-03-06T12:00:00Z",
  "datacontenttype": "application/json",
  "data": {
    // Event-specific data
  }
}

Event Fields

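  • specversion - CloudEvents specification version (1.0)
  • type - Event type (e.g., infinitic.workflow.completed)
  • source - Origin of the event: whether it concerns a workflow or a service, which component emitted it (executor or state engine), and the workflow or service name
  • subject - Workflow ID or Task ID
  • id - Unique message ID
  • time - When the event was published (RFC 3339 timestamp)
  • datacontenttype - Media type of the payload (application/json)
  • data - Event payload with details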
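
As a rough illustration of how the source field can be read, the sketch below splits it into its parts. It assumes the value always has the four slash-separated segments seen in the example event (infinitic/workflows/stateEngine/MyWorkflow). The EventSource class and the parseSource function are hypothetical helpers, not part of Infinitic.

```kotlin
// Hypothetical helper, not part of Infinitic: the parts of a `source` value.
data class EventSource(val scope: String, val component: String, val name: String)

// Assumes the layout "infinitic/<scope>/<component>/<name>" from the example event.
// Returns null when the value does not match that layout.
fun parseSource(source: String): EventSource? {
    // limit = 4 keeps any further slashes inside the last segment (the name)
    val parts = source.split("/", limit = 4)
    if (parts.size != 4 || parts[0] != "infinitic") return null
    return EventSource(scope = parts[1], component = parts[2], name = parts[3])
}
```

Calling parseSource(event.source.toString()) in a listener would then give access to the workflow or service name without repeating the string handling in every handler.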

Event Types

Infinitic emits CloudEvents for workflows and tasks:

Workflow Events

  • infinitic.workflow.dispatch - Workflow dispatched
  • infinitic.workflow.started - Workflow execution started
  • infinitic.workflow.completed - Workflow completed successfully
  • infinitic.workflow.failed - Workflow failed
  • infinitic.workflow.canceled - Workflow was canceled
  • infinitic.workflow.dispatchMethod - Workflow method dispatched
  • infinitic.workflow.methodCompleted - Method completed
  • infinitic.workflow.methodFailed - Method failed
  • infinitic.workflow.methodCanceled - Method canceled
  • infinitic.workflow.methodTimedOut - Method timed out
  • infinitic.workflow.taskDispatched - Task dispatched from workflow
  • infinitic.workflow.taskCompleted - Task completed
  • infinitic.workflow.taskFailed - Task failed
  • infinitic.workflow.taskCanceled - Task canceled
  • infinitic.workflow.taskTimedOut - Task timed out
  • infinitic.workflow.taskUnknown - Task not found
  • infinitic.workflow.remoteMethodDispatched - Child workflow dispatched
  • infinitic.workflow.remoteMethodCompleted - Child workflow completed
  • infinitic.workflow.remoteMethodFailed - Child workflow failed
  • infinitic.workflow.remoteMethodCanceled - Child workflow canceled
  • infinitic.workflow.remoteMethodTimedOut - Child workflow timed out
  • infinitic.workflow.remoteMethodUnknown - Child workflow not found
  • infinitic.workflow.signal - Signal sent to workflow
  • infinitic.workflow.signalReceived - Signal received by workflow
  • infinitic.workflow.signalDiscarded - Signal discarded (workflow not ready)
  • infinitic.workflow.signalDispatched - Signal dispatched to child workflow
  • infinitic.workflow.timerDispatched - Timer created
  • infinitic.workflow.timerCompleted - Timer completed
  • infinitic.workflow.executorDispatched - Workflow executor started
  • infinitic.workflow.executorCompleted - Workflow executor completed
  • infinitic.workflow.executorFailed - Workflow executor failed
  • infinitic.workflow.retryExecutor - Retrying workflow executor

Task Events

  • infinitic.task.dispatch - Task dispatched to service
  • infinitic.task.started - Task execution started
  • infinitic.task.completed - Task completed successfully
  • infinitic.task.failed - Task failed
  • infinitic.task.canceled - Task canceled
  • infinitic.task.retryScheduled - Task retry scheduled
  • infinitic.task.retryTask - Retrying task
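
Because every type listed above starts with infinitic.workflow. or infinitic.task., a listener can route on the prefix and on the final segment instead of enumerating each type. This is a minimal sketch: EventScope, scopeOf, and isFailure are hypothetical names, and treating any final segment that ends in "failed" as a failure is an assumption based only on the names in these lists.

```kotlin
// Hypothetical classification of event types, based on the prefixes listed above.
enum class EventScope { WORKFLOW, TASK, UNKNOWN }

fun scopeOf(type: String): EventScope = when {
    type.startsWith("infinitic.workflow.") -> EventScope.WORKFLOW
    type.startsWith("infinitic.task.") -> EventScope.TASK
    else -> EventScope.UNKNOWN
}

// Assumption: failure events are those whose last segment ends in "failed",
// e.g. failed, methodFailed, taskFailed, remoteMethodFailed, executorFailed.
fun isFailure(type: String): Boolean =
    type.substringAfterLast('.').endsWith("failed", ignoreCase = true)
```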

Creating an Event Listener

Implement the CloudEventListener interface to receive events:
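import io.infinitic.cloudEvents.CloudEventListener
import io.cloudevents.CloudEvent
import org.slf4j.LoggerFactory

class MyEventListener : CloudEventListener {
    // Assumption: SLF4J is on the classpath; any logging library works here
    private val logger = LoggerFactory.getLogger(MyEventListener::class.java)
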
    override suspend fun onEvent(event: CloudEvent) {
        when (event.type) {
            "infinitic.workflow.completed" -> handleWorkflowCompleted(event)
            "infinitic.workflow.failed" -> handleWorkflowFailed(event)
            "infinitic.task.failed" -> handleTaskFailed(event)
            else -> logger.debug("Received event: ${event.type}")
        }
    }
    
    private fun handleWorkflowCompleted(event: CloudEvent) {
        val workflowId = event.subject
        logger.info("Workflow $workflowId completed")
        // Send notification, update dashboard, etc.
    }
    
    private fun handleWorkflowFailed(event: CloudEvent) {
        val workflowId = event.subject
        // Decode explicitly as UTF-8 rather than relying on the platform default charset
        val data = event.data?.toBytes()?.let { String(it, Charsets.UTF_8) }
        logger.error("Workflow $workflowId failed: $data")
        // Send alert, create incident ticket, etc.
    }
    
    private fun handleTaskFailed(event: CloudEvent) {
        val taskId = event.subject
        logger.warn("Task $taskId failed, will retry if policy allows")
    }
}

Configuring Event Listener

Configure the event listener in your Infinitic setup:

Using Builder

import io.infinitic.events.config.EventListenerConfig

val eventListenerConfig = EventListenerConfig.builder()
    .setListener(MyEventListener())
    .setConcurrency(5)
    .setSubscriptionName("my-event-listener")
    .allowWorkflows("OrderWorkflow", "PaymentWorkflow")
    .allowServices("EmailService", "NotificationService")
    .setBatch(maxEvents = 100, maxSeconds = 5.0)
    .build()

Using YAML

eventListener:
  class: com.example.MyEventListener
  concurrency: 5
  subscriptionName: my-event-listener
  workflows:
    allow:
      - OrderWorkflow
      - PaymentWorkflow
  services:
    allow:
      - EmailService
      - NotificationService
  batch:
    maxEvents: 100
    maxSeconds: 5.0

Load the configuration from the YAML file:
val config = EventListenerConfig.fromYamlFile("infinitic.yml")

Event Filtering

Filter by Workflow

val config = EventListenerConfig.builder()
    .setListener(MyEventListener())
    .allowWorkflows("OrderWorkflow", "ShippingWorkflow")
    .disallowWorkflows("InternalWorkflow")
    .build()

Filter by Service

val config = EventListenerConfig.builder()
    .setListener(MyEventListener())
    .allowServices("PaymentService", "EmailService")
    .build()

Filter by Event Type

Implement filtering in your listener:
class FilteredEventListener : CloudEventListener {
    private val interestedEvents = setOf(
        "infinitic.workflow.completed",
        "infinitic.workflow.failed",
        "infinitic.task.failed"
    )
    
    override suspend fun onEvent(event: CloudEvent) {
        if (event.type in interestedEvents) {
            processEvent(event)
        }
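    }

    // Placeholder: replace with your own handling logic
    private fun processEvent(event: CloudEvent) {
        println("Handling ${event.type} for ${event.subject}")
    }
}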

Event Data

The data field contains event-specific information:

Workflow Completed Event

{
  "workflowName": "OrderWorkflow",
  "workflowVersion": "1.0",
  "workerName": "worker-01",
  "infiniticVersion": "0.16.0"
}

Workflow Dispatch Event

{
  "dispatch": {
    "workflowMeta": {
      "userId": "12345",
      "requestId": "req-abc"
    },
    "workflowTags": ["order:ORD-001", "user:12345"]
  },
  "workflowName": "OrderWorkflow",
  "requester": "client-web-app",
  "infiniticVersion": "0.16.0"
}

Task Failed Event

{
  "taskFailed": {
    "error": {
      "message": "Database connection failed",
      "stackTrace": "..."
    },
    "taskId": "task-uuid",
    "serviceName": "DatabaseService"
  },
  "workflowName": "OrderWorkflow",
  "methodId": "method-uuid",
  "methodName": "processOrder"
}
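
The payloads above are nested JSON objects, so a listener usually has to walk a path to reach a value such as the failing service name. The sketch below assumes the data bytes have already been decoded into nested Map values by a JSON library of your choice (that decoding step is not shown). valueAt is a hypothetical helper.

```kotlin
// Hypothetical helper: follows `path` through nested maps.
// Returns null if any step is missing or is not a map.
fun valueAt(data: Map<String, Any?>, vararg path: String): Any? {
    var current: Any? = data
    for (key in path) {
        current = (current as? Map<*, *>)?.get(key) ?: return null
    }
    return current
}
```

With the Task Failed payload shown above, valueAt(payload, "taskFailed", "serviceName") would return "DatabaseService", and a path that does not exist yields null rather than an exception.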

Use Cases

Real-Time Dashboard

class DashboardEventListener : CloudEventListener {
    // MetricsCollector and dashboardService are application-specific placeholders
    private val metrics = MetricsCollector()
    
    override suspend fun onEvent(event: CloudEvent) {
        when (event.type) {
            "infinitic.workflow.completed" -> metrics.incrementCompleted()
            "infinitic.workflow.failed" -> metrics.incrementFailed()
            "infinitic.task.retryScheduled" -> metrics.incrementRetries()
        }
        
        // Push to real-time dashboard
        dashboardService.update(metrics.getSnapshot())
    }
}
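
MetricsCollector is not defined in the example above. One possible shape is sketched here, under the assumption that the listener may be called from several threads at once (for instance when concurrency is greater than 1), so the counters are atomic. The MetricsSnapshot type is hypothetical.

```kotlin
import java.util.concurrent.atomic.AtomicLong

// Hypothetical immutable view of the counters at one point in time.
data class MetricsSnapshot(val completed: Long, val failed: Long, val retries: Long)

// One possible MetricsCollector: thread-safe counters, no external dependencies.
class MetricsCollector {
    private val completed = AtomicLong()
    private val failed = AtomicLong()
    private val retries = AtomicLong()

    fun incrementCompleted() { completed.incrementAndGet() }
    fun incrementFailed() { failed.incrementAndGet() }
    fun incrementRetries() { retries.incrementAndGet() }

    // Each counter is read separately, so the snapshot is not atomic across all three.
    fun getSnapshot() = MetricsSnapshot(completed.get(), failed.get(), retries.get())
}
```

For a dashboard, the small inconsistency between counters in a snapshot is usually acceptable; a lock around all three would be needed only if exact cross-counter totals matter.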

Alerting System

class AlertingEventListener : CloudEventListener {
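    // AlertService, AlertSeverity and parseEventData are application-specific placeholders
    private val alertService = AlertService()
    // Hypothetical list of services whose failures should raise an alert
    private val criticalServices = setOf("PaymentService", "DatabaseService")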
    
    override suspend fun onEvent(event: CloudEvent) {
        when (event.type) {
            "infinitic.workflow.failed" -> {
                alertService.sendAlert(
                    severity = AlertSeverity.HIGH,
                    message = "Workflow ${event.subject} failed",
                    details = event.data
                )
            }
            "infinitic.task.failed" -> {
                // Check if this is a critical task
                val data = parseEventData(event)
                if (data.serviceName in criticalServices) {
                    alertService.sendAlert(
                        severity = AlertSeverity.MEDIUM,
                        message = "Critical task ${event.subject} failed"
                    )
                }
            }
        }
    }
}

Audit Log

class AuditEventListener : CloudEventListener {
    private val auditLog = AuditLogService()
    
    override suspend fun onEvent(event: CloudEvent) {
        auditLog.record(
            timestamp = event.time,
            eventType = event.type,
            subject = event.subject,
            source = event.source,
            data = event.data
        )
    }
}

Best Practices

  • Don’t block event processing with slow operations. Use queues or async handlers.
  • Ensure your event listener doesn’t crash on unexpected event formats or data.
  • Use allow/disallow lists to reduce the number of events your listener receives.
  • Configure batch settings to process multiple events together for better performance.
  • Track event processing latency and errors in your listener.
  • Consider storing events in a data warehouse for historical analysis and debugging.
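
The first two practices can be combined in a small hand-off buffer: the listener only enqueues, and a separate worker drains the queue and survives handler errors. This is a sketch under stated assumptions, not an Infinitic facility. EventBuffer is a hypothetical class, it stores event IDs as plain strings to stay self-contained, and dropping events when the buffer is full is just one policy (blocking or persisting are alternatives).

```kotlin
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.atomic.AtomicLong

// Hypothetical helper: a bounded hand-off between the listener and a slower consumer.
class EventBuffer(capacity: Int) {
    private val queue = ArrayBlockingQueue<String>(capacity)
    private val droppedCount = AtomicLong()
    private val errorCount = AtomicLong()

    val dropped: Long get() = droppedCount.get()
    val errors: Long get() = errorCount.get()

    // Called from the listener: never blocks. Returns false and counts a drop when full.
    fun offer(eventId: String): Boolean {
        val accepted = queue.offer(eventId)
        if (!accepted) droppedCount.incrementAndGet()
        return accepted
    }

    // Called from a separate worker: handles everything currently queued.
    // A failing handler call is counted and skipped instead of stopping the loop.
    fun drain(handler: (String) -> Unit): Int {
        var handled = 0
        while (true) {
            val eventId = queue.poll() ?: break
            runCatching { handler(eventId) }
                .onSuccess { handled++ }
                .onFailure { errorCount.incrementAndGet() }
        }
        return handled
    }
}
```

The dropped and errors counters also give a starting point for the tracking practice above, since they can be exported to whatever metrics system the application already uses.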
