This example demonstrates how to work with long-running tasks in Infinitic. You’ll learn about timeouts, progress tracking, task delegation, and handling operations that take minutes or hours to complete.

What You’ll Build

A data processing pipeline that:
  • Processes large datasets
  • Reports progress updates
  • Handles timeouts gracefully
  • Uses task delegation for external systems
  • Implements cancellation support
Define Data Models

Create models for processing jobs:

package com.example.longrunning

import kotlinx.serialization.Serializable

@Serializable
data class ProcessingJob(
    val jobId: String,
    val datasetUrl: String,
    val processingType: String,
    val estimatedDurationMinutes: Int
)

@Serializable
data class ProcessingProgress(
    val jobId: String,
    val percentComplete: Int,
    val recordsProcessed: Long,
    val totalRecords: Long,
    val status: String
)

@Serializable
data class ProcessingResult(
    val jobId: String,
    val success: Boolean,
    val recordsProcessed: Long,
    val durationSeconds: Long,
    val outputUrl: String?,
    val errorMessage: String?
)
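The percentComplete field above is derived from the two record counters. As a quick sanity check, the arithmetic can be kept in one helper (plain Kotlin, no Infinitic types; the function name is ours, not part of the example's services):

```kotlin
// Illustrative helper: computes the percentComplete value of ProcessingProgress
// from the two record counters, guarding against division by zero and clamping
// the result to the 0..100 range.
fun percentComplete(recordsProcessed: Long, totalRecords: Long): Int =
    if (totalRecords <= 0L) 0
    else ((recordsProcessed * 100) / totalRecords).toInt().coerceIn(0, 100)
```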
Define Service Interfaces

Create services with timeout configurations:

package com.example.longrunning

import io.infinitic.annotations.Timeout
import io.infinitic.annotations.Delegated

// Timeout configuration classes
class FiveMinuteTimeout : io.infinitic.tasks.WithTimeout {
    override fun getTimeoutSeconds(): Double? = 300.0 // 5 minutes
}

class ThirtyMinuteTimeout : io.infinitic.tasks.WithTimeout {
    override fun getTimeoutSeconds(): Double? = 1800.0 // 30 minutes
}

interface DataProcessingService {
    
    // Quick validation with short timeout
    @Timeout(FiveMinuteTimeout::class)
    fun validateDataset(datasetUrl: String): Boolean
    
    // Long-running processing with timeout
    @Timeout(ThirtyMinuteTimeout::class)
    fun processDataset(job: ProcessingJob): ProcessingResult
    
    // Delegated task for external system
    @Delegated
    fun processWithExternalSystem(job: ProcessingJob): ProcessingResult
    
    // Monitor progress (called periodically)
    fun getProgress(jobId: String): ProcessingProgress
    
    // Cancel running job
    fun cancelJob(jobId: String)
}

interface StorageService {
    fun downloadDataset(url: String): Long
    fun uploadResults(jobId: String, data: ByteArray): String
}

interface NotificationService {
    fun notifyJobStarted(job: ProcessingJob)
    fun notifyProgress(progress: ProcessingProgress)
    fun notifyJobComplete(result: ProcessingResult)
}
Implement the Services

Implement services with long-running operations:

package com.example.longrunning

import io.infinitic.tasks.Task
import java.util.concurrent.ConcurrentHashMap

class DataProcessingServiceImpl : DataProcessingService {
    
    companion object {
        // Track running jobs
        private val runningJobs = ConcurrentHashMap<String, ProcessingProgress>()
    }
    
    override fun validateDataset(datasetUrl: String): Boolean {
        println("🔍 Validating dataset: $datasetUrl")
        Thread.sleep(2000) // Simulate validation
        return datasetUrl.isNotBlank()
    }
    
    override fun processDataset(job: ProcessingJob): ProcessingResult {
        println("⚙️  Starting processing: ${job.jobId}")
        val startTime = System.currentTimeMillis()
        
        try {
            // Initialize progress tracking
            val totalRecords = 1000L
            var processed = 0L
            
            runningJobs[job.jobId] = ProcessingProgress(
                jobId = job.jobId,
                percentComplete = 0,
                recordsProcessed = 0,
                totalRecords = totalRecords,
                status = "RUNNING"
            )
            
            // Simulate long-running processing with progress updates
            repeat(10) {
                // Check for timeout
                if (Task.hasTimedOut) {
                    println("⏱️  Task timeout detected, wrapping up...")
                    throw Exception("Processing timed out")
                }
                
                // Check for cancellation: cancelJob() removes the entry from runningJobs
                if (!runningJobs.containsKey(job.jobId)) {
                    throw Exception("Processing cancelled")
                }
                
                // Process batch
                Thread.sleep(3000) // Simulate 3 seconds per batch
                processed += 100
                
                // Update progress
                val percent = (processed * 100 / totalRecords).toInt()
                runningJobs[job.jobId] = ProcessingProgress(
                    jobId = job.jobId,
                    percentComplete = percent,
                    recordsProcessed = processed,
                    totalRecords = totalRecords,
                    status = "RUNNING"
                )
                
                println("   Progress: $percent% ($processed/$totalRecords records)")
            }
            
            val duration = (System.currentTimeMillis() - startTime) / 1000
            
            return ProcessingResult(
                jobId = job.jobId,
                success = true,
                recordsProcessed = processed,
                durationSeconds = duration,
                outputUrl = "s3://bucket/results/${job.jobId}.csv",
                errorMessage = null
            )
            
        } catch (e: Exception) {
            val duration = (System.currentTimeMillis() - startTime) / 1000
            
            return ProcessingResult(
                jobId = job.jobId,
                success = false,
                recordsProcessed = 0,
                durationSeconds = duration,
                outputUrl = null,
                errorMessage = e.message
            )
        } finally {
            runningJobs.remove(job.jobId)
        }
    }
    
    override fun processWithExternalSystem(job: ProcessingJob): ProcessingResult {
        // This method delegates to an external system
        // The actual implementation is provided by the external system
        throw UnsupportedOperationException("This task is delegated")
    }
    
    override fun getProgress(jobId: String): ProcessingProgress {
        // Entries are removed on completion or cancellation, so an unknown id
        // simply means the job is not currently running
        return runningJobs[jobId] ?: ProcessingProgress(
            jobId = jobId,
            percentComplete = 0,
            recordsProcessed = 0,
            totalRecords = 0,
            status = "NOT_FOUND"
        )
    }
    
    override fun cancelJob(jobId: String) {
        println("🛑 Cancelling job: $jobId")
        runningJobs.remove(jobId)
    }
}

class StorageServiceImpl : StorageService {
    override fun downloadDataset(url: String): Long {
        println("📥 Downloading dataset from $url")
        Thread.sleep(1000)
        return 1000L // Return record count
    }
    
    override fun uploadResults(jobId: String, data: ByteArray): String {
        println("📤 Uploading results for $jobId")
        Thread.sleep(1000)
        return "s3://bucket/results/$jobId.csv"
    }
}

class NotificationServiceImpl : NotificationService {
    override fun notifyJobStarted(job: ProcessingJob) {
        println("📧 Notification: Job ${job.jobId} started")
    }
    
    override fun notifyProgress(progress: ProcessingProgress) {
        println("📧 Progress update: ${progress.percentComplete}% complete")
    }
    
    override fun notifyJobComplete(result: ProcessingResult) {
        val status = if (result.success) "✅ SUCCESS" else "❌ FAILED"
        println("📧 Notification: Job ${result.jobId} $status")
    }
}
Define the Workflow Interface

package com.example.longrunning

import io.infinitic.workflows.SendChannel

interface ProcessingWorkflow {
    // Channel for cancellation requests
    val cancelChannel: SendChannel<String>
    
    fun runProcessingJob(job: ProcessingJob): ProcessingResult
    fun runWithProgressTracking(job: ProcessingJob): ProcessingResult
}
Implement the Workflow

Build a workflow that monitors long-running tasks:

package com.example.longrunning

import io.infinitic.workflows.Workflow
import io.infinitic.workflows.or
import java.time.Duration

class ProcessingWorkflowImpl : Workflow(), ProcessingWorkflow {
    
    override val cancelChannel = channel<String>()
    
    private val processingService = newService(DataProcessingService::class.java)
    private val storageService = newService(StorageService::class.java)
    private val notificationService = newService(NotificationService::class.java)
    
    override fun runProcessingJob(job: ProcessingJob): ProcessingResult {
        println("\n🚀 Starting processing workflow for ${job.jobId}\n")
        
        // Notify job started
        notificationService.notifyJobStarted(job)
        
        // Validate dataset first
        val isValid = processingService.validateDataset(job.datasetUrl)
        
        if (!isValid) {
            val result = ProcessingResult(
                jobId = job.jobId,
                success = false,
                recordsProcessed = 0,
                durationSeconds = 0,
                outputUrl = null,
                errorMessage = "Invalid dataset URL"
            )
            notificationService.notifyJobComplete(result)
            return result
        }
        
        // Download dataset
        val recordCount = storageService.downloadDataset(job.datasetUrl)
        println("✓ Dataset downloaded: $recordCount records\n")
        
        // Process the dataset (long-running)
        val result = processingService.processDataset(job)
        
        // Notify completion
        notificationService.notifyJobComplete(result)
        
        return result
    }
    
    override fun runWithProgressTracking(job: ProcessingJob): ProcessingResult {
        println("\n🚀 Starting processing with progress tracking for ${job.jobId}\n")
        
        notificationService.notifyJobStarted(job)
        
        // Dispatch the processing task asynchronously
        val processingDeferred = dispatch(processingService::processDataset, job)
        
        // Monitor progress while processing
        while (!processingDeferred.isCompleted()) {
            // Wait a bit between progress checks
            val timerDeferred = timer(Duration.ofSeconds(5))
            val cancelDeferred = cancelChannel.receive()
            
            // Wait for timer or cancel signal
            when ((timerDeferred or cancelDeferred).await()) {
                is String -> {
                    // Cancellation requested
                    println("\n🛑 Cancellation requested\n")
                    processingService.cancelJob(job.jobId)
                    
                    return ProcessingResult(
                        jobId = job.jobId,
                        success = false,
                        recordsProcessed = 0,
                        durationSeconds = 0,
                        outputUrl = null,
                        errorMessage = "Cancelled by user"
                    )
                }
            }
            
            // Get and report progress
            if (!processingDeferred.isCompleted()) {
                val progress = processingService.getProgress(job.jobId)
                if (progress.status == "RUNNING") {
                    notificationService.notifyProgress(progress)
                }
            }
        }
        
        // Get final result
        val result = processingDeferred.await()
        notificationService.notifyJobComplete(result)
        
        return result
    }
}
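The timer-or-channel race above uses Infinitic's `or` combinator, but the control flow can be illustrated with plain JVM futures (a sketch with made-up names, not Infinitic API): whichever side completes first decides whether the loop keeps polling or aborts.

```kotlin
import java.util.concurrent.CompletableFuture

// Plain-JVM sketch of the "timer or cancel" race in runWithProgressTracking:
// a timer future and a cancellation future run concurrently, and anyOf()
// yields whichever finishes first, like (timerDeferred or cancelDeferred).await().
fun raceTimerAgainstCancel(timerMillis: Long, cancel: CompletableFuture<String>): Any {
    val timer = CompletableFuture.supplyAsync {
        Thread.sleep(timerMillis) // stand-in for timer(Duration.ofSeconds(5))
        "TIMER"
    }
    return CompletableFuture.anyOf(timer, cancel).join()
}
```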
Configure Infinitic

Create an infinitic.yml configuration file:

transport: inMemory

services:
  - name: DataProcessingService
    class: com.example.longrunning.DataProcessingServiceImpl
    concurrency: 5
    
  - name: StorageService
    class: com.example.longrunning.StorageServiceImpl
    
  - name: NotificationService
    class: com.example.longrunning.NotificationServiceImpl

workflows:
  - name: ProcessingWorkflow
    class: com.example.longrunning.ProcessingWorkflowImpl
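
The inMemory transport keeps everything in a single JVM, which is convenient for running this example locally. For production you would point the same configuration at a Pulsar cluster instead; the keys below are illustrative placeholders, so check the Infinitic reference for your version:

```yaml
# Hypothetical production variant of the transport section (values are placeholders)
transport: pulsar

pulsar:
  brokerServiceUrl: pulsar://localhost:6650
  webServiceUrl: http://localhost:8080
  tenant: infinitic
  namespace: dev
```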
Start Workers

package com.example.longrunning

import io.infinitic.worker.InfiniticWorker

fun main() {
    val worker = InfiniticWorker.fromConfigFile("infinitic.yml")
    worker.start()
    println("Long-running task workers started...")
}
Run a Processing Job

package com.example.longrunning

import io.infinitic.client.InfiniticClient
import java.util.UUID

fun main() {
    val client = InfiniticClient.fromConfigFile("infinitic.yml")
    
    val job = ProcessingJob(
        jobId = UUID.randomUUID().toString(),
        datasetUrl = "s3://bucket/data/input.csv",
        processingType = "TRANSFORM",
        estimatedDurationMinutes = 10
    )
    
    val workflow = client.newWorkflow(ProcessingWorkflow::class.java)
    
    println("Starting long-running processing job...\n")
    
    val result = workflow.runProcessingJob(job)
    
    if (result.success) {
        println("\n✅ Processing completed successfully")
        println("   Records processed: ${result.recordsProcessed}")
        println("   Duration: ${result.durationSeconds} seconds")
        println("   Output: ${result.outputUrl}")
    } else {
        println("\n❌ Processing failed: ${result.errorMessage}")
    }
    
    client.close()
}
Run with Progress Tracking

package com.example.longrunning

import io.infinitic.client.InfiniticClient
import java.util.UUID

fun main() {
    val client = InfiniticClient.fromConfigFile("infinitic.yml")
    
    val job = ProcessingJob(
        jobId = UUID.randomUUID().toString(),
        datasetUrl = "s3://bucket/data/large-dataset.csv",
        processingType = "ANALYZE",
        estimatedDurationMinutes = 30
    )
    
    val workflow = client.newWorkflow(ProcessingWorkflow::class.java)
    val deferred = client.dispatch(workflow::runWithProgressTracking, job)
    
    println("Job dispatched with ID: ${deferred.id}\n")
    println("Progress updates will be shown...\n")
    
    val result = deferred.await()
    
    println("\n" + "=".repeat(50))
    if (result.success) {
        println("✅ Job completed successfully")
    } else {
        println("❌ Job failed: ${result.errorMessage}")
    }
    
    client.close()
}

Expected Output

🚀 Starting processing workflow for 01234567-89ab-cdef-0123-456789abcdef

📧 Notification: Job 01234567-89ab-cdef-0123-456789abcdef started
🔍 Validating dataset: s3://bucket/data/input.csv
📥 Downloading dataset from s3://bucket/data/input.csv
✓ Dataset downloaded: 1000 records

⚙️  Starting processing: 01234567-89ab-cdef-0123-456789abcdef
   Progress: 10% (100/1000 records)
📧 Progress update: 10% complete
   Progress: 20% (200/1000 records)
📧 Progress update: 20% complete
   Progress: 30% (300/1000 records)
   ...
   Progress: 100% (1000/1000 records)
📧 Notification: Job 01234567-89ab-cdef-0123-456789abcdef ✅ SUCCESS

✅ Processing completed successfully
   Records processed: 1000
   Duration: 32 seconds
   Output: s3://bucket/results/01234567-89ab-cdef-0123-456789abcdef.csv

Key Concepts

  • Timeouts: Use the @Timeout annotation to specify the maximum execution time for a task. Infinitic automatically terminates tasks that exceed their timeout and can retry them.
  • Graceful shutdown: Inside task implementations, check Task.hasTimedOut to detect that a timeout is approaching and wind down cleanly.
  • Delegated tasks: The @Delegated annotation marks tasks that are handled by external systems. Infinitic tracks these tasks but doesn’t execute them directly.
  • Progress monitoring: Dispatch long-running tasks asynchronously, then poll for progress through a separate service method while checking isCompleted() on the deferred.
  • Non-blocking dispatch: Use dispatch() for long-running tasks to avoid blocking the workflow, which lets it perform other operations while waiting.
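
The cooperative-shutdown idea behind the Task.hasTimedOut check can be sketched without Infinitic at all: a loop that re-checks a deadline between batches (function and names are illustrative):

```kotlin
// Illustrative cooperative-timeout loop (plain Kotlin, not Infinitic API):
// between batches the worker checks a deadline, mirroring a Task.hasTimedOut
// check, and stops cleanly instead of being killed mid-batch.
fun processUntilDeadline(totalBatches: Int, deadlineMillis: Long, runBatch: (Int) -> Unit): Int {
    var completed = 0
    for (batch in 0 until totalBatches) {
        if (System.currentTimeMillis() >= deadlineMillis) break // graceful stop
        runBatch(batch)
        completed++
    }
    return completed
}
```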

Best Practices

  • Set Appropriate Timeouts: Configure timeouts based on expected task duration
  • Handle Timeout Gracefully: Check Task.hasTimedOut and clean up resources
  • Progress Updates: Report progress for tasks longer than a few minutes
  • Cancellation Support: Implement proper cancellation handling for user-initiated stops
  • Resource Cleanup: Always clean up resources in finally blocks
  • Retry Strategy: Configure retry policies for transient failures
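
For the retry-strategy point, a common schedule is exponential backoff with a cap. The helper below is illustrative only (Infinitic has its own retry configuration; this just shows the arithmetic):

```kotlin
import kotlin.math.min
import kotlin.math.pow

// Illustrative retry-delay schedule: exponential backoff capped at maxSeconds.
// Attempt 0 waits baseSeconds, attempt 1 waits 2x, attempt 2 waits 4x, and so on.
fun backoffSeconds(attempt: Int, baseSeconds: Double = 1.0, maxSeconds: Double = 60.0): Double =
    min(baseSeconds * 2.0.pow(attempt), maxSeconds)
```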
