Skip to main content

The Task Object

Every service method execution has access to contextual information through the Task object. This provides metadata about the current task, the workflow that triggered it (if any), and access to the Infinitic client. Location in source code: io.infinitic.tasks.Task
import io.infinitic.tasks.Task

class MyServiceImpl : MyService {
  override fun processData(data: String): String {
    val taskId = Task.taskId
    val workflowId = Task.workflowId
    
    println("Processing task $taskId from workflow $workflowId")
    
    return "processed: $data"
  }
}

Task Properties

The Task object provides the following properties:

Basic Task Information

Task.taskId

The unique identifier for the current task.
val taskId: String = Task.taskId

Task.taskName

The name of the task method being executed.
val taskName: String = Task.taskName
// Example: "processData" or custom name from @Name annotation

Task.serviceName

The name of the service this task belongs to.
val serviceName: String = Task.serviceName
// Example: "MyService" or custom name from @Name annotation

Task.workerName

The name of the worker executing this task.
val workerName: String = Task.workerName

Workflow Information

These properties are available when the task is called from a workflow:

Task.workflowId

The unique identifier of the workflow that triggered this task (null if not called from a workflow).
val workflowId: String? = Task.workflowId

if (workflowId != null) {
  println("Called from workflow: $workflowId")
} else {
  println("Called directly from client")
}

Task.workflowName

The name of the workflow that triggered this task.
val workflowName: String? = Task.workflowName

Task.workflowVersion

The version of the workflow that triggered this task.
val workflowVersion: Int? = Task.workflowVersion

Retry Information

Task.retrySequence

The sequence number of the current retry attempt (0 for first execution).
val retrySequence: Int = Task.retrySequence

if (retrySequence > 0) {
  println("This is retry attempt #$retrySequence")
}

Task.retryIndex

The index of the current retry attempt.
val retryIndex: Int = Task.retryIndex

Task.lastError

Information about the error from the previous attempt (null on first attempt).
import io.infinitic.tasks.TaskFailure

val lastError: TaskFailure? = Task.lastError

if (lastError != null) {
  println("Previous attempt failed with: ${lastError.message}")
}

Tags and Metadata

Task.tags

A set of tags associated with this task.
val tags: Set<String> = Task.tags

tags.forEach { tag ->
  println("Task has tag: $tag")
}

Task.meta

A mutable map of metadata associated with the task. You can read and write to this metadata.
val meta: MutableMap<String, ByteArray> = Task.meta

// Read metadata
val userId = meta["userId"]?.let { String(it) }

// Write metadata (persisted for this task)
meta["processedAt"] = System.currentTimeMillis().toString().toByteArray()

Batch Information

Task.batchKey

The batch key for this task when using batch processing (null if not batched).
val batchKey: String? = Task.batchKey
Useful in batch methods to understand task grouping:
import io.infinitic.annotations.Batch

@Batch
fun processItems(items: Map<String, Item>): Map<String, Result> {
  // Get batch key for each task
  items.forEach { (taskId, item) ->
    val batchKey = Task.getContext(taskId)?.batchKey
    println("Task $taskId has batch key: $batchKey")
  }
  
  return items.mapValues { (_, item) -> process(item) }
}

Configuration Information

Task.withRetry

The retry configuration for this task.
import io.infinitic.tasks.WithRetry

val withRetry: WithRetry? = Task.withRetry

val secondsBeforeRetry = withRetry?.getSecondsBeforeRetry(0, Exception())

Task.withTimeout

The timeout configuration for this task.
import io.infinitic.tasks.WithTimeout

val withTimeout: WithTimeout? = Task.withTimeout

val timeoutSeconds = withTimeout?.getTimeoutSeconds()

The Task Context

The full TaskContext interface provides all contextual information:
import io.infinitic.tasks.TaskContext

val context: TaskContext? = Task.context

if (context != null) {
  println("Service: ${context.serviceName}")
  println("Task: ${context.taskId}")
  println("Worker: ${context.workerName}")
}

Accessing the Infinitic Client

The Task.client property provides access to the Infinitic client, allowing services to:
  • Dispatch other tasks
  • Start workflows
  • Interact with running workflows
import io.infinitic.tasks.Task

class OrderServiceImpl : OrderService {
  override fun processOrder(orderId: String): OrderResult {
    // Dispatch another task
    val notificationTask = Task.client.newService(NotificationService::class.java)
    notificationTask.sendEmail(
      to = "[email protected]",
      subject = "Order Confirmation",
      body = "Your order $orderId has been processed"
    )
    
    // Start a workflow
    val fulfillmentWorkflow = Task.client.newWorkflow(FulfillmentWorkflow::class.java)
    Task.client.dispatch(fulfillmentWorkflow::fulfill, orderId)
    
    return OrderResult(orderId, "processed")
  }
}

Timeout Handling

Task.hasTimedOut

Check if the current task execution has timed out.
val hasTimedOut: Boolean = Task.hasTimedOut

while (!Task.hasTimedOut) {
  // Continue processing
  processNextItem()
}

Task.onTimeOut

Register a callback to be executed when the task times out.
Task.onTimeOut {
  println("Task is timing out, cleaning up resources...")
  cleanup()
}

// Continue with long-running operation
while (!Task.hasTimedOut) {
  processItem()
}
Complete example with timeout handling:
import io.infinitic.annotations.Timeout
import io.infinitic.tasks.Task
import io.infinitic.tasks.WithTimeout

class TenSecondTimeout : WithTimeout {
  override fun getTimeoutSeconds(): Double = 10.0
}

class DataProcessingServiceImpl : DataProcessingService {
  
  @Timeout(TenSecondTimeout::class)
  override fun processLargeDataset(data: List<Item>): Result {
    val processed = mutableListOf<Item>()
    
    // Register cleanup on timeout
    Task.onTimeOut {
      println("Timeout! Processed ${processed.size} items before timeout")
      saveProgress(processed)
    }
    
    // Process items until timeout
    for (item in data) {
      if (Task.hasTimedOut) {
        break
      }
      processed.add(processItem(item))
    }
    
    return Result(processed)
  }
}

Batch Context Access

In batch methods, use Task.getContext(taskId) to access the context for individual tasks:
import io.infinitic.annotations.Batch
import io.infinitic.tasks.Task

class EmailServiceImpl : EmailService {
  
  @Batch
  fun sendEmail(emails: Map<String, EmailData>): Map<String, Boolean> {
    return emails.mapValues { (taskId, emailData) ->
      // Get context for this specific task
      val context = Task.getContext(taskId)
      
      if (context != null) {
        println("Processing task ${context.taskId}")
        println("From workflow: ${context.workflowId}")
        println("Retry attempt: ${context.retrySequence}")
      }
      
      // Process the email
      sendEmailImpl(emailData)
    }
  }
}

Practical Examples

Using Task Context for Logging

class PaymentServiceImpl : PaymentService {
  private val logger = LoggerFactory.getLogger(this::class.java)
  
  override fun processPayment(amount: Double, currency: String): PaymentResult {
    logger.info(
      "Processing payment - Task: ${Task.taskId}, " +
      "Workflow: ${Task.workflowId ?: "none"}, " +
      "Retry: ${Task.retrySequence}"
    )
    
    return processPaymentImpl(amount, currency)
  }
}

Conditional Logic Based on Retry Attempt

class ExternalApiServiceImpl : ExternalApiService {
  override fun callExternalAPI(endpoint: String): ApiResponse {
    // Use different timeout based on retry attempt
    val timeout = when (Task.retrySequence) {
      0 -> 5000L  // First attempt: 5 seconds
      1 -> 10000L // Second attempt: 10 seconds
      else -> 30000L // Further attempts: 30 seconds
    }
    
    return callWithTimeout(endpoint, timeout)
  }
}

Using Metadata for Tracking

class DataProcessingServiceImpl : DataProcessingService {
  override fun processData(data: String): Result {
    // Record start time in metadata
    Task.meta["startTime"] = System.currentTimeMillis().toString().toByteArray()
    
    // Process data
    val result = process(data)
    
    // Record completion time
    Task.meta["endTime"] = System.currentTimeMillis().toString().toByteArray()
    
    return result
  }
}

Starting Child Workflows

class OrchestrationServiceImpl : OrchestrationService {
  override fun orchestrateProcess(processId: String): String {
    // Start multiple child workflows
    val workflow1 = Task.client.newWorkflow(DataProcessingWorkflow::class.java, tags = setOf(processId))
    Task.client.dispatch(workflow1::process, processId)
    
    val workflow2 = Task.client.newWorkflow(NotificationWorkflow::class.java, tags = setOf(processId))
    Task.client.dispatch(workflow2::notifyUsers, processId)
    
    return "Orchestrated: $processId"
  }
}

Testing with Task Context

When writing tests, you can mock the task context:
import io.infinitic.tasks.Task
import org.junit.jupiter.api.Test

class MyServiceTest {
  
  @Test
  fun `test service with mocked context`() {
    // Create a mock context
    val mockContext = createMockTaskContext(
      taskId = "test-task-123",
      serviceName = "MyService",
      retrySequence = 0
    )
    
    // Set the context for the current thread
    Task.setContext(mockContext)
    
    // Test your service
    val service = MyServiceImpl()
    val result = service.processData("test")
    
    // Assertions
    assertEquals("processed: test", result)
  }
}

Next Steps

Delegated Tasks

Integrate with external systems for long-running operations

Batching

Process multiple tasks efficiently with batch operations

Build docs developers (and LLMs) love