The ADK workflow system enables building long-running, suspendable workflows that can pause execution, wait for external events, and resume from where they left off. This is different from LangGraphAgent, which is for in-memory agent orchestration.
Overview
Workflows provide:
- Step-by-step execution - Break complex processes into discrete steps
- Suspend/resume - Pause workflows to wait for external input or events
- Snapshot persistence - Save and restore workflow state across restarts
- Type safety - Fully typed workflow inputs, outputs, and step results
- Error handling - Graceful failure handling with detailed error tracking
Quick Start
import { createWorkflow, createStep } from "@iqai/adk";
// Define workflow steps
const fetchData = createStep({
id: "fetch-data",
description: "Fetch user data from API",
execute: async (ctx) => {
const userId = ctx.inputData.userId;
const response = await fetch(`/api/users/${userId}`);
return response.json();
},
});
const processData = createStep({
id: "process-data",
description: "Process user data",
execute: async (ctx) => {
const userData = ctx.getStepResult("fetch-data");
return {
name: userData.name,
email: userData.email,
processed: true,
};
},
});
// Create workflow
const workflow = createWorkflow({
id: "user-processing",
description: "Fetch and process user data",
})
.step(fetchData)
.step(processData)
.commit();
// Execute workflow
const run = workflow.createRun();
const result = await run.start({ userId: "123" });
if (result.status === "success") {
console.log("Result:", result.result);
}
Core Concepts
Workflow
A workflow is a sequence of steps that execute in order:
const workflow = createWorkflow({
id: "my-workflow",
description: "Description of what this workflow does",
inputSchema: z.object({ userId: z.string() }), // Optional: Zod schema
outputSchema: z.object({ result: z.string() }), // Optional: Zod schema
});
Step
A step is a single unit of work in a workflow:
const myStep = createStep({
id: "my-step",
description: "What this step does",
execute: async (ctx) => {
// Access workflow input
const input = ctx.inputData;
// Access previous step results
const prevResult = ctx.getStepResult("previous-step-id");
// Do work
const result = await doSomething(input, prevResult);
return result;
},
});
Run
A run is a single execution instance of a workflow:
const run = workflow.createRun();
const result = await run.start({ userId: "123" });
Suspending and Resuming
The key feature of workflows is the ability to suspend execution:
const waitForApproval = createStep({
id: "wait-approval",
execute: async (ctx) => {
// Suspend workflow and wait for external approval
ctx.suspend({
approvalRequestId: "req-123",
message: "Waiting for manager approval"
});
// This code never runs - suspend throws
},
});
const processApproval = createStep({
id: "process-approval",
execute: async (ctx) => {
// Access resume data
const approval = ctx.resumeData;
if (approval.approved) {
return { status: "approved", by: approval.approvedBy };
} else {
throw new Error("Request denied");
}
},
});
const workflow = createWorkflow({ id: "approval-workflow" })
.step(waitForApproval)
.step(processApproval)
.commit();
// Start workflow
const run = workflow.createRun();
const result = await run.start({ requestId: "123" });
if (result.status === "suspended") {
console.log("Workflow suspended at:", result.suspendedStep);
console.log("Suspend data:", result.suspendPayload);
// Later, when approval comes in...
const finalResult = await run.resume({
resumeData: { approved: true, approvedBy: "[email protected]" },
});
console.log("Final result:", finalResult);
}
Snapshot Persistence
Persist workflow state to resume across restarts:
import {
createWorkflow,
InMemorySnapshotStore,
type SnapshotStore
} from "@iqai/adk";
// Use in-memory store (for development)
const snapshotStore = new InMemorySnapshotStore();
// Or implement custom store (for production)
class DatabaseSnapshotStore implements SnapshotStore {
async save(snapshot: WorkflowSnapshot): Promise<void> {
await db.workflows.upsert({
where: { runId: snapshot.runId },
update: snapshot,
create: snapshot,
});
}
async load(
workflowId: string,
runId: string
): Promise<WorkflowSnapshot | null> {
return db.workflows.findUnique({
where: { workflowId, runId }
});
}
async delete(workflowId: string, runId: string): Promise<void> {
await db.workflows.delete({ where: { workflowId, runId } });
}
async list(workflowId: string): Promise<WorkflowSnapshot[]> {
return db.workflows.findMany({ where: { workflowId } });
}
}
const workflow = createWorkflow({ id: "my-workflow" })
.step(step1)
.step(step2)
.withSnapshotStore(snapshotStore)
.commit();
Resuming from Snapshots
// Start a workflow
const run = workflow.createRun("specific-run-id");
const result = await run.start(input);
if (result.status === "suspended") {
// Workflow suspended - snapshot is automatically saved
}
// Later (even after server restart)...
const resumedRun = await workflow.resumeRun("specific-run-id");
if (resumedRun) {
const finalResult = await resumedRun.resume({
resumeData: externalData,
});
}
// List all suspended runs
const suspendedRuns = await workflow.listRuns();
for (const snapshot of suspendedRuns) {
console.log("Suspended run:", snapshot.runId, snapshot.suspendedStepId);
}
Execute Context
The ExecuteContext is passed to every step’s execute function:
interface ExecuteContext<TInput, TResume, TSuspend> {
/** Unique ID for this workflow run */
runId: string;
/** ID of the workflow */
workflowId: string;
/** Input data passed to workflow.start() */
inputData: TInput;
/** Resume data (only set on resumed steps) */
resumeData?: TResume;
/** Suspend the workflow with optional payload */
suspend: (payload?: TSuspend) => never;
/** Get the result of a previous step */
getStepResult: <T>(stepId: string) => T | undefined;
}
Step Results
Each step produces a result that is tracked:
type StepResult<TInput, TOutput, TResume, TSuspend> =
| StepSuccess<TInput, TOutput>
| StepSuspended<TInput, TSuspend>
| StepFailed<TInput>;
interface StepSuccess<TInput, TOutput> {
status: "success";
output: TOutput;
payload: TInput;
startedAt: number;
endedAt: number;
}
interface StepSuspended<TInput, TSuspend> {
status: "suspended";
payload: TInput;
suspendPayload?: TSuspend;
startedAt: number;
suspendedAt: number;
}
interface StepFailed<TInput> {
status: "failed";
error: Error;
payload: TInput;
startedAt: number;
endedAt: number;
}
Access step results in the final workflow result:
const result = await run.start(input);
if (result.status === "success") {
console.log("Step results:", result.steps);
// result.steps["step-id"] contains the step result
}
Advanced Examples
Multi-step Approval Workflow
const submitRequest = createStep({
id: "submit",
execute: async (ctx) => {
const request = await api.submitRequest(ctx.inputData);
return { requestId: request.id, status: "pending" };
},
});
const waitManagerApproval = createStep({
id: "wait-manager",
execute: async (ctx) => {
const { requestId } = ctx.getStepResult("submit");
// Suspend and wait for manager
ctx.suspend({
type: "manager-approval",
requestId
});
},
});
const checkManagerApproval = createStep({
id: "check-manager",
execute: async (ctx) => {
if (!ctx.resumeData?.approved) {
throw new Error("Manager denied request");
}
return ctx.resumeData;
},
});
const waitDirectorApproval = createStep({
id: "wait-director",
execute: async (ctx) => {
const { requestId } = ctx.getStepResult("submit");
// Another suspension for director
ctx.suspend({
type: "director-approval",
requestId
});
},
});
const finalizeRequest = createStep({
id: "finalize",
execute: async (ctx) => {
const managerApproval = ctx.getStepResult("check-manager");
const directorApproval = ctx.resumeData;
if (!directorApproval?.approved) {
throw new Error("Director denied request");
}
return {
status: "approved",
approvals: [managerApproval, directorApproval],
};
},
});
const approvalWorkflow = createWorkflow({
id: "multi-approval"
})
.step(submitRequest)
.step(waitManagerApproval)
.step(checkManagerApproval)
.step(waitDirectorApproval)
.step(finalizeRequest)
.withSnapshotStore(snapshotStore)
.commit();
Human-in-the-Loop Agent
import { AgentBuilder } from "@iqai/adk";
const analyzeRequest = createStep({
id: "analyze",
execute: async (ctx) => {
const { runner } = await AgentBuilder
.create("analyzer")
.withModel("gpt-4")
.withInstruction("Analyze the request and identify risks")
.build();
const events = await runner.run({
userId: "system",
newMessage: ctx.inputData.request,
});
const analysis = events[events.length - 1].text;
return { analysis, requiresReview: analysis.includes("HIGH RISK") };
},
});
const conditionalReview = createStep({
id: "review",
execute: async (ctx) => {
const { requiresReview } = ctx.getStepResult("analyze");
if (requiresReview) {
// Suspend for human review
ctx.suspend({ reason: "High risk detected" });
}
return { reviewed: false, autoApproved: true };
},
});
const executeAction = createStep({
id: "execute",
execute: async (ctx) => {
const reviewResult = ctx.getStepResult("review");
const humanReview = ctx.resumeData;
if (humanReview?.rejected) {
throw new Error("Action rejected by human reviewer");
}
// Proceed with action
return await performAction(ctx.inputData);
},
});
const aiReviewWorkflow = createWorkflow({ id: "ai-review" })
.step(analyzeRequest)
.step(conditionalReview)
.step(executeAction)
.commit();
Long-running Data Pipeline
const extractData = createStep({
id: "extract",
execute: async (ctx) => {
const data = await extractFromSource(ctx.inputData.sourceUrl);
return { records: data, count: data.length };
},
});
const transformData = createStep({
id: "transform",
execute: async (ctx) => {
const { records } = ctx.getStepResult("extract");
const transformed = await transformRecords(records);
return { transformed, count: transformed.length };
},
});
const waitForApproval = createStep({
id: "approve",
execute: async (ctx) => {
const { count } = ctx.getStepResult("transform");
// For large datasets, require approval before load
if (count > 10000) {
ctx.suspend({ message: `Approve loading ${count} records?` });
}
return { approved: true };
},
});
const loadData = createStep({
id: "load",
execute: async (ctx) => {
const { transformed } = ctx.getStepResult("transform");
const approval = ctx.resumeData || { approved: true };
if (!approval.approved) {
throw new Error("Load cancelled");
}
await loadToDestination(transformed);
return { loaded: transformed.length };
},
});
const etlWorkflow = createWorkflow({ id: "etl-pipeline" })
.step(extractData)
.step(transformData)
.step(waitForApproval)
.step(loadData)
.withSnapshotStore(snapshotStore)
.commit();
Workflow vs LangGraphAgent
Understand when to use each:
| Feature | Workflow | LangGraphAgent |
|---|
| Purpose | Long-running processes with suspension | In-memory agent orchestration |
| Execution | Can pause and resume across restarts | Runs to completion in memory |
| State | Persisted snapshots | In-memory only |
| Use Case | Approval flows, pipelines, human-in-loop | Multi-agent coordination, tool routing |
| Duration | Hours to days | Seconds to minutes |
| Resumption | Resume from exact step | Re-run from beginning |
Use Workflows for processes that need to wait for external events (approvals, webhooks, scheduled triggers). Use LangGraphAgent for complex agent routing and coordination that completes in a single session.
Best Practices
Keep steps atomic - Each step should be a discrete unit of work that can succeed or fail independently.
Use meaningful step IDs - Step IDs are used to reference results. Make them descriptive.
Implement snapshot persistence - For production workflows, always use a durable snapshot store (database, Redis, etc.).
Handle errors gracefully - Failed steps should provide clear error messages to help with debugging.
Version your workflows - If you modify a workflow, consider using a new workflow ID to avoid conflicts with existing suspended runs.
Suspended workflows remain in the snapshot store until completed or manually deleted. Implement cleanup for abandoned workflows.
See Also