What You’ll Build
A payment processing saga that:
- Reserves inventory
- Processes payment
- Updates loyalty points
- Automatically compensates on failures
- Maintains data consistency
package com.example.saga
import kotlinx.serialization.Serializable
/**
 * A customer order handed to the saga workflow.
 *
 * @property orderId identifier of the order
 * @property customerId identifier of the customer who placed the order
 * @property items line items contained in the order
 * @property totalAmount amount passed to the payment and loyalty steps
 */
@Serializable
data class Order(
    val orderId: String,
    val customerId: String,
    val items: List<OrderItem>,
    val totalAmount: Double,
)
/**
 * A single line item of an order.
 *
 * @property productId identifier of the ordered product
 * @property quantity number of units ordered
 * @property price price of the item; presumably per unit (the sample orders' totals
 *   equal the sum of quantity × price) — confirm
 */
@Serializable
data class OrderItem(
    val productId: String,
    val quantity: Int,
    val price: Double,
)
/**
 * Outcome of an inventory reservation.
 *
 * @property reservationId identifier later used to release the reservation
 * @property items the items covered by the reservation
 */
@Serializable
data class ReservationResult(
    val reservationId: String,
    val items: List<OrderItem>,
)
/**
 * Outcome of a payment attempt.
 *
 * @property transactionId identifier later used to refund the payment
 * @property amount the amount that was charged
 * @property status free-form payment status string
 */
@Serializable
data class PaymentResult(
    val transactionId: String,
    val amount: Double,
    val status: String,
)
/**
 * Outcome of crediting loyalty points.
 *
 * @property pointsAdded number of points credited by the operation
 * @property newBalance the customer's point balance after the credit
 */
@Serializable
data class LoyaltyResult(
    val pointsAdded: Int,
    val newBalance: Int,
)
/**
 * Final result of processing an order.
 *
 * The step results are nullable because a step may not have produced a result.
 *
 * @property order the order that was processed
 * @property reservation inventory reservation result, or `null` if none was produced
 * @property payment payment result, or `null` if none was produced
 * @property loyalty loyalty result, or `null` if none was produced
 * @property success whether the order was processed successfully
 * @property errorMessage description of the failure, or `null` when there is none
 */
@Serializable
data class OrderResult(
    val order: Order,
    val reservation: ReservationResult?,
    val payment: PaymentResult?,
    val loyalty: LoyaltyResult?,
    val success: Boolean,
    val errorMessage: String?,
)
package com.example.saga
/** Inventory operations used while processing an order. */
interface InventoryService {

    /** Reserves [items] and returns the resulting reservation. */
    fun reserveItems(items: List<OrderItem>): ReservationResult

    /** Releases the reservation identified by [reservationId]. */
    fun releaseReservation(reservationId: String)

    /** Reports whether [items] are available. */
    fun checkAvailability(items: List<OrderItem>): Boolean
}
/** Payment operations used while processing an order. */
interface PaymentService {

    /** Charges [amount] to the customer identified by [customerId]. */
    fun processPayment(customerId: String, amount: Double): PaymentResult

    /** Refunds the payment identified by [transactionId]. */
    fun refundPayment(transactionId: String)

    /** Reports whether the customer identified by [customerId] has a valid payment method. */
    fun validatePaymentMethod(customerId: String): Boolean
}
/** Loyalty-point operations used while processing an order. */
interface LoyaltyService {

    /** Credits loyalty points to [customerId] for a purchase of [amount]. */
    fun addPoints(customerId: String, amount: Double): LoyaltyResult

    /** Removes [points] loyalty points from [customerId]. */
    fun removePoints(customerId: String, points: Int)
}
/** Order life-cycle operations. */
interface OrderService {

    /** Records a new [order]. */
    fun createOrder(order: Order)

    /** Marks the order identified by [orderId] as failed, with [reason] as the cause. */
    fun markOrderFailed(orderId: String, reason: String)

    /** Marks the order identified by [orderId] as complete. */
    fun markOrderComplete(orderId: String)
}
package com.example.saga
import java.util.UUID
/**
 * Demo [InventoryService]: prints what it does, simulates latency with [Thread.sleep],
 * and treats any line item whose quantity exceeds 100 as unavailable.
 */
class InventoryServiceImpl : InventoryService {

    /**
     * Reserves [items] under a freshly generated `RES-<uuid>` identifier.
     *
     * @throws Exception if any item's quantity is above the simulated stock limit;
     *   the message names the first such item in list order
     */
    override fun reserveItems(items: List<OrderItem>): ReservationResult {
        println("📦 Reserving inventory for ${items.size} items")
        Thread.sleep(500)

        // Reject the request on the first item that exceeds the simulated stock.
        val unavailable = items.firstOrNull { it.quantity > MAX_AVAILABLE_QUANTITY }
        if (unavailable != null) {
            throw Exception("Insufficient inventory for product ${unavailable.productId}")
        }

        val id = "RES-${UUID.randomUUID()}"
        return ReservationResult(reservationId = id, items = items)
    }

    /** Simulated release: prints the reservation id and sleeps briefly. */
    override fun releaseReservation(reservationId: String) {
        println("🔓 Releasing reservation: $reservationId")
        Thread.sleep(300)
    }

    /** Returns `true` when no item exceeds the simulated stock limit. */
    override fun checkAvailability(items: List<OrderItem>): Boolean =
        items.none { it.quantity > MAX_AVAILABLE_QUANTITY }

    private companion object {
        /** Largest quantity of a single item this demo considers in stock. */
        const val MAX_AVAILABLE_QUANTITY = 100
    }
}
/**
 * Demo [PaymentService]: prints what it does, simulates latency with [Thread.sleep],
 * and rejects any amount greater than 10000.
 */
class PaymentServiceImpl : PaymentService {

    /**
     * Simulates charging [amount] to [customerId] and returns a `COMPLETED` result
     * with a freshly generated `TXN-<uuid>` identifier.
     *
     * @throws Exception if [amount] is greater than the simulated payment limit
     */
    override fun processPayment(customerId: String, amount: Double): PaymentResult {
        println("💳 Processing payment of $$amount for $customerId")
        Thread.sleep(1000)

        // Simulated validation: amounts above the limit are refused.
        val overLimit = amount > PAYMENT_LIMIT
        if (overLimit) throw Exception("Payment amount exceeds limit")

        val txnId = "TXN-${UUID.randomUUID()}"
        return PaymentResult(transactionId = txnId, amount = amount, status = "COMPLETED")
    }

    /** Simulated refund: prints the transaction id and sleeps briefly. */
    override fun refundPayment(transactionId: String) {
        println("💰 Refunding payment: $transactionId")
        Thread.sleep(800)
    }

    /** Simplified validation: every customer is accepted. */
    override fun validatePaymentMethod(customerId: String): Boolean = true

    private companion object {
        /** Amounts strictly above this value are rejected by [processPayment]. */
        const val PAYMENT_LIMIT = 10000
    }
}
/**
 * Demo [LoyaltyService]: prints what it does and simulates latency with [Thread.sleep].
 * No balance is stored; the returned balance is derived from a fixed starting value.
 */
class LoyaltyServiceImpl : LoyaltyService {

    /**
     * Credits 10% of [amount] (truncated to an integer) as points and reports the
     * new balance as 1000 plus the credited points.
     */
    override fun addPoints(customerId: String, amount: Double): LoyaltyResult {
        println("⭐ Adding loyalty points for $customerId")
        Thread.sleep(300)

        val earned = (amount * 0.1).toInt()
        val balanceAfter = 1000 + earned // simplified: fixed starting balance
        return LoyaltyResult(pointsAdded = earned, newBalance = balanceAfter)
    }

    /** Simulated removal: prints the points and customer id, then sleeps briefly. */
    override fun removePoints(customerId: String, points: Int) {
        println("❌ Removing $points loyalty points from $customerId")
        Thread.sleep(200)
    }
}
/** Demo [OrderService] that only prints each life-cycle event to standard output. */
class OrderServiceImpl : OrderService {

    /** Prints that [order] is being created. */
    override fun createOrder(order: Order) =
        println("📝 Creating order: ${order.orderId}")

    /** Prints that the order failed, together with [reason]. */
    override fun markOrderFailed(orderId: String, reason: String) =
        println("❌ Order $orderId failed: $reason")

    /** Prints that the order completed. */
    override fun markOrderComplete(orderId: String) =
        println("✅ Order $orderId completed successfully")
}
package com.example.saga
/** Contract of the order-processing saga. */
interface OrderSagaWorkflow {

    /**
     * Processes [order] and returns an [OrderResult] describing the outcome.
     */
    fun processOrder(order: Order): OrderResult
}
package com.example.saga
import io.infinitic.workflows.Workflow
import io.infinitic.exceptions.TaskFailedException
/**
 * Saga implementation of [OrderSagaWorkflow].
 *
 * Runs three forward steps in order (reserve inventory, process payment, add loyalty
 * points) and then marks the order complete. If any of these calls throws, the steps
 * that already produced a result are undone in reverse order and the order is marked
 * as failed.
 */
class OrderSagaWorkflowImpl : Workflow(), OrderSagaWorkflow {

    // Service handles obtained from the Workflow base class.
    private val inventoryService = newService(InventoryService::class.java)
    private val paymentService = newService(PaymentService::class.java)
    private val loyaltyService = newService(LoyaltyService::class.java)
    private val orderService = newService(OrderService::class.java)

    /**
     * Executes the saga for [order].
     *
     * @return an [OrderResult] with `success = true` and all step results when every
     *   step succeeded; otherwise `success = false`, the exception message (or
     *   "Unknown error") as `errorMessage`, and whichever step results were obtained
     *   before the failure
     */
    override fun processOrder(order: Order): OrderResult {
        println("\n🚀 Starting order saga for ${order.orderId}")

        // Each variable stays null until its step has succeeded; compensation uses
        // this to undo only the steps that actually ran.
        var reservation: ReservationResult? = null
        var payment: PaymentResult? = null
        var loyalty: LoyaltyResult? = null

        try {
            // Step 1: Reserve inventory
            reservation = inventoryService.reserveItems(order.items)
            println("✓ Inventory reserved: ${reservation.reservationId}")

            // Step 2: Process payment
            payment = paymentService.processPayment(order.customerId, order.totalAmount)
            println("✓ Payment processed: ${payment.transactionId}")

            // Step 3: Add loyalty points
            loyalty = loyaltyService.addPoints(order.customerId, order.totalAmount)
            println("✓ Loyalty points added: ${loyalty.pointsAdded}")

            // All steps succeeded - mark order complete
            orderService.markOrderComplete(order.orderId)

            return OrderResult(
                order = order,
                reservation = reservation,
                payment = payment,
                loyalty = loyalty,
                success = true,
                errorMessage = null
            )
        } catch (e: Exception) {
            println("\n⚠️ Saga failed: ${e.message}")
            println("🔄 Starting compensation...\n")

            // Compensate in reverse order, against the customer of THIS order.
            compensate(order.customerId, reservation, payment, loyalty)

            // Mark order as failed
            val errorMsg = e.message ?: "Unknown error"
            orderService.markOrderFailed(order.orderId, errorMsg)

            return OrderResult(
                order = order,
                reservation = reservation,
                payment = payment,
                loyalty = loyalty,
                success = false,
                errorMessage = errorMsg
            )
        }
    }

    /**
     * Undoes the completed steps in reverse order of execution: loyalty points,
     * then payment, then inventory reservation. A `null` argument means the
     * corresponding step never completed and is skipped.
     *
     * Every compensation is attempted even if an earlier one fails; failures are
     * printed and counted rather than rethrown.
     *
     * @param customerId customer whose loyalty points must be removed
     */
    private fun compensate(
        customerId: String,
        reservation: ReservationResult?,
        payment: PaymentResult?,
        loyalty: LoyaltyResult?
    ) {
        var failedSteps = 0

        // Compensate loyalty points (if added)
        if (loyalty != null) {
            val undone = attemptCompensation("Failed to compensate loyalty") {
                loyaltyService.removePoints(
                    customerId = customerId,
                    points = loyalty.pointsAdded
                )
            }
            if (!undone) failedSteps++
        }

        // Refund payment (if processed)
        if (payment != null) {
            val undone = attemptCompensation("Failed to refund payment") {
                paymentService.refundPayment(payment.transactionId)
            }
            if (!undone) failedSteps++
        }

        // Release inventory reservation (if made)
        if (reservation != null) {
            val undone = attemptCompensation("Failed to release reservation") {
                inventoryService.releaseReservation(reservation.reservationId)
            }
            if (!undone) failedSteps++
        }

        // Only claim success when every attempted compensation went through.
        if (failedSteps == 0) {
            println("✅ Compensation complete\n")
        } else {
            println("⚠️ Compensation finished with $failedSteps failed step(s) - manual intervention may be required\n")
        }
    }

    /**
     * Runs one compensation [action]; on failure prints [failureLabel] with the
     * exception message and returns `false` instead of propagating the exception.
     */
    private fun attemptCompensation(failureLabel: String, action: () -> Unit): Boolean =
        try {
            action()
            true
        } catch (e: Exception) {
            println("⚠️ $failureLabel: ${e.message}")
            false
        }
}
# Infinitic configuration loaded by both the worker and the client ("infinitic.yml").
# NOTE(review): the in-memory transport presumably only connects components living in
# the same process - confirm before running the worker and the client separately.
transport: inMemory

# Services executed by this worker: each entry maps a service name to its
# implementation class.
# NOTE(review): names are the fully-qualified interface names, which should be the
# default name when the interface carries no @Name annotation - confirm against the
# Infinitic version in use.
services:
  - name: com.example.saga.InventoryService
    class: com.example.saga.InventoryServiceImpl
    retry:
      maximumRetries: 3
  - name: com.example.saga.PaymentService
    class: com.example.saga.PaymentServiceImpl
    retry:
      maximumRetries: 3
  - name: com.example.saga.LoyaltyService
    class: com.example.saga.LoyaltyServiceImpl
  - name: com.example.saga.OrderService
    class: com.example.saga.OrderServiceImpl

# Workflows executed by this worker.
workflows:
  - name: com.example.saga.OrderSagaWorkflow
    class: com.example.saga.OrderSagaWorkflowImpl
package com.example.saga
import io.infinitic.worker.InfiniticWorker
/**
 * Worker entry point: builds an [InfiniticWorker] from "infinitic.yml", starts it,
 * and then prints a confirmation line.
 */
fun main() {
    // NOTE(review): if start() blocks until the worker shuts down, the line below is
    // only printed at shutdown - confirm against the Infinitic version in use.
    InfiniticWorker.fromConfigFile("infinitic.yml").start()
    println("Saga workflow workers started...")
}
package com.example.saga
import io.infinitic.client.InfiniticClient
import java.util.UUID
/**
 * Client entry point: submits a small, valid order to [OrderSagaWorkflow] and prints
 * the outcome.
 *
 * The client is closed in a `finally` block so it is released even when building the
 * workflow stub or processing the order throws.
 */
fun main() {
    val client = InfiniticClient.fromConfigFile("infinitic.yml")
    try {
        // Create a valid order (2 x 50.0 + 1 x 75.0 = 175.0)
        val order = Order(
            orderId = UUID.randomUUID().toString(),
            customerId = "customer-123",
            items = listOf(
                OrderItem("PROD-001", 2, 50.0),
                OrderItem("PROD-002", 1, 75.0)
            ),
            totalAmount = 175.0
        )

        val workflow = client.newWorkflow(OrderSagaWorkflow::class.java)
        val result = workflow.processOrder(order)

        if (result.success) {
            println("\n✅ Order completed successfully!")
            println(" Reservation: ${result.reservation?.reservationId}")
            println(" Payment: ${result.payment?.transactionId}")
            println(" Loyalty Points: ${result.loyalty?.pointsAdded}")
        } else {
            println("\n❌ Order failed: ${result.errorMessage}")
        }
    } finally {
        client.close()
    }
}
/**
 * Submits an order whose total (12500.0) is above the payment limit enforced by the
 * demo payment service, so the saga is expected to fail and compensate.
 *
 * The client is closed in a `finally` block so it is released even when building the
 * workflow stub or processing the order throws.
 */
fun testFailedOrder() {
    val client = InfiniticClient.fromConfigFile("infinitic.yml")
    try {
        // Create an order that will fail (exceeds payment limit)
        val largeOrder = Order(
            orderId = UUID.randomUUID().toString(),
            customerId = "customer-456",
            items = listOf(
                OrderItem("PROD-003", 5, 2500.0)
            ),
            totalAmount = 12500.0 // Exceeds limit
        )

        val workflow = client.newWorkflow(OrderSagaWorkflow::class.java)
        val result = workflow.processOrder(largeOrder)

        if (!result.success) {
            println("\n❌ Order failed as expected: ${result.errorMessage}")
            println(" Compensation was performed automatically")
        }
    } finally {
        client.close()
    }
}
Expected Output
Successful Order:
Failed Order with Compensation:
Key Concepts
Forward Recovery
The saga executes steps sequentially. Each step must be designed to be independently executable and compensatable.
Backward Compensation
When a step fails, compensation actions are executed in reverse order to undo completed work. This maintains eventual consistency.
Idempotency
Both forward actions and compensations should be idempotent, allowing them to be safely retried if a failure occurs.
State Tracking
The workflow tracks which steps completed successfully, enabling precise compensation of only the necessary actions.
Error Handling
Compensation actions themselves can fail. Use try-catch blocks around each compensation and log failures for manual intervention if needed.
Best Practices
- Order Matters: Compensate in reverse order of execution
- Idempotent Operations: Design both actions and compensations to be safely retryable
- Timeouts: Set appropriate timeouts for each step
- Logging: Log each step and compensation for debugging
- Partial Success: Handle scenarios where compensation partially succeeds
- Manual Intervention: Some compensations may require human intervention
Next Steps
- Learn about error handling in workflows
- Explore parallel execution for independent transactions
- Try long-running tasks with progress tracking