Effective monitoring is essential for maintaining reliable workflow orchestration. Infinitic provides comprehensive observability through CloudEvents, status tracking, and integration points for monitoring systems.

What to Monitor

Key metrics and aspects to monitor in your Infinitic deployment:

Workflow Metrics

  • Completion rate - Percentage of workflows completing successfully
  • Failure rate - Percentage of workflows failing
  • Duration - Time from start to completion
  • Active workflows - Number of currently running workflows
  • Queued workflows - Workflows waiting to start

Task Metrics

  • Execution time - How long tasks take to complete
  • Retry rate - Frequency of task retries
  • Timeout rate - Tasks exceeding timeout limits
  • Failure rate - Task failure percentage
  • Queue depth - Number of pending tasks

System Metrics

  • Worker health - Status of worker instances
  • Message throughput - Messages processed per second
  • Resource utilization - CPU, memory, network usage
  • Storage usage - Workflow state storage consumption
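The rate metrics above are simple ratios over raw counts. As a sanity check, here is a pure-Kotlin sketch of how a completion rate might be derived; the `WorkflowCounts` type and its values are illustrative, not an Infinitic API — in practice the counts come from your metrics backend.

```kotlin
// Illustrative only: in practice these counts come from your metrics backend.
data class WorkflowCounts(val completed: Long, val failed: Long, val active: Long)

// Completion rate = completed / (completed + failed); guard against division by zero.
fun completionRate(c: WorkflowCounts): Double {
    val finished = c.completed + c.failed
    return if (finished == 0L) 0.0 else c.completed.toDouble() / finished
}

fun main() {
    val counts = WorkflowCounts(completed = 95, failed = 5, active = 12)
    println(completionRate(counts))  // 0.95
}
```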

Using CloudEvents for Monitoring

Infinitic emits CloudEvents for every lifecycle event, making them the primary mechanism for monitoring:
import io.infinitic.cloudEvents.CloudEventListener
import io.cloudevents.CloudEvent
import io.micrometer.core.instrument.MeterRegistry
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeUnit

class MonitoringEventListener(
    private val meterRegistry: MeterRegistry
) : CloudEventListener {

    private val workflowCompletions = meterRegistry.counter("infinitic.workflow.completed")
    private val workflowFailures = meterRegistry.counter("infinitic.workflow.failed")
    private val taskRetries = meterRegistry.counter("infinitic.task.retry")

    // Start timestamps keyed by workflow id; a concurrent map because events
    // may be delivered from multiple threads
    private val workflowStartTimes = ConcurrentHashMap<String, Long>()

    override suspend fun onEvent(event: CloudEvent) {
        when (event.type) {
            "infinitic.workflow.started" -> trackWorkflowStart(event)
            "infinitic.workflow.completed" -> trackWorkflowCompletion(event)
            "infinitic.workflow.failed" -> trackWorkflowFailure(event)
            "infinitic.task.retryScheduled" -> trackTaskRetry(event)
            "infinitic.task.completed" -> trackTaskCompletion(event)
        }
    }

    private fun trackWorkflowStart(event: CloudEvent) {
        val workflowId = event.subject ?: return
        workflowStartTimes[workflowId] = System.currentTimeMillis()
    }

    private fun trackWorkflowCompletion(event: CloudEvent) {
        workflowCompletions.increment()

        val workflowId = event.subject ?: return
        workflowStartTimes.remove(workflowId)?.let { startTime ->
            val duration = System.currentTimeMillis() - startTime
            meterRegistry.timer("infinitic.workflow.duration")
                .record(duration, TimeUnit.MILLISECONDS)
        }
    }

    private fun trackWorkflowFailure(event: CloudEvent) {
        workflowFailures.increment()
        event.subject?.let { workflowStartTimes.remove(it) }
    }

    private fun trackTaskRetry(event: CloudEvent) {
        taskRetries.increment()
    }

    private fun trackTaskCompletion(event: CloudEvent) {
        // Extract the task execution time from the event data
        // and record it as a timer metric
    }
}

Checking Workflow Status

Query workflow status programmatically:
import io.infinitic.workflows.DeferredStatus

val workflow = client.getWorkflowById(OrderWorkflow::class.java, workflowId)
val deferred = workflow.getStatusAsync()

val status = deferred.await()
when (status) {
    DeferredStatus.ONGOING -> println("Workflow is running")
    DeferredStatus.COMPLETED -> println("Workflow completed successfully")
    DeferredStatus.FAILED -> println("Workflow failed")
    DeferredStatus.CANCELED -> println("Workflow was canceled")
    DeferredStatus.TIMED_OUT -> println("Workflow timed out")
    DeferredStatus.UNKNOWN -> println("Workflow status unknown")
}

Integration with Monitoring Tools

Prometheus Integration

import io.micrometer.prometheus.PrometheusConfig
import io.micrometer.prometheus.PrometheusMeterRegistry

class PrometheusMonitoring {
    private val prometheusRegistry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)
    
    fun getEventListener() = MonitoringEventListener(prometheusRegistry)
    
    fun scrapeEndpoint(): String {
        return prometheusRegistry.scrape()
    }
}

// Expose a metrics endpoint (the route below assumes Ktor routing)
val monitoring = PrometheusMonitoring()
val listener = monitoring.getEventListener()

// HTTP endpoint at /metrics
get("/metrics") {
    call.respondText(monitoring.scrapeEndpoint(), ContentType.Text.Plain)
}

Grafana Dashboard

Create dashboards with key metrics:
# Workflow completion rate
rate(infinitic_workflow_completed_total[5m])

# Workflow failure rate
rate(infinitic_workflow_failed_total[5m])

# Average workflow duration
rate(infinitic_workflow_duration_sum[5m]) / rate(infinitic_workflow_duration_count[5m])

# Task retry rate
rate(infinitic_task_retry_total[5m])

# Active workflows
infinitic_workflow_active
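The same queries can drive Prometheus alerting rules. A sketch, assuming the metric names produced by the listener above; thresholds and durations are illustrative:

```yaml
groups:
  - name: infinitic
    rules:
      - alert: HighWorkflowFailureRate
        # Fraction of finished workflows that failed over the last 5 minutes
        expr: >
          rate(infinitic_workflow_failed_total[5m])
          / (rate(infinitic_workflow_completed_total[5m]) + rate(infinitic_workflow_failed_total[5m]))
          > 0.05
        for: 10m
        labels:
          severity: critical
        annotations:
          summary: "Workflow failure rate above 5%"
```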

DataDog Integration

import io.micrometer.core.instrument.Clock
import io.micrometer.datadog.DatadogConfig
import io.micrometer.datadog.DatadogMeterRegistry

class DataDogMonitoring(apiKey: String) {
    private val config = object : DatadogConfig {
        override fun apiKey() = apiKey
        override fun get(key: String) = null
    }
    
    private val registry = DatadogMeterRegistry(config, Clock.SYSTEM)
    
    fun getEventListener() = MonitoringEventListener(registry)
}

Health Checks

Implement health check endpoints for your workers and services:
class InfiniticHealthCheck(
    private val client: InfiniticClient
) {
    suspend fun checkHealth(): HealthStatus {
        return try {
            // Test basic connectivity
            val isConnected = testConnection()
            
            if (isConnected) {
                HealthStatus(
                    status = "healthy",
                    timestamp = System.currentTimeMillis(),
                    checks = mapOf(
                        "client" to "connected",
                        "workers" to "running"
                    )
                )
            } else {
                HealthStatus(
                    status = "unhealthy",
                    timestamp = System.currentTimeMillis(),
                    checks = mapOf(
                        "client" to "disconnected"
                    )
                )
            }
        } catch (e: Exception) {
            HealthStatus(
                status = "unhealthy",
                timestamp = System.currentTimeMillis(),
                error = e.message
            )
        }
    }
    
    private fun testConnection(): Boolean {
        // Implement connection test
        return true
    }
}

data class HealthStatus(
    val status: String,
    val timestamp: Long,
    val checks: Map<String, String>? = null,
    val error: String? = null
)

Alerting

Set up alerts based on metrics:
class AlertingService {
    private val alertThresholds = AlertThresholds(
        maxFailureRate = 0.05, // 5%
        maxRetryRate = 0.10,    // 10%
        maxDuration = 300000    // 5 minutes
    )
    
    fun checkAlerts(metrics: WorkflowMetrics) {
        if (metrics.failureRate > alertThresholds.maxFailureRate) {
            sendAlert(
                AlertLevel.CRITICAL,
                "High workflow failure rate: ${metrics.failureRate * 100}%"
            )
        }
        
        if (metrics.retryRate > alertThresholds.maxRetryRate) {
            sendAlert(
                AlertLevel.WARNING,
                "High task retry rate: ${metrics.retryRate * 100}%"
            )
        }
        
        if (metrics.averageDuration > alertThresholds.maxDuration) {
            sendAlert(
                AlertLevel.WARNING,
                "Workflow duration exceeded threshold: ${metrics.averageDuration}ms"
            )
        }
    }
    
    private fun sendAlert(level: AlertLevel, message: String) {
        // Send to alerting system (PagerDuty, Slack, etc.)
    }
}
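The snippet above references a few supporting types (`AlertThresholds`, `WorkflowMetrics`, `AlertLevel`) that are not part of Infinitic; they are application-level definitions you would supply yourself. A minimal sketch:

```kotlin
// Application-level types assumed by AlertingService above (not Infinitic APIs)
enum class AlertLevel { WARNING, CRITICAL }

data class AlertThresholds(
    val maxFailureRate: Double,  // fraction, e.g. 0.05 = 5%
    val maxRetryRate: Double,    // fraction
    val maxDuration: Long        // milliseconds
)

data class WorkflowMetrics(
    val failureRate: Double,
    val retryRate: Double,
    val averageDuration: Long
)

fun main() {
    val thresholds = AlertThresholds(maxFailureRate = 0.05, maxRetryRate = 0.10, maxDuration = 300_000)
    val metrics = WorkflowMetrics(failureRate = 0.08, retryRate = 0.02, averageDuration = 120_000)
    // Only the failure-rate threshold is breached here
    println(metrics.failureRate > thresholds.maxFailureRate)  // true
    println(metrics.retryRate > thresholds.maxRetryRate)      // false
}
```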

Logging Best Practices

Structured logging for better observability:
import io.github.oshai.kotlinlogging.KotlinLogging

class WorkflowLogger {
    private val logger = KotlinLogging.logger {}
    
    fun logWorkflowStart(workflowId: String, workflowName: String, tags: Set<String>) {
        logger.info {
            mapOf(
                "event" to "workflow.started",
                "workflowId" to workflowId,
                "workflowName" to workflowName,
                "tags" to tags.joinToString(",")
            )
        }
    }
    
    fun logWorkflowCompletion(workflowId: String, duration: Long) {
        logger.info {
            mapOf(
                "event" to "workflow.completed",
                "workflowId" to workflowId,
                "durationMs" to duration
            )
        }
    }
    
    fun logTaskFailure(taskId: String, serviceName: String, error: String, retry: Int) {
        logger.warn {
            mapOf(
                "event" to "task.failed",
                "taskId" to taskId,
                "serviceName" to serviceName,
                "error" to error,
                "retryAttempt" to retry
            )
        }
    }
}

Distributed Tracing

Integrate with distributed tracing systems:
import io.infinitic.cloudEvents.CloudEventListener
import io.cloudevents.CloudEvent
import io.opentelemetry.api.trace.Tracer
import io.opentelemetry.api.trace.SpanKind

class TracingEventListener(
    private val tracer: Tracer
) : CloudEventListener {

    override suspend fun onEvent(event: CloudEvent) {
        val span = tracer.spanBuilder(event.type)
            .setSpanKind(SpanKind.INTERNAL)
            .setAttribute("workflow.id", event.subject ?: "")
            .setAttribute("event.source", event.source?.toString() ?: "")
            .startSpan()

        try {
            processEvent(event)
        } finally {
            span.end()
        }
    }

    private fun processEvent(event: CloudEvent) {
        // Forward the event to your metrics or logging pipeline
    }
}

Best Practices

  • Track workflows from start to finish, including all task executions and retries.
  • Alert on high failure rates, timeout rates, or specific critical workflow failures.
  • Tag workflows with relevant identifiers for filtering and grouping in dashboards.
  • Monitor business-level metrics (orders processed, payments completed) alongside technical metrics.
  • Run regular health checks to ensure workers and services are operating correctly.
  • Retain metrics for trend analysis and capacity planning.
  • Use structured logs; they are easier to search, filter, and analyze.

Example: Complete Monitoring Setup

import io.github.oshai.kotlinlogging.KotlinLogging

private val logger = KotlinLogging.logger {}

class InfiniticMonitoringSystem(
    private val prometheusRegistry: PrometheusMeterRegistry,
    private val alertingService: AlertingService
) {

    // WorkflowMetricsCollector is an application-level helper that turns
    // CloudEvents into the WorkflowMetrics consumed by AlertingService
    private val metrics = WorkflowMetricsCollector(prometheusRegistry)

    fun createEventListener(): CloudEventListener {
        return object : CloudEventListener {
            override suspend fun onEvent(event: CloudEvent) {
                // Collect metrics
                metrics.recordEvent(event)

                // Check alerts
                val currentMetrics = metrics.getCurrentMetrics()
                alertingService.checkAlerts(currentMetrics)

                // Log event
                logger.debug { "Event: ${event.type} for ${event.subject}" }
            }
        }
    }

    fun getMetricsEndpoint(): String {
        return prometheusRegistry.scrape()
    }

    fun getHealthStatus(): HealthStatus {
        return HealthStatus(
            status = "healthy",
            timestamp = System.currentTimeMillis(),
            checks = mapOf("metrics" to "collecting")
        )
    }
}
