The ADK workflow system lets you build long-running, suspendable workflows that pause execution, wait for external events, and resume where they left off. It differs from LangGraphAgent, which handles in-memory agent orchestration.

Overview

Workflows provide:
  • Step-by-step execution - Break complex processes into discrete steps
  • Suspend/resume - Pause workflows to wait for external input or events
  • Snapshot persistence - Save and restore workflow state across restarts
  • Type safety - Fully typed workflow inputs, outputs, and step results
  • Error handling - Graceful failure handling with detailed error tracking

Quick Start

import { createWorkflow, createStep } from "@iqai/adk";

// Define workflow steps
const fetchData = createStep({
  id: "fetch-data",
  description: "Fetch user data from API",
  execute: async (ctx) => {
    const userId = ctx.inputData.userId;
    const response = await fetch(`/api/users/${userId}`);
    return response.json();
  },
});

const processData = createStep({
  id: "process-data",
  description: "Process user data",
  execute: async (ctx) => {
    const userData = ctx.getStepResult("fetch-data");
    return {
      name: userData.name,
      email: userData.email,
      processed: true,
    };
  },
});

// Create workflow
const workflow = createWorkflow({
  id: "user-processing",
  description: "Fetch and process user data",
})
  .step(fetchData)
  .step(processData)
  .commit();

// Execute workflow
const run = workflow.createRun();
const result = await run.start({ userId: "123" });

if (result.status === "success") {
  console.log("Result:", result.result);
}

Core Concepts

Workflow

A workflow is a sequence of steps that execute in order:
import { z } from "zod"; // needed for the optional schemas below

const workflow = createWorkflow({
  id: "my-workflow",
  description: "Description of what this workflow does",
  inputSchema: z.object({ userId: z.string() }), // Optional: Zod schema
  outputSchema: z.object({ result: z.string() }), // Optional: Zod schema
});

Step

A step is a single unit of work in a workflow:
const myStep = createStep({
  id: "my-step",
  description: "What this step does",
  execute: async (ctx) => {
    // Access workflow input
    const input = ctx.inputData;
    
    // Access previous step results
    const prevResult = ctx.getStepResult("previous-step-id");
    
    // Do work
    const result = await doSomething(input, prevResult);
    
    return result;
  },
});

Run

A run is a single execution instance of a workflow:
const run = workflow.createRun();
const result = await run.start({ userId: "123" });

Suspending and Resuming

The key feature of workflows is the ability to suspend execution:
const waitForApproval = createStep({
  id: "wait-approval",
  execute: async (ctx) => {
    // Suspend workflow and wait for external approval
    ctx.suspend({ 
      approvalRequestId: "req-123",
      message: "Waiting for manager approval"
    });
    
    // This code never runs - suspend throws
  },
});

const processApproval = createStep({
  id: "process-approval",
  execute: async (ctx) => {
    // Access resume data
    const approval = ctx.resumeData;
    
    if (approval.approved) {
      return { status: "approved", by: approval.approvedBy };
    } else {
      throw new Error("Request denied");
    }
  },
});

const workflow = createWorkflow({ id: "approval-workflow" })
  .step(waitForApproval)
  .step(processApproval)
  .commit();

// Start workflow
const run = workflow.createRun();
const result = await run.start({ requestId: "123" });

if (result.status === "suspended") {
  console.log("Workflow suspended at:", result.suspendedStep);
  console.log("Suspend data:", result.suspendPayload);
  
  // Later, when approval comes in...
  const finalResult = await run.resume({
    resumeData: { approved: true, approvedBy: "[email protected]" },
  });
  
  console.log("Final result:", finalResult);
}
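
To see why code after ctx.suspend() never runs, it can help to model suspension as a thrown signal that the runner catches. The sketch below is a self-contained illustration of that idea under stated assumptions, not ADK's actual implementation: SuspendSignal, MiniStep, MiniContext, and runSteps are hypothetical names, and the model assumes (as in the examples on this page) that the step after the suspended one receives the resume data.

```typescript
// Illustrative model only: none of these names come from @iqai/adk.
class SuspendSignal extends Error {
  readonly payload: unknown;
  constructor(payload: unknown) {
    super("workflow suspended");
    this.payload = payload;
    // Keep instanceof working even when compiled to older JS targets.
    Object.setPrototypeOf(this, SuspendSignal.prototype);
  }
}

interface MiniContext {
  resumeData?: unknown;
  suspend: (payload?: unknown) => never;
}

interface MiniStep {
  id: string;
  execute: (ctx: MiniContext) => unknown;
}

type MiniResult =
  | { status: "success"; outputs: Record<string, unknown> }
  | { status: "suspended"; suspendedStep: string; suspendPayload: unknown; nextIndex: number };

function runSteps(steps: MiniStep[], startIndex = 0, resumeData?: unknown): MiniResult {
  const outputs: Record<string, unknown> = {};
  for (let i = startIndex; i < steps.length; i++) {
    const step = steps[i];
    const ctx: MiniContext = {
      // Only the first step executed after a resume sees the resume data.
      resumeData: i === startIndex ? resumeData : undefined,
      suspend: (payload?: unknown): never => {
        throw new SuspendSignal(payload);
      },
    };
    try {
      outputs[step.id] = step.execute(ctx);
    } catch (err) {
      if (err instanceof SuspendSignal) {
        // Record where to continue; a real system would persist this.
        return { status: "suspended", suspendedStep: step.id, suspendPayload: err.payload, nextIndex: i + 1 };
      }
      throw err; // genuine failures still propagate
    }
  }
  return { status: "success", outputs };
}

// Usage: the first run stops at "wait"; the second continues at "decide".
const steps: MiniStep[] = [
  { id: "wait", execute: (ctx) => ctx.suspend({ reason: "needs approval" }) },
  { id: "decide", execute: (ctx) => ctx.resumeData },
];
const first = runSteps(steps);
const second = first.status === "suspended"
  ? runSteps(steps, first.nextIndex, { approved: true })
  : first;
```

Because suspend is typed to return never, TypeScript also treats any statement after the call as unreachable, which matches the comment in waitForApproval above.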

Snapshot Persistence

Persist workflow state to resume across restarts:
import { 
  createWorkflow, 
  InMemorySnapshotStore,
  type SnapshotStore,
  // Assumption: WorkflowSnapshot is exported alongside SnapshotStore
  type WorkflowSnapshot,
} from "@iqai/adk";

// Use in-memory store (for development)
const snapshotStore = new InMemorySnapshotStore();

// Or implement a custom store (for production).
// `db` stands for your own database client; it is not part of @iqai/adk.
class DatabaseSnapshotStore implements SnapshotStore {
  async save(snapshot: WorkflowSnapshot): Promise<void> {
    await db.workflows.upsert({
      where: { runId: snapshot.runId },
      update: snapshot,
      create: snapshot,
    });
  }
  
  async load(
    workflowId: string, 
    runId: string
  ): Promise<WorkflowSnapshot | null> {
    return db.workflows.findUnique({ 
      where: { workflowId, runId } 
    });
  }
  
  async delete(workflowId: string, runId: string): Promise<void> {
    await db.workflows.delete({ where: { workflowId, runId } });
  }
  
  async list(workflowId: string): Promise<WorkflowSnapshot[]> {
    return db.workflows.findMany({ where: { workflowId } });
  }
}

const workflow = createWorkflow({ id: "my-workflow" })
  .step(step1)
  .step(step2)
  .withSnapshotStore(snapshotStore)
  .commit();
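
A custom store only works if save, load, and delete agree on how a snapshot is keyed. The following is a minimal sketch of that idea using a Map, assuming a snapshot carries at least workflowId and runId. MiniSnapshot and MiniSnapshotStore are local stand-ins that mirror the four methods shown above; they are not the real @iqai/adk types, which may carry more fields.

```typescript
// Stand-in types: the real SnapshotStore and WorkflowSnapshot come from
// @iqai/adk and may include more fields than the three assumed here.
interface MiniSnapshot {
  workflowId: string;
  runId: string;
  suspendedStepId?: string;
}

interface MiniSnapshotStore {
  save(snapshot: MiniSnapshot): Promise<void>;
  load(workflowId: string, runId: string): Promise<MiniSnapshot | null>;
  delete(workflowId: string, runId: string): Promise<void>;
  list(workflowId: string): Promise<MiniSnapshot[]>;
}

class MapSnapshotStore implements MiniSnapshotStore {
  private readonly snapshots = new Map<string, MiniSnapshot>();

  // Every method derives the key the same way, so a snapshot written by
  // save() is always found by load() and removed by delete().
  private key(workflowId: string, runId: string): string {
    return JSON.stringify([workflowId, runId]);
  }

  async save(snapshot: MiniSnapshot): Promise<void> {
    // Store a copy so later mutation by the caller cannot alter saved state.
    this.snapshots.set(this.key(snapshot.workflowId, snapshot.runId), { ...snapshot });
  }

  async load(workflowId: string, runId: string): Promise<MiniSnapshot | null> {
    const found = this.snapshots.get(this.key(workflowId, runId));
    return found ? { ...found } : null;
  }

  async delete(workflowId: string, runId: string): Promise<void> {
    this.snapshots.delete(this.key(workflowId, runId));
  }

  async list(workflowId: string): Promise<MiniSnapshot[]> {
    return Array.from(this.snapshots.values())
      .filter((s) => s.workflowId === workflowId)
      .map((s) => ({ ...s }));
  }
}
```

The same keying rule carries over to a database-backed store: whatever columns identify a row in save should be the ones load and delete match on.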

Resuming from Snapshots

// Start a workflow
const run = workflow.createRun("specific-run-id");
const result = await run.start(input);

if (result.status === "suspended") {
  // Workflow suspended - snapshot is automatically saved
}

// Later (even after server restart)...
const resumedRun = await workflow.resumeRun("specific-run-id");

if (resumedRun) {
  const finalResult = await resumedRun.resume({
    resumeData: externalData,
  });
}

// List all suspended runs
const suspendedRuns = await workflow.listRuns();
for (const snapshot of suspendedRuns) {
  console.log("Suspended run:", snapshot.runId, snapshot.suspendedStepId);
}
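
In practice the resume call often lives in a handler that reacts to an external event such as a webhook. The sketch below wraps the resumeRun and resume calls shown above in a small helper that reports a missing run instead of failing silently. ResumableWorkflow and ResumableRun are structural stand-ins describing only the calls used on this page, and resumeFromEvent is a hypothetical helper name.

```typescript
// Structural stand-ins covering only the calls used on this page; the real
// objects come from @iqai/adk and expose more than this.
interface ResumableRun {
  resume(options: { resumeData: unknown }): Promise<{ status: string }>;
}

interface ResumableWorkflow {
  resumeRun(runId: string): Promise<ResumableRun | null | undefined>;
}

type ResumeOutcome =
  | { ok: true; status: string }
  | { ok: false; reason: "run-not-found" };

// Hypothetical helper, e.g. called from a webhook handler.
async function resumeFromEvent(
  workflow: ResumableWorkflow,
  runId: string,
  resumeData: unknown,
): Promise<ResumeOutcome> {
  const run = await workflow.resumeRun(runId);
  if (!run) {
    // No snapshot for this run ID: it may have completed or been deleted.
    return { ok: false, reason: "run-not-found" };
  }
  const result = await run.resume({ resumeData });
  return { ok: true, status: result.status };
}
```

Returning an explicit outcome lets the caller decide how to respond, for example with a 404 when the run no longer exists.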

Execute Context

The ExecuteContext is passed to every step’s execute function:
interface ExecuteContext<TInput, TResume, TSuspend> {
  /** Unique ID for this workflow run */
  runId: string;
  
  /** ID of the workflow */
  workflowId: string;
  
  /** Input data passed to run.start() */
  inputData: TInput;
  
  /** Resume data (only set on resumed steps) */
  resumeData?: TResume;
  
  /** Suspend the workflow with optional payload */
  suspend: (payload?: TSuspend) => never;
  
  /** Get the result of a previous step */
  getStepResult: <T>(stepId: string) => T | undefined;
}
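
Since getStepResult returns T | undefined, destructuring its result directly can fail at runtime if the referenced step has not produced a result. One possible guard is a small helper that turns a missing result into an explicit error. requireStepResult and HasStepResults are hypothetical names introduced for this sketch; only the getStepResult signature is taken from the interface above.

```typescript
// Only the part of the context this helper needs; the signature is copied
// from the ExecuteContext interface above.
interface HasStepResults {
  getStepResult: <T>(stepId: string) => T | undefined;
}

// Hypothetical helper: fail loudly when an expected step result is missing.
function requireStepResult<T>(ctx: HasStepResults, stepId: string): T {
  const result = ctx.getStepResult<T>(stepId);
  if (result === undefined) {
    throw new Error(`Step "${stepId}" has no result; did it run before this step?`);
  }
  return result;
}
```

Inside a step this would read as const { requestId } = requireStepResult<{ requestId: string }>(ctx, "submit");, which also gives the destructured value a concrete type.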

Step Results

Each step produces a result that is tracked:
type StepResult<TInput, TOutput, TResume, TSuspend> = 
  | StepSuccess<TInput, TOutput>
  | StepSuspended<TInput, TSuspend>
  | StepFailed<TInput>;

interface StepSuccess<TInput, TOutput> {
  status: "success";
  output: TOutput;
  payload: TInput;
  startedAt: number;
  endedAt: number;
}

interface StepSuspended<TInput, TSuspend> {
  status: "suspended";
  payload: TInput;
  suspendPayload?: TSuspend;
  startedAt: number;
  suspendedAt: number;
}

interface StepFailed<TInput> {
  status: "failed";
  error: Error;
  payload: TInput;
  startedAt: number;
  endedAt: number;
}
Access step results in the final workflow result:
const result = await run.start(input);

if (result.status === "success") {
  console.log("Step results:", result.steps);
  // result.steps["step-id"] contains the step result
}
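
Because every variant carries a literal status field, a switch on status narrows the type so that only the fields valid for that variant are accessible. The sketch below uses AnyStepResult, a trimmed, non-generic copy of the types above so the snippet stands alone. describeStep and summarize are hypothetical helpers, and the "ms" unit assumes the timestamps are epoch milliseconds, which this page does not state.

```typescript
// Trimmed, non-generic copy of the StepResult variants shown above.
type AnyStepResult =
  | { status: "success"; output: unknown; startedAt: number; endedAt: number }
  | { status: "suspended"; suspendPayload?: unknown; startedAt: number; suspendedAt: number }
  | { status: "failed"; error: Error; startedAt: number; endedAt: number };

// Assumption: timestamps are epoch milliseconds.
function describeStep(id: string, step: AnyStepResult): string {
  switch (step.status) {
    case "success":
      // endedAt exists only on the success and failed variants.
      return `${id}: succeeded in ${step.endedAt - step.startedAt} ms`;
    case "suspended":
      // suspendedAt exists only on the suspended variant.
      return `${id}: suspended after ${step.suspendedAt - step.startedAt} ms`;
    case "failed":
      return `${id}: failed (${step.error.message})`;
  }
}

// Summarize a record shaped like result.steps (step ID -> step result).
function summarize(steps: Record<string, AnyStepResult>): string[] {
  return Object.entries(steps).map(([id, step]) => describeStep(id, step));
}
```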

Advanced Examples

Multi-step Approval Workflow

const submitRequest = createStep({
  id: "submit",
  execute: async (ctx) => {
    const request = await api.submitRequest(ctx.inputData);
    return { requestId: request.id, status: "pending" };
  },
});

const waitManagerApproval = createStep({
  id: "wait-manager",
  execute: async (ctx) => {
    const { requestId } = ctx.getStepResult("submit");
    
    // Suspend and wait for manager
    ctx.suspend({ 
      type: "manager-approval",
      requestId 
    });
  },
});

const checkManagerApproval = createStep({
  id: "check-manager",
  execute: async (ctx) => {
    if (!ctx.resumeData?.approved) {
      throw new Error("Manager denied request");
    }
    return ctx.resumeData;
  },
});

const waitDirectorApproval = createStep({
  id: "wait-director",
  execute: async (ctx) => {
    const { requestId } = ctx.getStepResult("submit");
    
    // Another suspension for director
    ctx.suspend({ 
      type: "director-approval",
      requestId 
    });
  },
});

const finalizeRequest = createStep({
  id: "finalize",
  execute: async (ctx) => {
    const managerApproval = ctx.getStepResult("check-manager");
    const directorApproval = ctx.resumeData;
    
    if (!directorApproval?.approved) {
      throw new Error("Director denied request");
    }
    
    return {
      status: "approved",
      approvals: [managerApproval, directorApproval],
    };
  },
});

const approvalWorkflow = createWorkflow({ 
  id: "multi-approval" 
})
  .step(submitRequest)
  .step(waitManagerApproval)
  .step(checkManagerApproval)
  .step(waitDirectorApproval)
  .step(finalizeRequest)
  .withSnapshotStore(snapshotStore)
  .commit();

Human-in-the-Loop Agent

import { AgentBuilder } from "@iqai/adk";

const analyzeRequest = createStep({
  id: "analyze",
  execute: async (ctx) => {
    const { runner } = await AgentBuilder
      .create("analyzer")
      .withModel("gpt-4")
      .withInstruction("Analyze the request and identify risks")
      .build();
    
    const events = await runner.run({
      userId: "system",
      newMessage: ctx.inputData.request,
    });
    
    const analysis = events[events.length - 1].text;
    return { analysis, requiresReview: analysis.includes("HIGH RISK") };
  },
});

const conditionalReview = createStep({
  id: "review",
  execute: async (ctx) => {
    const { requiresReview } = ctx.getStepResult("analyze");
    
    if (requiresReview) {
      // Suspend for human review; suspend() throws, so nothing below runs
      ctx.suspend({ reason: "High risk detected" });
    }
    
    // Reached only when no review is required
    return { reviewed: false, autoApproved: true };
  },
});

const executeAction = createStep({
  id: "execute",
  execute: async (ctx) => {
    const reviewResult = ctx.getStepResult("review");
    const humanReview = ctx.resumeData;
    
    if (humanReview?.rejected) {
      throw new Error("Action rejected by human reviewer");
    }
    
    // Proceed with action
    return await performAction(ctx.inputData);
  },
});

const aiReviewWorkflow = createWorkflow({ id: "ai-review" })
  .step(analyzeRequest)
  .step(conditionalReview)
  .step(executeAction)
  .commit();

Long-running Data Pipeline

const extractData = createStep({
  id: "extract",
  execute: async (ctx) => {
    const data = await extractFromSource(ctx.inputData.sourceUrl);
    return { records: data, count: data.length };
  },
});

const transformData = createStep({
  id: "transform",
  execute: async (ctx) => {
    const { records } = ctx.getStepResult("extract");
    const transformed = await transformRecords(records);
    return { transformed, count: transformed.length };
  },
});

const waitForApproval = createStep({
  id: "approve",
  execute: async (ctx) => {
    const { count } = ctx.getStepResult("transform");
    
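    // For large datasets, require approval before load.
    // suspend() throws, so the return below is skipped when approval is needed.
    if (count > 10000) {
      ctx.suspend({ message: `Approve loading ${count} records?` });
    }
    
    // Small datasets are auto-approved
    return { approved: true };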
  },
});

const loadData = createStep({
  id: "load",
  execute: async (ctx) => {
    const { transformed } = ctx.getStepResult("transform");
    // No resume data means the approval step never suspended (small dataset)
    const approval = ctx.resumeData ?? { approved: true };
    
    if (!approval.approved) {
      throw new Error("Load cancelled");
    }
    
    await loadToDestination(transformed);
    return { loaded: transformed.length };
  },
});

const etlWorkflow = createWorkflow({ id: "etl-pipeline" })
  .step(extractData)
  .step(transformData)
  .step(waitForApproval)
  .step(loadData)
  .withSnapshotStore(snapshotStore)
  .commit();

Workflow vs LangGraphAgent

Understand when to use each:
| Feature    | Workflow                                 | LangGraphAgent                         |
| ---------- | ---------------------------------------- | -------------------------------------- |
| Purpose    | Long-running processes with suspension   | In-memory agent orchestration          |
| Execution  | Can pause and resume across restarts     | Runs to completion in memory           |
| State      | Persisted snapshots                      | In-memory only                         |
| Use Case   | Approval flows, pipelines, human-in-loop | Multi-agent coordination, tool routing |
| Duration   | Hours to days                            | Seconds to minutes                     |
| Resumption | Resume from exact step                   | Re-run from beginning                  |

Use Workflows for processes that need to wait for external events (approvals, webhooks, scheduled triggers). Use LangGraphAgent for complex agent routing and coordination that completes in a single session.

Best Practices

  • Keep steps atomic - Each step should be a discrete unit of work that can succeed or fail independently.
  • Use meaningful step IDs - Step IDs are used to reference results. Make them descriptive.
  • Implement snapshot persistence - For production workflows, always use a durable snapshot store (database, Redis, etc.).
  • Handle errors gracefully - Failed steps should provide clear error messages to help with debugging.
  • Version your workflows - If you modify a workflow, consider using a new workflow ID to avoid conflicts with existing suspended runs.

Suspended workflows remain in the snapshot store until completed or manually deleted. Implement cleanup for abandoned workflows.
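
Cleanup of abandoned runs could be a periodic job that lists snapshots and deletes those suspended for longer than some threshold. The sketch below is one way to do that under stated assumptions: it relies only on the list and delete methods shown earlier, and it assumes each snapshot exposes a suspendedAt timestamp in epoch milliseconds, which this page does not confirm. StaleCandidate, CleanableStore, and deleteAbandonedRuns are hypothetical names.

```typescript
// Stand-in shapes; `suspendedAt` is an assumed field (epoch milliseconds).
interface StaleCandidate {
  workflowId: string;
  runId: string;
  suspendedAt: number;
}

interface CleanableStore {
  list(workflowId: string): Promise<StaleCandidate[]>;
  delete(workflowId: string, runId: string): Promise<void>;
}

// Deletes snapshots older than maxAgeMs and returns the removed run IDs.
async function deleteAbandonedRuns(
  store: CleanableStore,
  workflowId: string,
  maxAgeMs: number,
  now: number = Date.now(),
): Promise<string[]> {
  const snapshots = await store.list(workflowId);
  const stale = snapshots.filter((s) => now - s.suspendedAt > maxAgeMs);
  for (const s of stale) {
    await store.delete(workflowId, s.runId);
  }
  return stale.map((s) => s.runId);
}
```

Passing now as a parameter keeps the function deterministic in tests; a scheduled job would normally rely on the default.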
