What You’ll Build
A data processing pipeline that:
- Processes large datasets
- Reports progress updates
- Handles timeouts gracefully
- Uses task delegation for external systems
- Implements cancellation support
package com.example.longrunning
import kotlinx.serialization.Serializable
/**
 * Request describing one dataset-processing job.
 *
 * @property jobId identifier of the job; the processing service keys its progress tracking on it.
 * @property datasetUrl location of the input dataset, passed to validation and download.
 * @property processingType kind of processing requested (the sample client uses "TRANSFORM" and "ANALYZE").
 * @property estimatedDurationMinutes caller-supplied duration estimate, in minutes.
 */
@Serializable
data class ProcessingJob(
    val jobId: String,
    val datasetUrl: String,
    val processingType: String,
    val estimatedDurationMinutes: Int,
)
/**
 * Snapshot of how far a job has progressed.
 *
 * @property jobId identifier of the job this snapshot belongs to.
 * @property percentComplete completion percentage as a whole number.
 * @property recordsProcessed number of records handled so far.
 * @property totalRecords number of records the job is expected to handle.
 * @property status free-form state label; the services in this package use "RUNNING" and "NOT_FOUND".
 */
@Serializable
data class ProcessingProgress(
    val jobId: String,
    val percentComplete: Int,
    val recordsProcessed: Long,
    val totalRecords: Long,
    val status: String,
)
/**
 * Final outcome of a processing job.
 *
 * @property jobId identifier of the job this result belongs to.
 * @property success `true` when processing finished without error.
 * @property recordsProcessed number of records handled.
 * @property durationSeconds wall-clock processing time, in seconds.
 * @property outputUrl location of the produced output, or `null` when there is none.
 * @property errorMessage failure description, or `null` on success.
 */
@Serializable
data class ProcessingResult(
    val jobId: String,
    val success: Boolean,
    val recordsProcessed: Long,
    val durationSeconds: Long,
    val outputUrl: String?,
    val errorMessage: String?,
)
package com.example.longrunning
import io.infinitic.annotations.Timeout
import io.infinitic.annotations.Delegated
// Timeout configuration class
/** Timeout policy of 300 seconds (five minutes), for use with the `@Timeout` annotation. */
class FiveMinuteTimeout : io.infinitic.tasks.WithTimeout {
    override fun getTimeoutSeconds(): Double {
        return 5 * 60.0 // 5 minutes, expressed in seconds
    }
}
/** Timeout policy of 1800 seconds (thirty minutes), for use with the `@Timeout` annotation. */
class ThirtyMinuteTimeout : io.infinitic.tasks.WithTimeout {
    override fun getTimeoutSeconds(): Double {
        return 30 * 60.0 // 30 minutes, expressed in seconds
    }
}
/** Task contract for validating, processing, monitoring and cancelling dataset jobs. */
interface DataProcessingService {

    /**
     * Quick validation of the dataset at [datasetUrl], bounded by a five-minute timeout.
     *
     * @return `true` when the dataset is considered valid.
     */
    @Timeout(FiveMinuteTimeout::class)
    fun validateDataset(datasetUrl: String): Boolean

    /**
     * Long-running processing of [job], bounded by a thirty-minute timeout.
     *
     * @return the outcome of the processing run.
     */
    @Timeout(ThirtyMinuteTimeout::class)
    fun processDataset(job: ProcessingJob): ProcessingResult

    /** Delegated task: processing of [job] is carried out by an external system. */
    @Delegated
    fun processWithExternalSystem(job: ProcessingJob): ProcessingResult

    /** Returns the current progress of the job identified by [jobId]; meant to be called periodically. */
    fun getProgress(jobId: String): ProcessingProgress

    /** Requests cancellation of the running job identified by [jobId]. */
    fun cancelJob(jobId: String)
}
/** Task contract for moving datasets and results in and out of storage. */
interface StorageService {

    /**
     * Downloads the dataset located at [url].
     *
     * @return a count associated with the download; the workflow in this package logs it as the number of records.
     */
    fun downloadDataset(url: String): Long

    /**
     * Uploads [data] as the results of the job identified by [jobId].
     *
     * @return the location of the uploaded results.
     */
    fun uploadResults(jobId: String, data: ByteArray): String
}
/** Task contract for publishing job lifecycle notifications. */
interface NotificationService {

    /** Announces that [job] has started. */
    fun notifyJobStarted(job: ProcessingJob)

    /** Publishes an intermediate [progress] snapshot. */
    fun notifyProgress(progress: ProcessingProgress)

    /** Announces the final [result] of a job. */
    fun notifyJobComplete(result: ProcessingResult)
}
package com.example.longrunning
import io.infinitic.tasks.Task
import java.util.concurrent.ConcurrentHashMap
/**
 * Demo implementation of [DataProcessingService] that simulates work with `Thread.sleep`
 * and tracks progress in a map shared by all instances in this process.
 *
 * Because the progress map lives in memory, [getProgress] and [cancelJob] only see jobs whose
 * [processDataset] call is running in the same process.
 */
class DataProcessingServiceImpl : DataProcessingService {

    /**
     * Simulates validating the dataset behind [datasetUrl].
     *
     * @return `true` when [datasetUrl] is not blank.
     */
    override fun validateDataset(datasetUrl: String): Boolean {
        println("🔍 Validating dataset: $datasetUrl")
        Thread.sleep(2000) // Simulate validation
        return datasetUrl.isNotBlank()
    }

    /**
     * Simulates processing [job] in ten batches of 100 records, publishing progress after each batch.
     *
     * Before every batch the method stops early when the task has timed out or when [cancelJob]
     * has removed the job's progress entry. Any exception is converted into a failed
     * [ProcessingResult] that reports the records processed so far.
     *
     * @return a successful result after all batches, otherwise a failed result carrying the error message.
     */
    override fun processDataset(job: ProcessingJob): ProcessingResult {
        println("⚙️ Starting processing: ${job.jobId}")
        val startTime = System.currentTimeMillis()
        // Declared outside the try block so a failure can report how far processing got.
        var processed = 0L

        return try {
            // Initialize progress tracking
            runningJobs[job.jobId] = runningProgress(job.jobId, processed)

            // Simulate long-running processing with progress updates
            repeat(BATCH_COUNT) {
                // Check for timeout
                if (Task.hasTimedOut) {
                    println("⏱️ Task timeout detected, wrapping up...")
                    throw Exception("Processing timed out")
                }
                // cancelJob() removes the progress entry; a missing entry means "stop now".
                if (!runningJobs.containsKey(job.jobId)) {
                    println("🛑 Job ${job.jobId} was cancelled, stopping...")
                    throw Exception("Processing cancelled")
                }

                // Process batch
                Thread.sleep(BATCH_DURATION_MS) // Simulate 3 seconds per batch
                processed += RECORDS_PER_BATCH

                // replace() only writes while the entry still exists, so a concurrent
                // cancelJob() is not undone by this progress update.
                val progress = runningProgress(job.jobId, processed)
                runningJobs.replace(job.jobId, progress)
                println(" Progress: ${progress.percentComplete}% ($processed/$TOTAL_RECORDS records)")
            }

            ProcessingResult(
                jobId = job.jobId,
                success = true,
                recordsProcessed = processed,
                durationSeconds = elapsedSeconds(startTime),
                outputUrl = "s3://bucket/results/${job.jobId}.csv",
                errorMessage = null,
            )
        } catch (e: Exception) {
            // Keep the interrupt visible to the caller's thread instead of silently clearing it.
            if (e is InterruptedException) Thread.currentThread().interrupt()
            ProcessingResult(
                jobId = job.jobId,
                success = false,
                recordsProcessed = processed,
                durationSeconds = elapsedSeconds(startTime),
                outputUrl = null,
                errorMessage = e.message,
            )
        } finally {
            runningJobs.remove(job.jobId)
        }
    }

    /**
     * Hands [job] over to the external system that will complete this delegated task.
     *
     * The method returns normally rather than throwing, so the hand-off itself is not reported
     * as a task failure.
     * NOTE(review): this relies on Infinitic ignoring the return value of `@Delegated` tasks and
     * taking the real result from `completeDelegatedTask` — confirm against the Infinitic version in use.
     *
     * @return a placeholder result marking the job as pending.
     */
    override fun processWithExternalSystem(job: ProcessingJob): ProcessingResult {
        // The external system needs the service name and task id to report completion later.
        println("📨 Delegating job ${job.jobId} to external system (service=${Task.serviceName}, task=${Task.taskId})")
        return ProcessingResult(
            jobId = job.jobId,
            success = false,
            recordsProcessed = 0,
            durationSeconds = 0,
            outputUrl = null,
            errorMessage = "Pending completion by external system",
        )
    }

    /**
     * Returns the tracked progress of [jobId], or a "NOT_FOUND" placeholder when no entry exists
     * (the job never ran in this process, has finished, or was cancelled).
     */
    override fun getProgress(jobId: String): ProcessingProgress {
        return runningJobs[jobId] ?: ProcessingProgress(
            jobId = jobId,
            percentComplete = 100,
            recordsProcessed = 0,
            totalRecords = 0,
            status = "NOT_FOUND",
        )
    }

    /**
     * Requests cancellation of [jobId] by removing its progress entry; a [processDataset] call
     * running in this process notices the missing entry before its next batch and stops.
     */
    override fun cancelJob(jobId: String) {
        println("🛑 Cancelling job: $jobId")
        runningJobs.remove(jobId)
    }

    /** Builds a "RUNNING" progress snapshot for [jobId] with [processed] records done. */
    private fun runningProgress(jobId: String, processed: Long) = ProcessingProgress(
        jobId = jobId,
        percentComplete = (processed * 100 / TOTAL_RECORDS).toInt(),
        recordsProcessed = processed,
        totalRecords = TOTAL_RECORDS,
        status = "RUNNING",
    )

    /** Whole seconds elapsed since [startTimeMillis]. */
    private fun elapsedSeconds(startTimeMillis: Long): Long =
        (System.currentTimeMillis() - startTimeMillis) / 1000

    companion object {
        private const val TOTAL_RECORDS = 1000L
        private const val BATCH_COUNT = 10
        private const val RECORDS_PER_BATCH = 100L
        private const val BATCH_DURATION_MS = 3000L

        // Track running jobs; an entry exists only while processDataset() is active and not cancelled.
        private val runningJobs = ConcurrentHashMap<String, ProcessingProgress>()
    }
}
/** Demo [StorageService] that logs each call, pauses for one second and returns canned values. */
class StorageServiceImpl : StorageService {

    /** Simulates downloading the dataset at [url] and returns a fixed record count of 1000. */
    override fun downloadDataset(url: String): Long {
        println("📥 Downloading dataset from $url")
        simulateTransfer()
        return SIMULATED_RECORD_COUNT
    }

    /** Simulates uploading [data] for [jobId] and returns the results location derived from the job id. */
    override fun uploadResults(jobId: String, data: ByteArray): String {
        println("📤 Uploading results for $jobId")
        simulateTransfer()
        return "s3://bucket/results/${jobId}.csv"
    }

    /** Stands in for network latency. */
    private fun simulateTransfer() {
        Thread.sleep(1000)
    }

    private companion object {
        const val SIMULATED_RECORD_COUNT = 1000L
    }
}
/** Demo [NotificationService] that writes every notification to standard output. */
class NotificationServiceImpl : NotificationService {

    /** Prints a "started" line for [job]. */
    override fun notifyJobStarted(job: ProcessingJob) =
        println("📧 Notification: Job ${job.jobId} started")

    /** Prints the completion percentage carried by [progress]. */
    override fun notifyProgress(progress: ProcessingProgress) =
        println("📧 Progress update: ${progress.percentComplete}% complete")

    /** Prints a success or failure line for [result]. */
    override fun notifyJobComplete(result: ProcessingResult) {
        val outcome = when {
            result.success -> "✅ SUCCESS"
            else -> "❌ FAILED"
        }
        println("📧 Notification: Job ${result.jobId} $outcome")
    }
}
package com.example.longrunning
import io.infinitic.workflows.SendChannel
/** Workflow contract for running a dataset-processing job, with or without progress tracking. */
interface ProcessingWorkflow {

    /** Channel on which cancellation requests are sent to a running workflow. */
    val cancelChannel: SendChannel<String>

    /** Runs [job] from start to finish and returns its result. */
    fun runProcessingJob(job: ProcessingJob): ProcessingResult

    /** Runs [job] while reporting progress, and returns its result. */
    fun runWithProgressTracking(job: ProcessingJob): ProcessingResult
}
package com.example.longrunning
import io.infinitic.workflows.Workflow
import io.infinitic.workflows.or
import java.time.Duration
/**
 * Workflow that orchestrates [DataProcessingService], [StorageService] and [NotificationService]
 * to run a dataset-processing job, optionally polling progress and honouring cancellation signals.
 */
class ProcessingWorkflowImpl : Workflow(), ProcessingWorkflow {
    override val cancelChannel = channel<String>()

    private val processingService = newService(DataProcessingService::class.java)
    private val storageService = newService(StorageService::class.java)
    private val notificationService = newService(NotificationService::class.java)

    /**
     * Validates, downloads and processes [job] sequentially.
     *
     * A "job started" notification is sent first and a "job complete" notification is sent on
     * every path, including the early exit for an invalid dataset.
     *
     * @return the processing result, or a failed result when validation rejects the dataset URL.
     */
    override fun runProcessingJob(job: ProcessingJob): ProcessingResult {
        println("\n🚀 Starting processing workflow for ${job.jobId}\n")

        // Notify job started
        notificationService.notifyJobStarted(job)

        // Validate dataset first
        val isValid = processingService.validateDataset(job.datasetUrl)
        if (!isValid) {
            val rejected = failedResult(job, "Invalid dataset URL")
            notificationService.notifyJobComplete(rejected)
            return rejected
        }

        // Download dataset
        val recordCount = storageService.downloadDataset(job.datasetUrl)
        println("✓ Dataset downloaded: $recordCount records\n")

        // Process the dataset (long-running)
        val result = processingService.processDataset(job)

        // Notify completion
        notificationService.notifyJobComplete(result)
        return result
    }

    /**
     * Dispatches processing of [job] asynchronously, then every five seconds either reports
     * progress or, when a message arrives on [cancelChannel], cancels the job.
     *
     * @return the processing result, or a failed "Cancelled by user" result after cancellation.
     */
    override fun runWithProgressTracking(job: ProcessingJob): ProcessingResult {
        println("\n🚀 Starting processing with progress tracking for ${job.jobId}\n")
        notificationService.notifyJobStarted(job)

        // Dispatch the processing task asynchronously
        val processingDeferred = dispatch(processingService::processDataset, job)

        // Subscribe to cancellation once, before polling starts, so a signal that arrives while
        // the workflow is busy between two waits is not tied to a receiver created too late.
        val cancelDeferred = cancelChannel.receive()

        // NOTE(review): the loop only ends when the task completes; if the dispatched task can
        // fail instead, this condition presumably never becomes false — confirm and handle.
        while (!processingDeferred.isCompleted()) {
            // Wait for the next progress tick or a cancel signal, whichever comes first
            val timerDeferred = timer(Duration.ofSeconds(5))
            val signal = (timerDeferred or cancelDeferred).await()

            if (signal is String) {
                // Cancellation requested
                println("\n🛑 Cancellation requested\n")
                processingService.cancelJob(job.jobId)
                val cancelled = failedResult(job, "Cancelled by user")
                // Every "job started" notification gets a matching completion, also on cancel.
                notificationService.notifyJobComplete(cancelled)
                return cancelled
            }

            // Get and report progress
            if (!processingDeferred.isCompleted()) {
                val progress = processingService.getProgress(job.jobId)
                if (progress.status == "RUNNING") {
                    notificationService.notifyProgress(progress)
                }
            }
        }

        // Get final result
        val result = processingDeferred.await()
        notificationService.notifyJobComplete(result)
        return result
    }

    /** Builds a failed, zero-progress result for [job] carrying [message] as the error. */
    private fun failedResult(job: ProcessingJob, message: String) = ProcessingResult(
        jobId = job.jobId,
        success = false,
        recordsProcessed = 0,
        durationSeconds = 0,
        outputUrl = null,
        errorMessage = message,
    )
}
# Infinitic configuration; both the worker and the client entry points load it as "infinitic.yml".
# NOTE(review): the worker and client are shown as separate main() programs, and an in-memory
# transport presumably cannot connect two separate processes — confirm before running them apart.
transport: inMemory
# Service implementations hosted by the worker; each name matches an interface in com.example.longrunning.
services:
- name: DataProcessingService
class: com.example.longrunning.DataProcessingServiceImpl
# NOTE(review): presumably the number of tasks this service handles in parallel — confirm.
concurrency: 5
- name: StorageService
class: com.example.longrunning.StorageServiceImpl
- name: NotificationService
class: com.example.longrunning.NotificationServiceImpl
# Workflow implementations hosted by the worker.
workflows:
- name: ProcessingWorkflow
class: com.example.longrunning.ProcessingWorkflowImpl
package com.example.longrunning
import io.infinitic.worker.InfiniticWorker
/** Worker entry point: builds a worker from "infinitic.yml", starts it, then logs that it is up. */
fun main() {
    InfiniticWorker.fromConfigFile("infinitic.yml").start()
    println("Long-running task workers started...")
}
package com.example.longrunning
import io.infinitic.client.InfiniticClient
import java.util.UUID
/**
 * Client entry point: runs one processing job synchronously through [ProcessingWorkflow]
 * and prints its outcome. The client is closed even when the workflow call throws.
 */
fun main() {
    val client = InfiniticClient.fromConfigFile("infinitic.yml")
    try {
        val job = ProcessingJob(
            jobId = UUID.randomUUID().toString(),
            datasetUrl = "s3://bucket/data/input.csv",
            processingType = "TRANSFORM",
            estimatedDurationMinutes = 10,
        )
        val workflow = client.newWorkflow(ProcessingWorkflow::class.java)

        println("Starting long-running processing job...\n")
        val result = workflow.runProcessingJob(job)

        if (result.success) {
            println("\n✅ Processing completed successfully")
            println(" Records processed: ${result.recordsProcessed}")
            println(" Duration: ${result.durationSeconds} seconds")
            println(" Output: ${result.outputUrl}")
        } else {
            println("\n❌ Processing failed: ${result.errorMessage}")
        }
    } finally {
        // Release the client's resources on both the success and the failure path.
        client.close()
    }
}
/**
 * Dispatches a job through [ProcessingWorkflow.runWithProgressTracking], waits for its result
 * and prints the outcome. The client is closed even when dispatching or awaiting throws.
 *
 * No coroutine scope is used: nothing in the body suspends, and `runBlocking` is not imported
 * by this file.
 */
fun testProgressTracking() {
    val client = InfiniticClient.fromConfigFile("infinitic.yml")
    try {
        val job = ProcessingJob(
            jobId = UUID.randomUUID().toString(),
            datasetUrl = "s3://bucket/data/large-dataset.csv",
            processingType = "ANALYZE",
            estimatedDurationMinutes = 30,
        )
        val workflow = client.newWorkflow(ProcessingWorkflow::class.java)

        // Start the workflow without blocking, then wait for its result below.
        val deferred = client.dispatch(workflow::runWithProgressTracking, job)
        println("Job dispatched with ID: ${deferred.id}\n")
        println("Progress updates will be shown...\n")

        val result = deferred.await()
        println("\n" + "=".repeat(50))
        if (result.success) {
            println("✅ Job completed successfully")
        } else {
            println("❌ Job failed: ${result.errorMessage}")
        }
    } finally {
        // Release the client's resources on both the success and the failure path.
        client.close()
    }
}
Expected Output
Key Concepts
Timeouts
Use the `@Timeout` annotation to specify the maximum execution time for a task. Infinitic automatically terminates tasks that exceed their timeout and can retry them.
Timeout Detection
Inside task implementations, check `Task.hasTimedOut` to detect a timeout and shut down gracefully.
Task Delegation
The `@Delegated` annotation marks tasks that are handled by external systems. Infinitic tracks these tasks but doesn’t execute them directly.
Progress Monitoring
Dispatch long-running tasks asynchronously, then poll for progress using a separate service method while checking `isCompleted()` on the deferred.
Async Dispatch
Use `dispatch()` for long-running tasks to avoid blocking the workflow. This allows the workflow to perform other operations while waiting.
Best Practices
- Set Appropriate Timeouts: Configure timeouts based on expected task duration
- Handle Timeout Gracefully: Check `Task.hasTimedOut` and clean up resources
- Progress Updates: Report progress for tasks longer than a few minutes
- Cancellation Support: Implement proper cancellation handling for user-initiated stops
- Resource Cleanup: Always clean up resources in finally blocks
- Retry Strategy: Configure retry policies for transient failures
Next Steps
- Learn about retry strategies
- Explore task timeouts in detail
- Try child workflows for complex orchestration