Skip to main content
Channels allow workflows to receive external signals and events while running. This enables interactive workflows that can respond to user input, external events, or messages from other systems.

What are Channels?

Channels are communication endpoints that allow external systems to send data into a running workflow. They’re useful for:
  • Waiting for user approvals or input
  • Receiving notifications from external systems
  • Implementing interactive workflows
  • Coordinating between workflows
  • Building event-driven workflows

Creating Channels

1

Define channel in interface

Declare the channel as a property in your workflow interface:
import io.infinitic.workflows.SendChannel

interface ApprovalWorkflow {
  val approvalChannel: SendChannel<Boolean>
  fun waitForApproval(): Boolean
}
2

Create channel in implementation

Initialize the channel using the channel<T>() method:
import io.infinitic.workflows.Workflow

class ApprovalWorkflowImpl : Workflow(), ApprovalWorkflow {
  override val approvalChannel = channel<Boolean>()
  
  override fun waitForApproval(): Boolean {
    val deferred = approvalChannel.receive()
    return deferred.await()
  }
}

Basic Usage

import io.infinitic.workflows.Workflow
import io.infinitic.workflows.SendChannel

interface OrderWorkflow {
  val approvalChannel: SendChannel<String>
  fun processOrder(orderId: String): String
}

class OrderWorkflowImpl : Workflow(), OrderWorkflow {
  override val approvalChannel = channel<String>()
  
  override fun processOrder(orderId: String): String {
    // Wait for approval message
    val deferred = approvalChannel.receive()
    val approval = deferred.await()
    
    return "Order processed with: $approval"
  }
}

Receiving Signals

The receive() method returns a Deferred<T> that completes when a signal is received:
class MyWorkflow : Workflow(), MyWorkflowInterface {
  override val messageChannel = channel<String>()
  
  override fun waitForMessage(): String {
    // Returns Deferred<String>
    val deferred = messageChannel.receive()
    
    // Wait for the message
    return deferred.await()
  }
}

Sending Signals

By Workflow ID

Send signals to a specific workflow instance:
// Get workflow by ID
val workflow = client.getWorkflowById(
  OrderWorkflow::class.java,
  workflowId
)

// Send signal
workflow.approvalChannel.send("APPROVED")

By Workflow Tag

Send signals to workflows identified by tags:
// Get workflow by tag
val workflow = client.getWorkflowByTag(
  OrderWorkflow::class.java,
  "order-urgent"
)

// Send signal
workflow.notificationChannel.send("High priority order")

Filtered Signals

Using JsonPath

Filter signals based on their content using JsonPath expressions:
import io.infinitic.workflows.Workflow

data class OrderEvent(
  val orderId: String,
  val status: String,
  val amount: Double
)

class OrderWorkflow : Workflow(), OrderWorkflowInterface {
  override val eventChannel = channel<OrderEvent>()
  
  override fun waitForCompletedOrder(): OrderEvent {
    // Only receive events with status "COMPLETED"
    val deferred = eventChannel.receive(
      jsonPath = """[?(\$.status == "COMPLETED")]"""
    )
    return deferred.await()
  }
}

Using Criteria

Use Jayway JsonPath criteria for more complex filtering:
import com.jayway.jsonpath.Criteria.where

data class UserEvent(
  val userId: String,
  val action: String,
  val value: Int
)

class EventWorkflow : Workflow(), EventWorkflowInterface {
  override val eventChannel = channel<UserEvent>()
  
  override fun waitForHighValueAction(): UserEvent {
    // Receive events where action is "purchase" and value > 100
    val deferred = eventChannel.receive(
      jsonPath = "[?]",
      criteria = where("action").eq("purchase").and("value").gt(100)
    )
    return deferred.await()
  }
}

Type-Specific Channels

Receiving Specific Types

Receive only signals of a specific type from a polymorphic channel:
interface Event
data class OrderEvent(val orderId: String) : Event
data class PaymentEvent(val paymentId: String) : Event

class EventWorkflow : Workflow(), EventWorkflowInterface {
  override val eventChannel = channel<Event>()
  
  override fun waitForOrder(): OrderEvent {
    // Only receive OrderEvent types
    val deferred = eventChannel.receive(OrderEvent::class.java)
    return deferred.await()
  }
  
  override fun waitForPayment(): PaymentEvent {
    // Only receive PaymentEvent types
    val deferred = eventChannel.receive(PaymentEvent::class.java)
    return deferred.await()
  }
}

Combining Type Filtering with JsonPath

class EventWorkflow : Workflow(), EventWorkflowInterface {
  override val eventChannel = channel<Event>()
  
  override fun waitForSpecificOrder(orderId: String): OrderEvent {
    // Receive OrderEvent with specific orderId
    val deferred = eventChannel.receive(
      OrderEvent::class.java,
      jsonPath = """[?(\$.orderId == "$orderId")]"""
    )
    return deferred.await()
  }
}

Multiple Signals

Receive multiple signals on the same channel by specifying a limit:
class BatchWorkflow : Workflow(), BatchWorkflowInterface {
  override val messageChannel = channel<String>()
  
  override fun collectMessages(count: Int): String {
    // Receive up to 'count' messages
    val deferred = messageChannel.receive(limit = count)
    
    var result = ""
    repeat(count) {
      result += deferred.await()
    }
    return result
  }
}
val workflow = client.getWorkflowById(
  BatchWorkflow::class.java,
  workflowId
)

// Send multiple signals
workflow.messageChannel.send("a")
workflow.messageChannel.send("b")
workflow.messageChannel.send("c")

// Result: "abc"

Racing Channels with Timers

Use or() to wait for either a channel signal or a timer:
import io.infinitic.workflows.or
import java.time.Duration

class TimeoutWorkflow : Workflow(), TimeoutWorkflowInterface {
  override val approvalChannel = channel<String>()
  
  override fun waitForApprovalWithTimeout(): String {
    val signalDeferred = approvalChannel.receive()
    val timeoutDeferred = timer(Duration.ofMinutes(5))
    
    return when (val result = (signalDeferred or timeoutDeferred).await()) {
      is String -> "Approved: $result"
      else -> "Timeout - no approval received"
    }
  }
}

Multiple Channels

Workflows can have multiple channels for different types of signals:
interface OrderWorkflow {
  val approvalChannel: SendChannel<Boolean>
  val cancellationChannel: SendChannel<String>
  val updateChannel: SendChannel<OrderUpdate>
  
  fun processOrder(orderId: String): OrderResult
}

class OrderWorkflowImpl : Workflow(), OrderWorkflow {
  override val approvalChannel = channel<Boolean>()
  override val cancellationChannel = channel<String>()
  override val updateChannel = channel<OrderUpdate>()
  
  override fun processOrder(orderId: String): OrderResult {
    // Wait for approval
    val approvalDeferred = approvalChannel.receive()
    val cancelDeferred = cancellationChannel.receive()
    
    // Race approval vs cancellation
    return when ((approvalDeferred or cancelDeferred).await()) {
      is Boolean -> processApprovedOrder(orderId)
      is String -> cancelOrder(orderId)
      else -> throw IllegalStateException()
    }
  }
}

Checking Channel Status

Check if a signal has been received without waiting:
class StatusWorkflow : Workflow(), StatusWorkflowInterface {
  override val statusChannel = channel<String>()
  
  override fun checkStatus(): String {
    val deferred = statusChannel.receive()
    
    return if (deferred.isCompleted()) {
      "Signal received: ${deferred.await()}"
    } else {
      "No signal received yet"
    }
  }
}

Channel Signal Ordering

Signals sent to a channel are processed in order:
class SequenceWorkflow : Workflow(), SequenceWorkflowInterface {
  override val eventChannel = channel<String>()
  
  override fun processSequence(): List<String> {
    val results = mutableListOf<String>()
    
    // Receive three signals in order
    repeat(3) {
      val deferred = eventChannel.receive()
      results.add(deferred.await())
    }
    
    return results
  }
}
Signal TimingSignals sent before a workflow starts waiting for them will be discarded. Always ensure the workflow is waiting on a channel before sending signals:
// ❌ Wrong - signal sent before workflow waits
workflow.channel.send("message")
val result = workflow.waitForSignal()  // Will timeout

// ✅ Correct - workflow waits first
val deferred = client.dispatch(workflow::waitForSignal)
Thread.sleep(100)  // Give workflow time to start
workflow.channel.send("message")
val result = deferred.await()

Complete Example

Here’s a complete example of an approval workflow with timeout and cancellation:
import io.infinitic.workflows.Workflow
import io.infinitic.workflows.SendChannel
import io.infinitic.workflows.or
import java.time.Duration

interface ApprovalWorkflow {
  val approvalChannel: SendChannel<Boolean>
  val cancelChannel: SendChannel<String>
  
  fun requestApproval(requestId: String): ApprovalResult
}

class ApprovalWorkflowImpl : Workflow(), ApprovalWorkflow {
  override val approvalChannel = channel<Boolean>()
  override val cancelChannel = channel<String>()
  
  private val notificationService = newService(NotificationService::class.java)
  
  override fun requestApproval(requestId: String): ApprovalResult {
    // Send notification
    notificationService.sendApprovalRequest(requestId)
    
    // Wait for approval, cancellation, or timeout
    val approvalDeferred = approvalChannel.receive()
    val cancelDeferred = cancelChannel.receive()
    val timeoutDeferred = timer(Duration.ofHours(24))
    
    return when (val result = (approvalDeferred or cancelDeferred or timeoutDeferred).await()) {
      is Boolean -> {
        if (result) {
          ApprovalResult.approved(requestId)
        } else {
          ApprovalResult.rejected(requestId)
        }
      }
      is String -> ApprovalResult.canceled(requestId, result)
      else -> ApprovalResult.timedOut(requestId)
    }
  }
}

Best Practices

Name channels clearly to indicate their purpose:
// ✅ Good
val approvalChannel = channel<Boolean>()
val cancellationChannel = channel<String>()
val statusUpdateChannel = channel<Status>()

// ❌ Bad
val channel1 = channel<Boolean>()
val ch = channel<String>()
Don’t wait indefinitely for signals:
// ✅ Good - with timeout
val signalDeferred = channel.receive()
val timeout = timer(Duration.ofMinutes(10))
val result = (signalDeferred or timeout).await()

// ❌ Bad - no timeout
val result = channel.receive().await()
Filter by type when receiving different event types:
// For polymorphic events
val eventChannel = channel<Event>()

// Receive specific types
val orderEvent = eventChannel.receive(OrderEvent::class.java).await()
val paymentEvent = eventChannel.receive(PaymentEvent::class.java).await()
Always validate received signals:
val approval = approvalChannel.receive().await()
require(approval in listOf(true, false)) {
  "Invalid approval value"
}

Next Steps

Timers

Learn about timers and delays

Sub-workflows

Compose workflows from other workflows

Versioning

Update workflows while maintaining compatibility

Workflow Methods

Complete workflow method reference

Build docs developers (and LLMs) love