Overview
Stepkit provides checkpoint functionality that allows you to:
- Save state after any step completes
- Resume execution from a saved checkpoint
- Override context when resuming
- Implement human approval workflows
- Handle long-running pipelines
Checkpoints capture the entire context after a specific step completes, so a run can be paused, stored, and resumed later from that point.
Basic Checkpoint Usage
Capture a checkpoint using the onStepComplete callback:
import { stepkit } from 'stepkit'

const calc = stepkit<{ a: number; b?: number }>()
  .step('add-one', ({ a }) => ({ a: a + 1 }))
  .step('double', ({ a }) => ({ a: a * 2 }))
  .step('finish', ({ a, b }) => ({ sum: (a ?? 0) + (b ?? 0) }))

let checkpoint = ''

await calc.run(
  { a: 1 },
  {
    onStepComplete: (e) => {
      if (e.stepName === 'double') {
        checkpoint = e.checkpoint // Save checkpoint
      }
    },
  },
)

// Resume later with an override
const resumed = await calc.runCheckpoint({ checkpoint, overrideData: { b: 10 } })
console.log(resumed) // { a: 4, b: 10, sum: 14 }
Step Complete Event
The onStepComplete callback receives an event with the following fields:
- stepName: The name of the step that just completed.
- duration: The execution time of the step in milliseconds.
- context: The complete context after the step executed (deep cloned).
- checkpoint: A serialized checkpoint containing the step name and output context. This JSON string can be stored and used to resume later.
- stopPipeline: A function that stops pipeline execution after this step. See Stopping Early.
await pipeline.run(
  { input: 'data' },
  {
    onStepComplete: (event) => {
      console.log(`Step ${event.stepName} completed in ${event.duration}ms`)
      console.log('Current context:', event.context)
      console.log('Checkpoint:', event.checkpoint)
    },
  },
)
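If you want to resume from any step rather than one chosen in advance, one option is to remember the latest checkpoint per step name. The sketch below is plain TypeScript. StepEvent is a hypothetical type covering only the event fields described above (it is not stepkit's exported type), and createCheckpointRecorder is an illustrative helper name.

```typescript
// Hypothetical event shape covering only the fields this helper reads;
// an assumption, not stepkit's exported type.
interface StepEvent {
  stepName: string
  duration: number
  checkpoint: string
}

// Illustrative helper: remembers the latest checkpoint per step name and
// the total time spent in completed steps.
function createCheckpointRecorder() {
  const byStep = new Map<string, string>()
  let totalMs = 0

  return {
    // Pass as the onStepComplete option
    onStepComplete(e: StepEvent): void {
      byStep.set(e.stepName, e.checkpoint)
      totalMs += e.duration
    },
    checkpointFor(stepName: string): string | undefined {
      return byStep.get(stepName)
    },
    totalDuration(): number {
      return totalMs
    },
  }
}
```

For example, pass { onStepComplete: recorder.onStepComplete } to run() and later call recorder.checkpointFor('double'). This assumes the real event is structurally compatible with StepEvent, which the field names used above suggest.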
Resuming from Checkpoints
Use runCheckpoint() to resume from a saved checkpoint:
Resume with Checkpoint String
const checkpoint = '...' // From previous run
const result = await pipeline.runCheckpoint(checkpoint)
Resume with Override Data
const checkpoint = '...' // From previous run
const result = await pipeline.runCheckpoint({
  checkpoint,
  overrideData: { additionalField: 'value' },
})
The override data is shallowly merged into the checkpoint context before resuming.
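Shallow merging means each top-level key in overrideData replaces the same key in the restored context, while nested objects are replaced as a whole rather than merged key by key. The sketch below models that behaviour with object spread; applyOverride and the sample values are illustrative and not taken from stepkit's source.

```typescript
type Context = Record<string, unknown>

// Shallow merge: each top-level key in overrideData replaces the same key in
// the restored context; nested objects are replaced as a whole, not merged.
function applyOverride(context: Context, overrideData: Context): Context {
  return { ...context, ...overrideData }
}

// Context restored from a checkpoint (illustrative values)
const restored = { a: 4, settings: { retries: 3, verbose: true } }

const merged = applyOverride(restored, { b: 10, settings: { retries: 5 } })
// merged is { a: 4, settings: { retries: 5 }, b: 10 }
// settings.verbose is gone: the whole nested object was replaced
```

So to change a single nested field, pass the complete nested object you want in overrideData.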
Resume with Options
const result = await pipeline.runCheckpoint(
  checkpoint,
  {
    log: { stopwatch: true },
    onStepComplete: (e) => console.log('Step completed:', e.stepName),
  },
)
How Resume Works
- Skip completed steps: Steps before the checkpoint are skipped
- Skip checkpoint step: The exact checkpoint step is also skipped (it already ran)
- Resume execution: Execution starts from the step after the checkpoint
- Context restored: The saved context (with any overrides) is used
const pipeline = stepkit<{ n: number }>()
  .step('s1', ({ n }) => ({ n: n + 1 }))
  .step('s2', ({ n }) => ({ n: n + 1 })) // Checkpoint saved here
  .step('s3', ({ n }) => ({ n: n + 1 }))

let checkpoint = ''

await pipeline.run(
  { n: 0 },
  {
    onStepComplete: (e) => {
      if (e.stepName === 's2') checkpoint = e.checkpoint
    },
  },
)
// Result: { n: 3 }

const resumed = await pipeline.runCheckpoint(checkpoint)
// Skips s1 and s2, executes only s3
// Result: { n: 3 } (2 from checkpoint + 1 from s3)
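The resume rules above can be modelled in a few lines. This is a simplified, synchronous sketch in plain TypeScript, not stepkit's internals: Step, SavedCheckpoint, and resume are hypothetical names, and merging each step's output into the context is assumed from the examples in this page.

```typescript
type Ctx = Record<string, unknown>
type Step = { name: string; run: (ctx: Ctx) => Ctx }
type SavedCheckpoint = { stepName: string; output: Ctx }

// Simplified model of resume: restore the saved context, skip every step up
// to and including the checkpoint step, then run the remaining steps in order,
// merging each step's output into the context.
function resume(steps: Step[], checkpoint: SavedCheckpoint): Ctx {
  const index = steps.findIndex((s) => s.name === checkpoint.stepName)
  if (index === -1) {
    throw new Error(`Unknown checkpoint step: ${checkpoint.stepName}`)
  }

  let ctx: Ctx = { ...checkpoint.output }
  for (const step of steps.slice(index + 1)) {
    ctx = { ...ctx, ...step.run(ctx) }
  }
  return ctx
}

const inc = (ctx: Ctx): Ctx => ({ n: (ctx.n as number) + 1 })
const steps: Step[] = [
  { name: 's1', run: inc },
  { name: 's2', run: inc },
  { name: 's3', run: inc },
]

// Checkpoint saved after s2 holds { n: 2 }; only s3 runs
const result = resume(steps, { stepName: 's2', output: { n: 2 } })
// result is { n: 3 }
```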
Human Approval Workflow
Use checkpoints to implement human-in-the-loop workflows:
import { stepkit } from 'stepkit'

// Mock key-value store
const kv: Record<string, string> = {}
const save = async (id: string, cp: string) => (kv[id] = cp)
const get = async (id: string) => kv[id] ?? null
const del = async (id: string) => {
  delete kv[id]
}

const sendEmail = async ({ to, body }: { to: string; body: string }) => {
  console.log('Sending email to', to, 'with body:', body)
}

const replyFlow = stepkit<{ body: string }>()
  .step('generate', async ({ body }) => ({ reply: `Reply: ${body}` }))
  .step('send', async ({ reply }) => {
    await sendEmail({ to: '[email protected]', body: reply })
  })

export const start = async (body: string) => {
  let approvalId: string | null = null
  await replyFlow.run(
    { body },
    {
      async onStepComplete(e) {
        if (e.stepName.endsWith('generate')) {
          approvalId = `apr_${Date.now()}`
          await save(approvalId, e.checkpoint)
          e.stopPipeline() // Stop here for approval
        }
      },
    },
  )
  return { approvalId }
}

export const approve = async (approvalId: string) => {
  const checkpoint = await get(approvalId)
  if (!checkpoint) throw new Error('Not found')
  await replyFlow.runCheckpoint(checkpoint) // Resume and complete
  await del(approvalId)
}

export const reject = async (approvalId: string) => {
  await del(approvalId) // Just delete, don't resume
}

// Usage
const { approvalId } = await start('Hello, how are you?')
// ... wait for human approval ...
await approve(approvalId!) // Email is sent
Stopping Early
Call stopPipeline() in onStepComplete to halt execution:
await stepkit<{ n: number }>()
  .step('s1', ({ n }) => ({ n: n + 1 }))
  .step('s2', ({ n }) => ({ n: n + 1 }))
  .step('s3', ({ n }) => ({ n: n + 1 }))
  .run(
    { n: 0 },
    {
      onStepComplete: (e) => {
        if (e.stepName === 's2') {
          e.stopPipeline() // Stop after s2
        }
      },
    },
  )
// Returns: { n: 2 } (s3 never executes)
Calling stopPipeline() is a graceful stop: the step that just completed keeps its output, no later steps run, and the pipeline returns the context as of that step.
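One way to picture a graceful stop is a flag that the callback sets and the run loop checks before starting the next step. The sketch below is a simplified model under that assumption, not stepkit's implementation; runWithStop and StopEvent are hypothetical names, and the sketch passes the live context instead of a deep clone.

```typescript
type Ctx = Record<string, unknown>
type Step = { name: string; run: (ctx: Ctx) => Ctx }
type StopEvent = { stepName: string; context: Ctx; stopPipeline: () => void }

// Simplified model of a graceful stop: the callback sets a flag, and the loop
// checks it before starting the next step. The step that just finished keeps
// its output, and the context so far is returned. (No deep clone here.)
function runWithStop(steps: Step[], input: Ctx, onStepComplete: (e: StopEvent) => void): Ctx {
  let ctx: Ctx = { ...input }
  const control = { stopped: false }
  for (const step of steps) {
    ctx = { ...ctx, ...step.run(ctx) }
    onStepComplete({
      stepName: step.name,
      context: ctx,
      stopPipeline: () => {
        control.stopped = true
      },
    })
    if (control.stopped) break
  }
  return ctx
}

const inc = (ctx: Ctx): Ctx => ({ n: (ctx.n as number) + 1 })
const out = runWithStop(
  [
    { name: 's1', run: inc },
    { name: 's2', run: inc },
    { name: 's3', run: inc },
  ],
  { n: 0 },
  (e) => {
    if (e.stepName === 's2') e.stopPipeline()
  },
)
// out is { n: 2 }: s3 never runs
```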
Checkpoint Structure
A serialized checkpoint is a JSON string encoding an object with this structure:
type Checkpoint<T> = {
  stepName: string
  output: T
}
You can serialize and deserialize checkpoints:
import { serializeCheckpoint, deserializeCheckpoint, type Checkpoint } from 'stepkit'
const checkpoint: Checkpoint<{ a: number }> = {
  stepName: 'my-step',
  output: { a: 42 },
}
const serialized = serializeCheckpoint(checkpoint)
// '{"stepName":"my-step","output":{"a":42}}'
const deserialized = deserializeCheckpoint<{ a: number }>(serialized)
// { stepName: 'my-step', output: { a: 42 } }
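Because the serialized form is plain JSON, you can inspect a stored checkpoint without resuming it, for example to show which step a pending approval stopped at. This sketch uses only JSON.parse and assumes the { stepName, output } shape shown above. peekStepName is a hypothetical helper; deserializeCheckpoint already does the parsing, and the sketch only adds a shape check.

```typescript
// Hypothetical helper: read the step name from a serialized checkpoint,
// assuming the documented { stepName, output } JSON shape.
function peekStepName(serialized: string): string {
  const parsed: unknown = JSON.parse(serialized)
  if (typeof parsed !== 'object' || parsed === null) {
    throw new Error('Not a checkpoint: expected a JSON object')
  }
  const { stepName, output } = parsed as { stepName?: unknown; output?: unknown }
  if (typeof stepName !== 'string' || output === undefined) {
    throw new Error('Not a checkpoint: expected { stepName, output }')
  }
  return stepName
}

peekStepName('{"stepName":"my-step","output":{"a":42}}') // 'my-step'
```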
Storing Checkpoints
Because a checkpoint is a plain string, it can be stored in any backend that holds strings:
In-Memory (for testing)
const checkpoints = new Map<string, string>()
const id = 'checkpoint-1'
checkpoints.set(id, checkpoint)
const saved = checkpoints.get(id)
if (saved) {
  await pipeline.runCheckpoint(saved)
}
Database
import { db } from './database'

await pipeline.run(
  input,
  {
    async onStepComplete(e) {
      if (e.stepName === 'critical-step') {
        await db.checkpoints.insert({
          pipelineId: 'workflow-123',
          checkpoint: e.checkpoint,
          createdAt: new Date(),
        })
      }
    },
  },
)

// Resume later
const saved = await db.checkpoints.findOne({ pipelineId: 'workflow-123' })
if (saved) {
  await pipeline.runCheckpoint(saved.checkpoint)
}
Redis
import { redis } from './redis'

const approvalId = `approval-${Date.now()}`

await pipeline.run(
  input,
  {
    async onStepComplete(e) {
      if (needsApproval(e)) {
        await redis.set(
          approvalId,
          e.checkpoint,
          'EX',
          3600 // Expire after 1 hour
        )
        e.stopPipeline()
      }
    },
  },
)

// Resume on approval
const checkpoint = await redis.get(approvalId)
if (checkpoint) {
  await pipeline.runCheckpoint(checkpoint)
  await redis.del(approvalId)
}
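All three storage examples follow the same pattern: save a string under an ID, read it back, and delete it after resuming. One way to keep pipeline code independent of the backend is a small store interface. The sketch below is an assumption for illustration (CheckpointStore and MemoryCheckpointStore are not part of stepkit). Its optional time-to-live mirrors the Redis EX expiry above, and the injectable clock exists only so expiry can be tested.

```typescript
// Hypothetical backend-agnostic interface; not part of stepkit.
interface CheckpointStore {
  save(id: string, checkpoint: string, ttlMs?: number): Promise<void>
  load(id: string): Promise<string | null>
  remove(id: string): Promise<void>
}

// In-memory implementation with optional expiry, similar in spirit to
// Redis SET with EX. Suitable for tests, not for production.
class MemoryCheckpointStore implements CheckpointStore {
  private readonly entries = new Map<string, { checkpoint: string; expiresAt: number }>()
  private readonly now: () => number

  constructor(now: () => number = Date.now) {
    this.now = now
  }

  async save(id: string, checkpoint: string, ttlMs = Infinity): Promise<void> {
    this.entries.set(id, { checkpoint, expiresAt: this.now() + ttlMs })
  }

  async load(id: string): Promise<string | null> {
    const entry = this.entries.get(id)
    if (!entry) return null
    if (this.now() >= entry.expiresAt) {
      this.entries.delete(id) // Drop expired entries lazily on read
      return null
    }
    return entry.checkpoint
  }

  async remove(id: string): Promise<void> {
    this.entries.delete(id)
  }
}
```

With a store like this, the approval functions could call store.save, store.load, and store.remove instead of talking to Redis or a database directly, and tests could swap in the in-memory version.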
Use Cases
Long-Running Workflows
Save a checkpoint after every step, so an interrupted run can resume from the last completed step:
// loadItems, processAll, validate, save and db stand in for your own code
const pipeline = stepkit<{ batchId: string }>()
  .step('load-batch', async ({ batchId }) => ({ items: await loadItems(batchId) }))
  .step('process-items', async ({ items }) => ({ processed: await processAll(items) }))
  .step('validate', async ({ processed }) => ({ validated: await validate(processed) }))
  .step('save-results', async ({ validated }) => ({ saved: await save(validated) }))

await pipeline.run(
  { batchId: '123' },
  {
    async onStepComplete(e) {
      // Save checkpoint after each step
      await db.saveCheckpoint({
        workflowId: 'batch-123',
        stepName: e.stepName,
        checkpoint: e.checkpoint,
      })
    },
  },
)
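To resume after a crash, you need the checkpoint from the furthest step that completed. A minimal sketch, assuming records shaped like the db.saveCheckpoint call above (workflowId, stepName, checkpoint) and that steps always complete in pipeline order. latestCheckpoint and CheckpointRecord are hypothetical names.

```typescript
type CheckpointRecord = { workflowId: string; stepName: string; checkpoint: string }

// Step names in pipeline order (assumed to match the .step(...) calls above)
const stepOrder = ['load-batch', 'process-items', 'validate', 'save-results']

// Pick the checkpoint from the furthest completed step of one workflow.
// Returns null when nothing was saved yet, meaning: start over with run().
function latestCheckpoint(records: CheckpointRecord[], workflowId: string): string | null {
  let best: CheckpointRecord | null = null
  let bestRank = -1
  for (const record of records) {
    if (record.workflowId !== workflowId) continue
    const rank = stepOrder.indexOf(record.stepName) // -1 for unknown steps
    if (rank > bestRank) {
      best = record
      bestRank = rank
    }
  }
  return best ? best.checkpoint : null
}
```

Pass a non-null result to pipeline.runCheckpoint(). If the furthest record is from save-results, the last step, the workflow had already finished and there is nothing left to resume.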
Multi-Stage Approval
Require multiple approvals by stopping at a different step on each run:
const pipeline = stepkit<{ request: string }>()
  .step('draft', async ({ request }) => ({ draft: await createDraft(request) }))
  .step('manager-review', async ({ draft }) => ({ reviewed: await review(draft) }))
  .step('legal-review', async ({ reviewed }) => ({ approved: await legal(reviewed) }))
  .step('publish', async ({ approved }) => ({ published: await publish(approved) }))

let managerCheckpoint = ''
let legalCheckpoint = ''

// First run: stop after the draft and wait for the manager
await pipeline.run(
  { request: 'New policy' },
  {
    onStepComplete: (e) => {
      if (e.stepName === 'draft') {
        managerCheckpoint = e.checkpoint
        e.stopPipeline() // Wait for manager
      }
    },
  },
)

// After manager approval: run manager-review, then stop again for legal.
// The first run's handler does not apply here, so pass a new one.
await pipeline.runCheckpoint(managerCheckpoint, {
  onStepComplete: (e) => {
    if (e.stepName === 'manager-review') {
      legalCheckpoint = e.checkpoint
      e.stopPipeline() // Wait for legal
    }
  },
})

// After legal approval: run legal-review and publish
await pipeline.runCheckpoint(legalCheckpoint)
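Each approval gate has the same shape: when a given step completes, store the checkpoint and stop. A small factory can build those handlers. This is a sketch with hypothetical names (stopAtGate, GateEvent); the event type is a local assumption covering only stepName, checkpoint, and stopPipeline.

```typescript
// Hypothetical event shape: only the fields this helper uses.
type GateEvent = { stepName: string; checkpoint: string; stopPipeline: () => void }

// Build an onStepComplete handler that pauses the pipeline after gateStep
// and hands the checkpoint to onPause (e.g. to store it for an approver).
function stopAtGate(gateStep: string, onPause: (checkpoint: string) => void) {
  return (e: GateEvent): void => {
    if (e.stepName === gateStep) {
      onPause(e.checkpoint)
      e.stopPipeline()
    }
  }
}
```

For example, run() could receive { onStepComplete: stopAtGate('draft', (cp) => (managerCheckpoint = cp)) }, and the resume after manager approval could receive { onStepComplete: stopAtGate('manager-review', (cp) => (legalCheckpoint = cp)) }.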
Logging with Resume
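When resuming, the logs name the checkpoint step and mark every step up to and including it as skipped. For a pipeline with the steps add-one, double, and finish (like calc in Basic Checkpoint Usage), resumed from the double checkpoint: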
await pipeline.runCheckpoint(checkpoint, { log: { stopwatch: true } })
Output:
🚀 Resuming pipeline from checkpoint step: double
⏭️ Step: add-one (resume-skip)
⏭️ Step: double (resume-skip)
📍 Step: finish
✅ finish completed in 2ms
Output: sum
⏱️ Performance Summary:
↪️ Resumed from checkpoint: double
┌──────────────────────────────────────────────────────┐
│ ⏭️ add-one skipped │
│ ⏭️ double skipped │
│ ✅ finish 2ms │
└──────────────────────────────────────────────────────┘
Type Safety
Checkpoints maintain type safety:
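import { stepkit, deserializeCheckpoint, type Checkpoint } from 'stepkit'

type MyContext = { a: number; b: string }

const pipeline = stepkit<MyContext>()
  .step('process', ({ a, b }) => ({ c: a + b.length }))

let checkpoint: string = ''

await pipeline.run(
  { a: 1, b: 'test' },
  {
    onStepComplete: (e) => {
      checkpoint = e.checkpoint
      // e.context is typed as MyContext & { c: number }
    },
  },
)

// Deserialize with the accumulated context type
const parsed: Checkpoint<MyContext & { c: number }> =
  deserializeCheckpoint<MyContext & { c: number }>(checkpoint)
// parsed.output.c is typed as number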
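TypeScript type arguments are erased at compile time, so a call like deserializeCheckpoint<MyContext & { c: number }>(...) cannot check the data at runtime. If checkpoint strings come from storage you do not fully control, a type guard can verify the shape before you resume. A minimal sketch in plain TypeScript, assuming the { stepName, output } JSON shape; isProcessedCheckpoint and Processed are hypothetical names.

```typescript
type MyContext = { a: number; b: string }
type Processed = MyContext & { c: number }

// Runtime check that a parsed value matches { stepName, output: Processed }.
function isProcessedCheckpoint(value: unknown): value is { stepName: string; output: Processed } {
  if (typeof value !== 'object' || value === null) return false
  const { stepName, output } = value as { stepName?: unknown; output?: unknown }
  if (typeof stepName !== 'string' || typeof output !== 'object' || output === null) return false
  const o = output as Record<string, unknown>
  return typeof o.a === 'number' && typeof o.b === 'string' && typeof o.c === 'number'
}

const parsed: unknown = JSON.parse('{"stepName":"process","output":{"a":1,"b":"test","c":5}}')
if (isProcessedCheckpoint(parsed)) {
  const c: number = parsed.output.c // Narrowed: output is typed as Processed
  console.log(`Checkpoint from ${parsed.stepName}, c = ${c}`)
}
```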