What is Batching?

Batching allows you to process multiple task executions together in a single operation. Instead of executing tasks one at a time, Infinitic can group multiple pending tasks and execute them as a batch. This is invaluable for:
  • Database operations (bulk inserts, batch queries)
  • External API calls (reducing API call count)
  • Network operations (minimizing round trips)
  • Resource-intensive operations (amortizing setup costs)

The @Batch Annotation

Location in source code: io.infinitic.annotations.Batch
import io.infinitic.annotations.Batch

@Target(AnnotationTarget.FUNCTION)
annotation class Batch
The @Batch annotation marks a method as a batch implementation. When applied, Infinitic will:
  1. Collect multiple pending task executions
  2. Group them according to batching configuration
  3. Execute the batch method once with all tasks
  4. Distribute results back to individual tasks
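The four steps above can be sketched as a small generic dispatch function. This is an illustrative model only, not Infinitic's internal code, and the names here (`runBatch`, `pending`, `batchMethod`) are made up for the sketch:

```kotlin
// Toy sketch of the batch lifecycle. `pending` is the collected map of
// taskId -> input (steps 1-2); `batchMethod` is the @Batch implementation,
// executed once (step 3); results are then looked up per task ID (step 4).
fun <In, Out> runBatch(
    pending: Map<String, In>,
    batchMethod: (Map<String, In>) -> Map<String, Out>,
): Map<String, Out> {
    val results = batchMethod(pending)
    return pending.keys.associateWith { taskId ->
        results[taskId] ?: error("batch method returned no result for task $taskId")
    }
}
```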

Basic Usage

To use batching, define both a regular method and a batch implementation:
import io.infinitic.annotations.Batch

interface EmailService {
  fun sendEmail(to: String, subject: String, body: String): Boolean
}

class EmailServiceImpl : EmailService {
  
  // Regular method signature (required by interface)
  override fun sendEmail(to: String, subject: String, body: String): Boolean {
    // This won't be called when batching is used
    throw NotImplementedError("Use batch method")
  }
  
  // Batch implementation
  @Batch
  fun sendEmail(batch: Map<String, EmailData>): Map<String, Boolean> {
    // batch is a map of taskId -> input parameters
    // Returns a map of taskId -> result
    
    val emails = batch.values.map { data ->
      Email(to = data.to, subject = data.subject, body = data.body)
    }
    
    // Send all emails in one API call
    val results = emailProvider.sendBulk(emails)
    
    // Map results back to task IDs
    return batch.keys.zip(results).toMap()
  }
  
  data class EmailData(val to: String, val subject: String, val body: String)
}
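A note on the `batch.keys.zip(results).toMap()` pattern used above: it assumes the bulk call returns exactly one result per input, in input order. That assumption holds here because Kotlin maps built from entries preserve insertion order, so `batch.keys` and `batch.values` stay aligned. A standalone sketch of the pattern, with the size check made explicit:

```kotlin
// Maps ordered bulk results back to task IDs, assuming the provider
// returns one result per input, in input order (the same assumption
// the sendEmail example makes).
fun <T, R> zipResults(batch: Map<String, T>, results: List<R>): Map<String, R> {
    require(results.size == batch.size) {
        "expected ${batch.size} results, got ${results.size}"
    }
    return batch.keys.zip(results).toMap()
}
```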

Batch Method Signature

Batch methods have specific signature requirements:

Input: Map of Task IDs to Parameters

The batch method receives a Map<String, T> where:
  • Key: Task ID (String)
  • Value: Input parameters for that task
// Single parameter
@Batch
fun processItem(batch: Map<String, Item>): Map<String, Result>

// Multiple parameters wrapped in a data class
@Batch
fun processData(batch: Map<String, ProcessInput>): Map<String, ProcessResult>

data class ProcessInput(val data: String, val config: Config)

Output: Map of Task IDs to Results

The batch method must return a Map<String, R> where:
  • Key: Task ID (String) - must match input keys
  • Value: Result for that task
@Batch
fun process(batch: Map<String, Input>): Map<String, Result> {
  return batch.mapValues { (taskId, input) ->
    // Process each input
    processOne(input)
  }
}

Void Return Type

For methods that return void/Unit, the batch method returns Map<String, Unit>:
interface NotificationService {
  fun sendNotification(userId: String, message: String)
}

class NotificationServiceImpl : NotificationService {
  override fun sendNotification(userId: String, message: String) {
    throw NotImplementedError()
  }
  
  @Batch
  fun sendNotification(batch: Map<String, NotificationData>): Map<String, Unit> {
    // Send all notifications
    notificationSystem.sendBulk(batch.values)
    
    // Return Unit for each task
    return batch.mapValues { Unit }
  }
  
  data class NotificationData(val userId: String, val message: String)
}

Complete Example

Here’s a complete example from the Infinitic test suite: Location in source code: infinitic-tests/src/test/kotlin/io/infinitic/tests/batches/BatchService.kt:42-95
import io.infinitic.annotations.Batch
import io.infinitic.tasks.Task

interface BatchService {
  fun processItem(item: Int): Int
  fun processData(foo: Int, bar: Int): Int
  fun haveSameKey(i: Int): Boolean
}

class BatchServiceImpl : BatchService {
  
  // Regular methods throw NotImplementedError
  override fun processItem(item: Int) = throw NotImplementedError()
  override fun processData(foo: Int, bar: Int) = throw NotImplementedError()
  override fun haveSameKey(i: Int) = throw NotImplementedError()
  
  // Batch implementation for processItem
  @Batch
  fun processItem(batch: Map<String, Int>): Map<String, Int> {
    val batchSize = batch.values.count()
    // Return the batch size for each item
    return batch.mapValues { batchSize }
  }
  
  // Batch implementation with multiple parameters
  @Batch
  fun processData(batch: Map<String, ProcessInput>): Map<String, Int> {
    val batchSize = batch.values.count()
    return batch.mapValues { (_, input) ->
      input.bar + batchSize
    }
  }
  
  // Batch implementation accessing task context
  @Batch
  fun haveSameKey(batch: Map<String, Int>): Map<String, Boolean> {
    // Get batch key for each task
    val batchKeys = batch.keys.map { taskId ->
      Task.getContext(taskId)?.batchKey
    }
    
    val firstKey = batchKeys.first()
    val allHaveSameKey = batchKeys.all { it == firstKey }
    
    return batch.mapValues { allHaveSameKey }
  }
  
  data class ProcessInput(val foo: Int, val bar: Int)
}

Batch Configuration

Configure batching behavior in your worker configuration:
import io.infinitic.workers.InfiniticWorker

val worker = InfiniticWorker.builder()
  .setTransport(transport)
  .addService(
    name = "emailService",
    factory = { EmailServiceImpl() },
    concurrency = 10,
    batch = BatchConfig(
      maxMessages = 100,        // Max tasks per batch
      maxSeconds = 1.0,         // Max wait time in seconds
    )
  )
  .build()

Configuration Parameters

  • maxMessages: Maximum number of tasks to include in a batch (default: unlimited)
  • maxSeconds: Maximum time to wait before processing a batch (default: no limit)
Infinitic will process a batch when either:
  • The batch reaches maxMessages tasks, OR
  • maxSeconds have elapsed since the first task was received
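The either/or trigger can be modeled as a small buffer. This is a simplified sketch for intuition, not Infinitic's actual implementation (in particular, a real worker flushes on a timer, whereas this sketch only checks elapsed time when a new task arrives):

```kotlin
// Illustrative model of the flush condition: a batch is emitted when it
// reaches maxMessages tasks OR maxSeconds have elapsed since the first
// task arrived.
class BatchBuffer<T>(private val maxMessages: Int, private val maxSeconds: Double) {
    private val pending = mutableListOf<T>()
    private var firstArrivalMs = 0L

    /** Adds a task; returns the batch to flush, or null to keep waiting. */
    fun offer(task: T, nowMs: Long): List<T>? {
        if (pending.isEmpty()) firstArrivalMs = nowMs
        pending.add(task)
        val full = pending.size >= maxMessages
        val expired = nowMs - firstArrivalMs >= (maxSeconds * 1000).toLong()
        if (!full && !expired) return null
        return pending.toList().also { pending.clear() }
    }
}
```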

Accessing Task Context in Batches

Within batch methods, access individual task contexts using Task.getContext(taskId):
import io.infinitic.annotations.Batch
import io.infinitic.tasks.Task

class DataServiceImpl : DataService {
  
  @Batch
  fun processData(batch: Map<String, Data>): Map<String, Result> {
    return batch.mapValues { (taskId, data) ->
      // Get context for this specific task
      val context = Task.getContext(taskId)
      
      if (context != null) {
        println("Task ID: ${context.taskId}")
        println("Workflow ID: ${context.workflowId}")
        println("Retry sequence: ${context.retrySequence}")
        println("Batch key: ${context.batchKey}")
        println("Tags: ${context.tags}")
      }
      
      // Process the data
      processDataItem(data)
    }
  }
}

Batch Keys

Batch keys allow you to group tasks with related data together:
import io.infinitic.annotations.Batch
import io.infinitic.tasks.Task

class DatabaseServiceImpl : DatabaseService {
  
  @Batch
  fun saveRecord(batch: Map<String, Record>): Map<String, Boolean> {
    // Group records by batch key (e.g., database shard)
    val groupedByKey = batch.entries.groupBy { (taskId, _) ->
      Task.getContext(taskId)?.batchKey
    }
    
    // Process each group with the appropriate shard
    val results = mutableMapOf<String, Boolean>()
    
    groupedByKey.forEach { (shardKey, entries) ->
      val shard = getShardForKey(shardKey)
      val shardResults = shard.bulkInsert(entries.map { it.value })
      
      entries.zip(shardResults).forEach { (entry, result) ->
        results[entry.key] = result
      }
    }
    
    return results
  }
}
Set batch keys when dispatching tasks:
val service = client.newService(
  DatabaseService::class.java,
  batchKey = "shard-1"
)

service.saveRecord(record)

Use Cases

Database Bulk Operations

import io.infinitic.annotations.Batch

interface UserRepository {
  fun saveUser(user: User): Long
}

class UserRepositoryImpl(private val db: Database) : UserRepository {
  
  override fun saveUser(user: User): Long {
    throw NotImplementedError()
  }
  
  @Batch
  fun saveUser(batch: Map<String, User>): Map<String, Long> {
    // Perform bulk insert
    val users = batch.values.toList()
    val generatedIds = db.bulkInsert(users)
    
    // Map IDs back to task IDs
    return batch.keys.zip(generatedIds).toMap()
  }
}

External API Optimization

import io.infinitic.annotations.Batch

interface GeocodingService {
  fun geocodeAddress(address: String): Coordinates
}

class GeocodingServiceImpl(
  private val geocodingAPI: GeocodingAPI
) : GeocodingService {
  
  override fun geocodeAddress(address: String): Coordinates {
    throw NotImplementedError()
  }
  
  @Batch
  fun geocodeAddress(batch: Map<String, String>): Map<String, Coordinates> {
    // Use batch API endpoint (1 call instead of N)
    val addresses = batch.values.toList()
    val results = geocodingAPI.batchGeocode(addresses)
    
    return batch.keys.zip(results).toMap()
  }
}

Data Enrichment

import io.infinitic.annotations.Batch

interface UserEnrichmentService {
  fun enrichUser(userId: String): EnrichedUser
}

class UserEnrichmentServiceImpl : UserEnrichmentService {
  
  override fun enrichUser(userId: String): EnrichedUser {
    throw NotImplementedError()
  }
  
  @Batch
  fun enrichUser(batch: Map<String, String>): Map<String, EnrichedUser> {
    val userIds = batch.values.toList()
    
    // Fetch all data in parallel
    val profiles = profileService.getBatch(userIds)
    val preferences = preferenceService.getBatch(userIds)
    val activities = activityService.getBatch(userIds)
    
    // Combine all data
    return batch.mapValues { (_, userId) ->
      EnrichedUser(
        profile = profiles[userId]!!,
        preferences = preferences[userId]!!,
        recentActivity = activities[userId] ?: emptyList()
      )
    }
  }
}

ML Model Inference

import io.infinitic.annotations.Batch

interface PredictionService {
  fun predict(features: Features): Prediction
}

class PredictionServiceImpl(
  private val model: MLModel
) : PredictionService {
  
  override fun predict(features: Features): Prediction {
    throw NotImplementedError()
  }
  
  @Batch
  fun predict(batch: Map<String, Features>): Map<String, Prediction> {
    // Batch inference is much faster than individual predictions
    val featureMatrix = batch.values.toMatrix()
    val predictions = model.predictBatch(featureMatrix)
    
    return batch.keys.zip(predictions).toMap()
  }
}

Error Handling

Handle errors for individual tasks within a batch:
import io.infinitic.annotations.Batch

@Batch
fun processItems(batch: Map<String, Item>): Map<String, Result> {
  return batch.mapValues { (taskId, item) ->
    try {
      processItem(item)
    } catch (e: ValidationException) {
      // Return error result for this specific task
      Result.error(e.message)
    } catch (e: Exception) {
      // Throw to retry the entire batch
      throw e
    }
  }
}
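The two paths above can be illustrated with a self-contained example. `Outcome` is a hypothetical stand-in for whatever result type your service actually returns:

```kotlin
// Per-task error isolation: a handled error produces an Err result for
// that task only, while any exception that escapes mapValues would abort
// (and retry) the whole batch.
sealed interface Outcome {
    data class Ok(val value: Int) : Outcome
    data class Err(val message: String) : Outcome
}

fun processBatch(batch: Map<String, Int>): Map<String, Outcome> =
    batch.mapValues { (_, n) ->
        try {
            require(n >= 0) { "negative input: $n" }
            Outcome.Ok(n * 2)
        } catch (e: IllegalArgumentException) {
            Outcome.Err(e.message ?: "validation failed") // fails this task only
        }
    }
```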

Best Practices

Choose Appropriate Batch Sizes

Balance between latency and throughput:
// For high-throughput scenarios
batch = BatchConfig(
  maxMessages = 1000,  // Large batches
  maxSeconds = 5.0     // Can wait longer
)

// For low-latency scenarios
batch = BatchConfig(
  maxMessages = 10,    // Small batches
  maxSeconds = 0.1     // Process quickly
)

Maintain Task Independence

Each task in a batch should be independent:
// ✅ Good: Independent tasks
@Batch
fun processUser(batch: Map<String, User>): Map<String, Result> {
  return batch.mapValues { (_, user) ->
    processIndependently(user)
  }
}

// ❌ Bad: Tasks depend on each other
@Batch
fun processSequence(batch: Map<String, Item>): Map<String, Result> {
  var state = initialState
  return batch.mapValues { (_, item) ->
    // This creates dependencies between tasks!
    state = processWithState(item, state)
    Result(state)
  }
}

Handle Partial Failures Gracefully

Don’t let one task failure affect the entire batch:
@Batch
fun sendEmails(batch: Map<String, EmailData>): Map<String, Boolean> {
  return batch.mapValues { (taskId, email) ->
    try {
      emailProvider.send(email)
      true
    } catch (e: Exception) {
      // Log but don't fail the entire batch
      logger.error("Failed to send email for task $taskId", e)
      false
    }
  }
}

Monitor Batch Performance

@Batch
fun process(batch: Map<String, Data>): Map<String, Result> {
  val startTime = System.currentTimeMillis()
  val batchSize = batch.size
  
  try {
    val results = batch.mapValues { (_, data) -> processData(data) }
    
    val duration = System.currentTimeMillis() - startTime
    metrics.recordBatch(
      size = batchSize,
      durationMs = duration,
      success = true
    )
    
    return results
  } catch (e: Exception) {
    val duration = System.currentTimeMillis() - startTime
    metrics.recordBatch(
      size = batchSize,
      durationMs = duration,
      success = false
    )
    throw e
  }
}

Performance Considerations

Batching can provide significant performance improvements:
  • Database operations: 10-100x faster for bulk inserts
  • API calls: Reduce from N calls to 1 call
  • Network overhead: Minimize connection setup costs
  • Resource utilization: Better CPU and memory usage
Example performance comparison:
// Without batching: 1000 individual inserts
// ~10 seconds (10ms per insert)

// With batching: 1 bulk insert of 1000 records  
// ~100ms (0.1ms per record)

// 100x performance improvement!
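The arithmetic behind that comparison, made explicit (the 10 ms and 100 ms figures are the illustrative numbers from the comment above, not measured values):

```kotlin
// Back-of-envelope numbers from the comparison above.
val individualMs = 1_000 * 10.0   // 1000 inserts at ~10 ms each = 10,000 ms (10 s)
val bulkMs = 100.0                // one bulk insert of 1000 records = ~100 ms
val speedup = individualMs / bulkMs
val perRecordMs = bulkMs / 1_000  // ~0.1 ms per record when batched
```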

Next Steps

Service Context

Learn more about accessing task information in services

Defining Services

Go back to service basics
