What You’ll Build
An expense approval workflow that:
- Submits expense reports for approval
- Waits for manager approval or rejection
- Handles approval timeouts
- Notifies users of decisions
- Auto-approves small amounts
package com.example.approval
import kotlinx.serialization.Serializable
import java.time.Instant
/**
 * An expense report submitted for approval.
 *
 * @property id identifier of the report
 * @property employee the employee who submitted the report
 * @property amount the amount claimed
 * @property category label describing the kind of expense
 * @property description free-text description of the expense
 * @property submittedAt submission timestamp as text; when omitted it is set to
 *   `Instant.now().toString()` at construction time
 */
@Serializable
data class ExpenseReport(
    val id: String,
    val employee: String,
    val amount: Double,
    val category: String,
    val description: String,
    val submittedAt: String = Instant.now().toString(),
)
/** Possible states of an expense approval. */
@Serializable
enum class ApprovalStatus {
    /** No decision has been made yet. */
    PENDING,

    /** The expense was approved by an approver. */
    APPROVED,

    /** The expense was rejected. */
    REJECTED,

    /** The expense was approved by the system without an approver's decision. */
    AUTO_APPROVED,

    /** No decision arrived before the approval deadline. */
    TIMED_OUT,
}
/**
 * The outcome of an approval request.
 *
 * @property status the resulting [ApprovalStatus]
 * @property approver who made the decision, or `null` when not provided
 * @property comments optional remarks attached to the decision
 * @property decidedAt decision timestamp as text; when omitted it is set to
 *   `Instant.now().toString()` at construction time
 */
@Serializable
data class ApprovalDecision(
    val status: ApprovalStatus,
    val approver: String?,
    val comments: String?,
    val decidedAt: String = Instant.now().toString(),
)
/**
 * Pairs an expense report with the decision taken on it.
 *
 * @property report the report that was submitted
 * @property decision the decision reached for [report]
 */
@Serializable
data class ApprovalResult(
    val report: ExpenseReport,
    val decision: ApprovalDecision,
)
package com.example.approval
/** Sends notifications about expense approvals to managers and employees. */
interface NotificationService {
    /**
     * Tells [manager] that [report] is waiting for an approval decision.
     */
    fun notifyApprovalNeeded(manager: String, report: ExpenseReport)

    /**
     * Tells [employee] the outcome described by [result].
     */
    fun notifyEmployee(employee: String, result: ApprovalResult)
}
/** Expense-related operations used during the approval process. */
interface ExpenseService {
    /**
     * Checks [report] against the expense rules.
     *
     * @return `true` when the report is acceptable, `false` otherwise
     */
    fun validateExpense(report: ExpenseReport): Boolean

    /**
     * Records the decision contained in [result].
     */
    fun recordDecision(result: ApprovalResult)

    /**
     * Looks up the manager responsible for [employee].
     *
     * @return the manager, in the form expected by [NotificationService.notifyApprovalNeeded]
     */
    fun getManagerForEmployee(employee: String): String
}
package com.example.approval
/**
 * [NotificationService] that writes every notification to standard output.
 */
class NotificationServiceImpl : NotificationService {
    /**
     * Prints a three-line approval request addressed to [manager], showing the
     * employee, amount and category of [report].
     */
    override fun notifyApprovalNeeded(manager: String, report: ExpenseReport) {
        listOf(
            "📧 Notification sent to $manager:",
            " Expense approval needed for ${report.employee}",
            " Amount: $${report.amount} - ${report.category}",
        ).forEach(::println)
    }

    /**
     * Prints the decision status for [employee], followed by the decision's
     * comments when they are present.
     */
    override fun notifyEmployee(employee: String, result: ApprovalResult) {
        val decision = result.decision
        val status = decision.status
        println("📧 Notification sent to $employee:")
        println(" Your expense report has been $status")

        // Nothing more to print when the decision carries no comments.
        val comments = decision.comments ?: return
        println(" Comments: $comments")
    }
}
/**
 * Sample [ExpenseService]: validates locally, logs decisions to standard output
 * and returns a fixed manager.
 */
class ExpenseServiceImpl : ExpenseService {
    /**
     * A report is valid when its amount is strictly positive and its description
     * is not blank.
     */
    override fun validateExpense(report: ExpenseReport): Boolean =
        report.amount > 0 && report.description.isNotBlank()

    /** Prints the decision status and the report id; nothing is stored. */
    override fun recordDecision(result: ApprovalResult) {
        val status = result.decision.status
        val reportId = result.report.id
        println("💾 Recording decision: $status for $reportId")
    }

    /**
     * Returns the same hard-coded manager for every [employee].
     *
     * NOTE(review): the literal below looks like an e-mail obfuscation placeholder
     * left by the page export — confirm the intended address.
     */
    override fun getManagerForEmployee(employee: String): String = "[email protected]"
}
package com.example.approval
import io.infinitic.workflows.SendChannel
/** Workflow contract for submitting an expense report and obtaining its approval result. */
interface ApprovalWorkflow {
    /** Channel through which approval decisions are sent to a running workflow. */
    val approvalChannel: SendChannel<ApprovalDecision>

    /**
     * Submits [report] for approval.
     *
     * @return the report together with the decision that was reached
     */
    fun submitExpense(report: ExpenseReport): ApprovalResult
}
package com.example.approval
import io.infinitic.workflows.Workflow
import io.infinitic.workflows.or
import java.time.Duration
import java.time.Instant
/**
 * Routes an expense report through validation, auto-approval of small amounts, or a
 * manager decision received on [approvalChannel], with a timeout on the manager's answer.
 *
 * NOTE(review): [ApprovalDecision] defaults `decidedAt` to `Instant.now()`, which is
 * evaluated inside workflow code here — confirm this is acceptable for the workflow
 * engine's replay model.
 */
class ApprovalWorkflowImpl : Workflow(), ApprovalWorkflow {
    // Channel on which the manager's decision is delivered to this workflow
    override val approvalChannel = channel<ApprovalDecision>()

    private val notificationService = newService(NotificationService::class.java)
    private val expenseService = newService(ExpenseService::class.java)

    // Auto-approve threshold (inclusive: amounts equal to it are auto-approved)
    private val AUTO_APPROVE_THRESHOLD = 100.0

    // Approval timeout (48 hours in production)
    private val APPROVAL_TIMEOUT = Duration.ofSeconds(30)

    /**
     * Processes [report] and returns the decision reached.
     *
     * - An invalid report is rejected immediately by `SYSTEM`.
     * - An amount up to the auto-approve threshold is approved by `SYSTEM`.
     * - Otherwise the employee's manager is notified and the workflow waits for a
     *   decision on [approvalChannel] or for the timeout, whichever comes first.
     *
     * @param report the expense report to process
     * @return the report paired with its decision
     */
    override fun submitExpense(report: ExpenseReport): ApprovalResult {
        if (!expenseService.validateExpense(report)) {
            // NOTE(review): unlike the other outcomes, this rejection is neither recorded
            // nor sent to the employee — confirm that this is intended.
            return ApprovalResult(
                report = report,
                decision = ApprovalDecision(
                    status = ApprovalStatus.REJECTED,
                    approver = "SYSTEM",
                    comments = "Invalid expense report",
                ),
            )
        }

        // Auto-approve small expenses
        if (report.amount <= AUTO_APPROVE_THRESHOLD) {
            return completeWith(
                report,
                ApprovalDecision(
                    status = ApprovalStatus.AUTO_APPROVED,
                    approver = "SYSTEM",
                    comments = "Auto-approved: amount below threshold",
                ),
            )
        }

        // Ask the employee's manager, then wait for the answer or the timeout
        val manager = expenseService.getManagerForEmployee(report.employee)
        notificationService.notifyApprovalNeeded(manager, report)
        return completeWith(report, waitForApproval())
    }

    /** Builds the result, records it, then notifies the employee — in that order. */
    private fun completeWith(report: ExpenseReport, decision: ApprovalDecision): ApprovalResult {
        val result = ApprovalResult(report, decision)
        expenseService.recordDecision(result)
        notificationService.notifyEmployee(report.employee, result)
        return result
    }

    /**
     * Waits for a decision on [approvalChannel] or for the timeout timer, whichever
     * completes first, and converts a timeout into a `TIMED_OUT` decision.
     *
     * @throws IllegalStateException if the awaited value is neither a decision nor an [Instant]
     */
    private fun waitForApproval(): ApprovalDecision {
        val approvalDeferred = approvalChannel.receive()
        val timeoutDeferred = timer(APPROVAL_TIMEOUT)

        return when (val outcome = (approvalDeferred or timeoutDeferred).await()) {
            is ApprovalDecision -> outcome
            // The timer branch yields an Instant, so this is the timeout case.
            is Instant -> ApprovalDecision(
                status = ApprovalStatus.TIMED_OUT,
                approver = "SYSTEM",
                // Formatted from the real duration: toHours() would print "0 hours"
                // for any timeout shorter than one hour, such as the 30 seconds above.
                comments = "Approval request timed out after ${describeTimeout(APPROVAL_TIMEOUT)}",
            )
            else -> error("Unexpected result type")
        }
    }

    /** Renders [timeout] as whole hours, minutes and seconds, e.g. `48h`, `1h 30min`, `30s`. */
    private fun describeTimeout(timeout: Duration): String {
        val totalSeconds = timeout.seconds
        val parts = listOf(
            totalSeconds / 3600 to "h",
            totalSeconds % 3600 / 60 to "min",
            totalSeconds % 60 to "s",
        )
        return parts
            .filter { (value, _) -> value > 0 }
            .joinToString(" ") { (value, unit) -> "$value$unit" }
            .ifEmpty { "0s" }
    }
}
# Infinitic configuration (presumably the infinitic.yml loaded by the worker and the client).
# NOTE(review): with an in-memory transport the client likely has to run in the same
# process as the worker — confirm before running them as separate programs.
transport: inMemory

# Each entry maps a service name to the class that implements it.
# The name must match the one used by the stubs: by default the interface's
# fully-qualified name, unless it is overridden with a @Name annotation.
services:
  - name: com.example.approval.NotificationService
    class: com.example.approval.NotificationServiceImpl
  - name: com.example.approval.ExpenseService
    class: com.example.approval.ExpenseServiceImpl

# Each entry maps a workflow name to the class that implements it.
workflows:
  - name: com.example.approval.ApprovalWorkflow
    class: com.example.approval.ApprovalWorkflowImpl
package com.example.approval
import io.infinitic.worker.InfiniticWorker
/**
 * Worker entry point: builds a worker from `infinitic.yml`, starts it, then prints a
 * confirmation line.
 *
 * NOTE(review): if `start()` blocks until the worker shuts down, the confirmation is
 * printed only afterwards — confirm against the worker API.
 */
fun main() {
    InfiniticWorker.fromConfigFile("infinitic.yml").start()
    println("Approval workflow workers started...")
}
package com.example.approval
import io.infinitic.client.InfiniticClient
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import java.util.UUID
/**
 * Client entry point: submits an expense that needs manager approval, sends an
 * approval on the workflow's channel after five seconds, and prints the final result.
 *
 * The client is closed in a `finally` block so it is released even when dispatching,
 * signalling or awaiting the workflow throws.
 */
fun main(): Unit = runBlocking {
    val client = InfiniticClient.fromConfigFile("infinitic.yml")
    try {
        // Create an expense report (250.0 is above the auto-approve threshold)
        val report = ExpenseReport(
            id = UUID.randomUUID().toString(),
            employee = "[email protected]",
            amount = 250.0,
            category = "Travel",
            description = "Client meeting in San Francisco",
        )
        println("Submitting expense report...\n")

        // Submit the expense asynchronously
        val workflow = client.newWorkflow(ApprovalWorkflow::class.java)
        val deferred = client.dispatch(workflow::submitExpense, report)
        println("Expense submitted with workflow ID: ${deferred.id}\n")

        // Simulate manager approval after some time
        delay(5000) // Wait 5 seconds
        println("\nManager approving expense...\n")

        // Get the workflow by ID and send approval
        val workflowById = client.getWorkflowById(
            ApprovalWorkflow::class.java,
            deferred.id,
        )
        workflowById.approvalChannel.send(
            ApprovalDecision(
                status = ApprovalStatus.APPROVED,
                approver = "[email protected]",
                comments = "Approved for client meeting expenses",
            ),
        )

        // Wait for final result
        val result = deferred.await()
        println("\n✅ Final result:")
        println(" Status: ${result.decision.status}")
        println(" Approver: ${result.decision.approver}")
        println(" Comments: ${result.decision.comments}")
    } finally {
        client.close()
    }
}
/**
 * Submits a 75.0 expense synchronously and prints the resulting status.
 *
 * The client is closed in a `finally` block so it is released even when the
 * workflow call throws.
 */
fun testAutoApproval() {
    val client = InfiniticClient.fromConfigFile("infinitic.yml")
    try {
        val smallExpense = ExpenseReport(
            id = UUID.randomUUID().toString(),
            employee = "[email protected]",
            amount = 75.0,
            category = "Supplies",
            description = "Office supplies",
        )
        val workflow = client.newWorkflow(ApprovalWorkflow::class.java)

        // Calling the stub directly waits for the workflow's result
        val result = workflow.submitExpense(smallExpense)
        println("Small expense status: ${result.decision.status}")
        // Output: Small expense status: AUTO_APPROVED
    } finally {
        client.close()
    }
}
Expected Output
Key Concepts
Channels
Channels allow workflows to receive signals from external systems. Define a channel with `channel<Type>()` and receive messages with `receive()`.
Waiting for Signals
Use `approvalChannel.receive()` to create a Deferred that completes when a signal is sent to the channel. The workflow pauses at `await()` until the signal arrives.
Timeouts with OR
Combine a channel receive with a timer using the `or` operator. The workflow continues when either the signal arrives or the timeout expires, whichever happens first.
Sending Signals
Use `client.getWorkflowById()` to get a reference to a running workflow, then call `workflowInstance.channelName.send(data)` to send it a signal.
Long-Running Workflows
Approval workflows can run for hours or days. Infinitic persists the workflow state, so it survives system restarts while waiting for approval.
Next Steps
- Learn about filtering channel messages with JSONPath
- Explore multiple channels for different signal types
- Try the saga pattern for complex compensation logic