Overview

The effect/unstable/workflow modules provide tools for building durable workflows that:
  • Survive process restarts
  • Handle long-running operations
  • Maintain state across interruptions
  • Coordinate distributed activities

What are workflows?

Workflows are durable execution contexts that can:
  • Persist state - Workflow state is stored and restored across restarts
  • Handle delays - Wait for hours, days, or weeks without blocking
  • Coordinate activities - Orchestrate multiple operations across services
  • Recover from failures - Automatically retry failed activities
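
As a rough mental model of how persisted state lets a workflow resume after a restart, the sketch below journals each completed step by name and replays the recorded result instead of running the step again. It is plain TypeScript and does not use the effect/unstable/workflow API: `Journal`, `runStep`, `track`, and `orderFlow` are hypothetical names, and a `Map` stands in for durable storage.

```typescript
// Conceptual sketch only: not the effect/unstable/workflow implementation.
// A Map stands in for durable storage that would survive a process restart.
type Journal = Map<string, unknown>

// Run a named step once; on later runs, return the journaled result instead.
function runStep<A>(journal: Journal, name: string, step: () => A): A {
  if (journal.has(name)) {
    return journal.get(name) as A
  }
  const result = step()
  journal.set(name, result)
  return result
}

// Counts how many times each step body actually executed.
const executions: Record<string, number> = {}
const track = (name: string): string => {
  executions[name] = (executions[name] ?? 0) + 1
  return `${name}:done`
}

// A hypothetical workflow body; `crashAfterReserve` simulates a restart.
function orderFlow(journal: Journal, crashAfterReserve: boolean): string {
  runStep(journal, "reserveInventory", () => track("reserveInventory"))
  if (crashAfterReserve) {
    throw new Error("process restarted")
  }
  return runStep(journal, "shipOrder", () => track("shipOrder"))
}

const journal: Journal = new Map()
try {
  orderFlow(journal, true) // first run is interrupted after the first step
} catch {
  // the journal still holds the result of "reserveInventory"
}
const replayOutcome = orderFlow(journal, false) // second run resumes from the journal
```

In this model the second run skips `reserveInventory` because its result is already journaled, so each step body executes once even though the workflow function ran twice.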

Defining a workflow

import { Effect, Schema } from "effect"
import { Workflow } from "effect/unstable/workflow"

class OrderWorkflow extends Schema.Class<OrderWorkflow>("OrderWorkflow")({
  orderId: Schema.String,
  status: Schema.String
}) {}

const processOrderWorkflow = Workflow.make("ProcessOrder", OrderWorkflow, 
  Effect.gen(function*() {
    const workflow = yield* Workflow.Workflow
    
    // Reserve inventory
    yield* workflow.activity("reserveInventory")
    
    // Wait for payment (could be hours or days)
    const paymentReceived = yield* workflow.signal("paymentReceived")
    
    if (!paymentReceived) {
      yield* workflow.activity("releaseInventory")
      return { orderId: workflow.input.orderId, status: "cancelled" }
    }
    
    // Ship order
    yield* workflow.activity("shipOrder")
    
    return { orderId: workflow.input.orderId, status: "shipped" }
  })
)

Activities

Activities are the units of work performed by workflows:
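import { Effect } from "effect"
import { Activity } from "effect/unstable/workflow"

// InventoryService and ShippingService are application services defined elsewhere.
const reserveInventory = Activity.make(
  "reserveInventory",
  (input: { orderId: string }) => Effect.gen(function*() {
    const inventory = yield* InventoryService
    yield* inventory.reserve(input.orderId)
  })
)

const shipOrder = Activity.make(
  "shipOrder",
  (input: { orderId: string }) => Effect.gen(function*() {
    const shipping = yield* ShippingService
    yield* shipping.createShipment(input.orderId)
  })
)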

Signals

Signals allow external events to communicate with workflows:
// Inside the workflow: suspend until the signal arrives
const approved = yield* workflow.signal("approvalReceived")

if (approved) {
  yield* workflow.activity("processApprovedOrder")
}

// Outside the workflow: deliver the signal to a running instance
const client = yield* WorkflowClient
yield* client.signal(workflowId, "approvalReceived", true)
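
One way to picture a signal is as a persisted mailbox keyed by workflow id and signal name: the workflow suspends while the mailbox is empty and proceeds once an external sender has written a value. The sketch below models that idea in plain TypeScript. `SignalStore`, `Poll`, and `approvalStep` are hypothetical names, a `Map` stands in for durable storage, and none of this is the effect/unstable/workflow API.

```typescript
// Conceptual sketch only: not the effect/unstable/workflow implementation.
type Poll<A> = { received: true; value: A } | { received: false }

class SignalStore {
  // A Map stands in for durable storage keyed by workflow id and signal name.
  private readonly signals = new Map<string, unknown>()

  send(workflowId: string, name: string, value: unknown): void {
    this.signals.set(`${workflowId}/${name}`, value)
  }

  poll<A>(workflowId: string, name: string): Poll<A> {
    const key = `${workflowId}/${name}`
    return this.signals.has(key)
      ? { received: true, value: this.signals.get(key) as A }
      : { received: false }
  }
}

// A hypothetical workflow step: stay suspended until the approval signal exists.
function approvalStep(store: SignalStore, workflowId: string): string {
  const approval = store.poll<boolean>(workflowId, "approvalReceived")
  if (!approval.received) return "suspended"
  return approval.value ? "processed" : "rejected"
}

const store = new SignalStore()
const beforeSignal = approvalStep(store, "wf-1") // no signal yet
store.send("wf-1", "approvalReceived", true) // external event arrives
const afterSignal = approvalStep(store, "wf-1") // the step can now proceed
```

Because the signal value is stored rather than held in memory, the sender and the workflow do not need to be running at the same moment.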

Durable delays

Workflows can wait without consuming resources:
// Wait 24 hours
yield* workflow.sleep("24 hours")

// Wait until a specific time
yield* workflow.sleepUntil(reminderDate)
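
A durable delay can be modelled as a stored deadline rather than an in-memory timer: the wake-up time is recorded once, and after a restart the workflow only waits for whatever time remains. The following sketch shows that bookkeeping in plain TypeScript. `timers` and `remainingSleep` are hypothetical names, the clock value is passed in explicitly so the example is deterministic, and this is not the effect/unstable/workflow implementation.

```typescript
// Conceptual sketch only: not the effect/unstable/workflow implementation.
// A Map stands in for durable storage of wake-up times, keyed by timer name.
const timers = new Map<string, number>()

// Returns how many milliseconds remain before the named timer fires.
// The deadline is recorded once, so later calls do not restart the wait.
function remainingSleep(name: string, durationMs: number, nowMs: number): number {
  let wakeAt = timers.get(name)
  if (wakeAt === undefined) {
    wakeAt = nowMs + durationMs
    timers.set(name, wakeAt)
  }
  return Math.max(0, wakeAt - nowMs)
}

const DAY = 24 * 60 * 60 * 1000
const startedAt = 1700000000000 // arbitrary fixed clock value for the example

// First encounter: the full 24 hours remain.
const atStart = remainingSleep("reminder", DAY, startedAt)
// The process restarts 6 hours later and the workflow is replayed.
const afterRestart = remainingSleep("reminder", DAY, startedAt + DAY / 4)
// Replayed again after the deadline has passed: nothing left to wait for.
const afterDeadline = remainingSleep("reminder", DAY, startedAt + 2 * DAY)
```

Nothing has to stay in memory during the wait, which is why such a delay can span days without holding a process open.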

Starting workflows

import { Effect } from "effect"
import { WorkflowEngine } from "effect/unstable/workflow"

const startOrder = Effect.gen(function*() {
  const engine = yield* WorkflowEngine.WorkflowEngine
  
  const workflowId = yield* engine.start(processOrderWorkflow, {
    orderId: "order-123",
    status: "pending"
  })
  
  yield* Effect.log(`Started workflow: ${workflowId}`)
  return workflowId
})

Querying workflows

// Takes the ID returned when the workflow was started
const getOrderStatus = (workflowId: string) => Effect.gen(function*() {
  const engine = yield* WorkflowEngine.WorkflowEngine

  const workflow = yield* engine.get(workflowId)
  return workflow.state.status
})

Workflow state

Workflows maintain durable state:
const workflow = Effect.gen(function*() {
  const wf = yield* Workflow.Workflow
  
  // Read state
  const currentCount = wf.state.count
  
  // Update state
  wf.state.count = currentCount + 1
  
  // State is automatically persisted
})

Error handling

Activities can be retried automatically:
const activity = Activity.make(
  "sendEmail",
  sendEmailEffect,
  {
    retries: 3,
    backoff: "exponential"
  }
)
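
To make the retry behaviour concrete, here is a minimal sketch of what a budget of three retries with exponential backoff might look like. It is plain TypeScript, not the library's retry implementation: `backoffDelays`, `runWithRetries`, the 100 ms base delay, and the doubling formula are all assumptions chosen for illustration.

```typescript
// Conceptual sketch only: the names echo the `retries` and `backoff` options in
// this section, but the delay formula and base delay are assumptions.
function backoffDelays(retries: number, baseMs: number): number[] {
  const delays: number[] = []
  for (let attempt = 0; attempt < retries; attempt++) {
    delays.push(baseMs * 2 ** attempt) // baseMs, 2x, 4x, ...
  }
  return delays
}

// Run `task` until it succeeds or the retry budget is exhausted.
// Delays are recorded rather than slept so the example stays synchronous.
function runWithRetries<A>(
  task: () => A,
  retries: number,
  baseMs: number
): { result: A; waited: number[] } {
  const waited: number[] = []
  for (let attempt = 0; ; attempt++) {
    try {
      return { result: task(), waited }
    } catch (error) {
      if (attempt >= retries) throw error // budget exhausted: surface the failure
      waited.push(baseMs * 2 ** attempt)
    }
  }
}

// A hypothetical flaky task that fails twice, then succeeds.
let calls = 0
const flaky = (): string => {
  calls++
  if (calls < 3) throw new Error("temporary failure")
  return "sent"
}

const schedule = backoffDelays(3, 100)
const retryOutcome = runWithRetries(flaky, 3, 100)
```

With these assumed settings the task succeeds on its third call after two recorded waits, and a task that kept failing would be attempted four times in total (one initial attempt plus three retries) before the error propagates.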

Workflow proxy

Call workflows like regular functions:
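import { Effect } from "effect"
import { WorkflowProxy } from "effect/unstable/workflow"

const runOrder = Effect.gen(function*() {
  const proxy = yield* WorkflowProxy.make(processOrderWorkflow)

  // Invoke the workflow like a function and wait for its result
  return yield* proxy({
    orderId: "order-123",
    status: "pending"
  })
})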

Complete example

import { Effect, Schema } from "effect"
import { Activity, Workflow, WorkflowEngine } from "effect/unstable/workflow"

class OrderInput extends Schema.Class<OrderInput>("OrderInput")({
  orderId: Schema.String,
  items: Schema.Array(Schema.String)
}) {}

class OrderResult extends Schema.Class<OrderResult>("OrderResult")({
  orderId: Schema.String,
  status: Schema.String
}) {}

// Define activities
const reserveInventory = Activity.make(
  "reserveInventory",
  (input: OrderInput) => Effect.gen(function*() {
    yield* Effect.log(`Reserving inventory for ${input.orderId}`)
    yield* Effect.sleep("1 second")
  })
)

const chargePayment = Activity.make(
  "chargePayment",
  (input: OrderInput) => Effect.gen(function*() {
    yield* Effect.log(`Charging payment for ${input.orderId}`)
    yield* Effect.sleep("1 second")
  })
)

const shipOrder = Activity.make(
  "shipOrder",
  (input: OrderInput) => Effect.gen(function*() {
    yield* Effect.log(`Shipping order ${input.orderId}`)
    yield* Effect.sleep("1 second")
  })
)

// Define workflow
const processOrder = Workflow.make(
  "ProcessOrder",
  OrderInput,
  Effect.gen(function*() {
    const workflow = yield* Workflow.Workflow
    const input = workflow.input

    yield* Effect.log(`Processing order ${input.orderId}`)

    // Execute activities sequentially
    yield* workflow.execute(reserveInventory(input))
    yield* workflow.execute(chargePayment(input))
    
    // Wait for fulfillment delay
    yield* workflow.sleep("2 seconds")
    
    yield* workflow.execute(shipOrder(input))

    return OrderResult.make({
      orderId: input.orderId,
      status: "shipped"
    })
  })
)

// Start workflow
const program = Effect.gen(function*() {
  const engine = yield* WorkflowEngine.WorkflowEngine
  
  const workflowId = yield* engine.start(processOrder, {
    orderId: "order-456",
    items: ["item1", "item2"]
  })
  
  yield* Effect.log(`Workflow started: ${workflowId}`)
  
  // Wait for completion
  const result = yield* engine.await(workflowId)
  
  yield* Effect.log(`Order ${result.orderId} ${result.status}`)
})

Use cases

  • Order processing - Handle multi-step order fulfillment
  • Approval flows - Wait for human approval in business processes
  • Scheduled tasks - Run tasks at specific times or intervals
  • Sagas - Coordinate distributed transactions
  • Event-driven processes - React to events over long periods
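
For the saga use case, the core idea is that each step carries a compensating action, and a failure undoes the already completed steps in reverse order. The sketch below illustrates that pattern in plain TypeScript. `SagaStep` and `runSaga` are hypothetical names, the step bodies are empty placeholders, and this is not the effect/unstable/workflow API.

```typescript
// Conceptual sketch only: not the effect/unstable/workflow implementation.
interface SagaStep {
  name: string
  run: () => void
  compensate: () => void
}

// Runs steps in order; if one fails, undoes the completed steps in reverse.
function runSaga(steps: SagaStep[], log: string[]): "completed" | "rolled back" {
  const done: SagaStep[] = []
  for (const step of steps) {
    try {
      step.run()
      log.push(`run ${step.name}`)
      done.push(step)
    } catch {
      for (const completed of done.reverse()) {
        completed.compensate()
        log.push(`undo ${completed.name}`)
      }
      return "rolled back"
    }
  }
  return "completed"
}

const sagaLog: string[] = []
const sagaStatus = runSaga(
  [
    { name: "reserveInventory", run: () => {}, compensate: () => {} },
    { name: "chargePayment", run: () => {}, compensate: () => {} },
    {
      name: "shipOrder",
      run: () => { throw new Error("carrier unavailable") },
      compensate: () => {}
    }
  ],
  sagaLog
)
```

Here the failed `shipOrder` step triggers compensation for `chargePayment` first and `reserveInventory` second, mirroring the order in which they ran.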

See also

  • Cluster - Distribute workflows across nodes
  • RPC - Define workflow activities
