Skip to main content

Overview

Stepkit provides checkpoint functionality that allows you to:
  • Save state after any step completes
  • Resume execution from a saved checkpoint
  • Override context when resuming
  • Implement human approval workflows
  • Handle long-running pipelines
Checkpoints capture the entire context at a specific step, enabling powerful workflow patterns.

Basic Checkpoint Usage

Capture a checkpoint using the onStepComplete callback:
import { stepkit } from 'stepkit'

const calc = stepkit<{ a: number; b?: number }>()
  .step('add-one', ({ a }) => ({ a: a + 1 }))
  .step('double', ({ a }) => ({ a: a * 2 }))
  .step('finish', ({ a, b }) => ({ sum: (a ?? 0) + (b ?? 0) }))

let checkpoint = ''

await calc.run(
  { a: 1 },
  {
    onStepComplete: (e) => {
      if (e.stepName === 'double') {
        checkpoint = e.checkpoint // Save checkpoint
      }
    },
  },
)

// Resume later with an override
const resumed = await calc.runCheckpoint({ checkpoint, overrideData: { b: 10 } })
console.log(resumed) // { a: 4, b: 10, sum: 14 }

Step Complete Event

The onStepComplete callback receives an event with checkpoint data:
stepName
string
The name of the step that just completed.
duration
number
The execution time of the step in milliseconds.
context
TContext
The complete context after the step executed (deep cloned).
checkpoint
string
A serialized checkpoint containing the step name and output context. This is a JSON string that can be stored and used to resume later.
stopPipeline
() => void
Function to stop pipeline execution after this step. See Stopping Early.
await pipeline.run(
  { input: 'data' },
  {
    onStepComplete: (event) => {
      console.log(`Step ${event.stepName} completed in ${event.duration}ms`)
      console.log('Current context:', event.context)
      console.log('Checkpoint:', event.checkpoint)
    },
  },
)

Resuming from Checkpoints

Use runCheckpoint() to resume from a saved checkpoint:

Resume with Checkpoint String

const checkpoint = '...' // From previous run
const result = await pipeline.runCheckpoint(checkpoint)

Resume with Override Data

const checkpoint = '...' // From previous run
const result = await pipeline.runCheckpoint({
  checkpoint,
  overrideData: { additionalField: 'value' },
})
The override data is shallowly merged into the checkpoint context before resuming.

Resume with Options

const result = await pipeline.runCheckpoint(
  checkpoint,
  {
    log: { stopwatch: true },
    onStepComplete: (e) => console.log('Step completed:', e.stepName),
  },
)

How Resume Works

  1. Skip completed steps: Steps before the checkpoint are skipped
  2. Skip checkpoint step: The exact checkpoint step is also skipped (it already ran)
  3. Resume execution: Execution starts from the step after the checkpoint
  4. Context restored: The saved context (with any overrides) is used
const pipeline = stepkit<{ n: number }>()
  .step('s1', ({ n }) => ({ n: n + 1 }))
  .step('s2', ({ n }) => ({ n: n + 1 })) // Checkpoint saved here
  .step('s3', ({ n }) => ({ n: n + 1 }))

let checkpoint = ''
await pipeline.run(
  { n: 0 },
  {
    onStepComplete: (e) => {
      if (e.stepName === 's2') checkpoint = e.checkpoint
    },
  },
)
// Result: { n: 3 }

const resumed = await pipeline.runCheckpoint(checkpoint)
// Skips s1 and s2, executes only s3
// Result: { n: 3 } (2 from checkpoint + 1 from s3)

Human Approval Workflow

Use checkpoints to implement human-in-the-loop workflows:
import { stepkit } from 'stepkit'

// Mock key-value store
const kv: Record<string, string> = {}
const save = async (id: string, cp: string) => (kv[id] = cp)
const get = async (id: string) => kv[id] ?? null
const del = async (id: string) => { delete kv[id] }

const sendEmail = async ({ to, body }: { to: string; body: string }) => {
  console.log('Sending email to', to, 'with body:', body)
}

const replyFlow = stepkit<{ body: string }>()
  .step('generate', async ({ body }) => ({ reply: `Reply: ${body}` }))
  .step('send', async ({ reply }) => {
    await sendEmail({ to: '[email protected]', body: reply })
  })

export const start = async (body: string) => {
  let approvalId: string | null = null
  
  await replyFlow.run(
    { body },
    {
      async onStepComplete(e) {
        if (e.stepName.endsWith('generate')) {
          approvalId = `apr_${Date.now()}`
          await save(approvalId, e.checkpoint)
          e.stopPipeline() // Stop here for approval
        }
      },
    },
  )
  
  return { approvalId }
}

export const approve = async (approvalId: string) => {
  const checkpoint = await get(approvalId)
  if (!checkpoint) throw new Error('Not found')
  
  await replyFlow.runCheckpoint(checkpoint) // Resume and complete
  await del(approvalId)
}

export const reject = async (approvalId: string) => {
  await del(approvalId) // Just delete, don't resume
}

// Usage
const { approvalId } = await start('Hello, how are you?')
// ... wait for human approval ...
await approve(approvalId!) // Email is sent

Stopping Early

Call stopPipeline() in onStepComplete to halt execution:
await stepkit<{ n: number }>()
  .step('s1', ({ n }) => ({ n: n + 1 }))
  .step('s2', ({ n }) => ({ n: n + 1 }))
  .step('s3', ({ n }) => ({ n: n + 1 }))
  .run(
    { n: 0 },
    {
      onStepComplete: (e) => {
        if (e.stepName === 's2') {
          e.stopPipeline() // Stop after s2
        }
      },
    },
  )
// Returns: { n: 2 } (s3 never executes)
Calling stopPipeline() is a graceful stop. The pipeline returns the context as of the completed step.

Checkpoint Structure

Checkpoints are JSON strings with this structure:
type Checkpoint<T> = {
  stepName: string
  output: T
}
You can serialize and deserialize checkpoints:
import { serializeCheckpoint, deserializeCheckpoint, type Checkpoint } from 'stepkit'

const checkpoint: Checkpoint<{ a: number }> = {
  stepName: 'my-step',
  output: { a: 42 },
}

const serialized = serializeCheckpoint(checkpoint)
// '{"stepName":"my-step","output":{"a":42}}'

const deserialized = deserializeCheckpoint<{ a: number }>(serialized)
// { stepName: 'my-step', output: { a: 42 } }

Storing Checkpoints

Checkpoints can be stored anywhere:

In-Memory (for testing)

const checkpoints = new Map<string, string>()

const id = 'checkpoint-1'
checkpoints.set(id, checkpoint)

const saved = checkpoints.get(id)
if (saved) {
  await pipeline.runCheckpoint(saved)
}

Database

import { db } from './database'

await pipeline.run(
  input,
  {
    async onStepComplete(e) {
      if (e.stepName === 'critical-step') {
        await db.checkpoints.insert({
          pipelineId: 'workflow-123',
          checkpoint: e.checkpoint,
          createdAt: new Date(),
        })
      }
    },
  },
)

// Resume later
const saved = await db.checkpoints.findOne({ pipelineId: 'workflow-123' })
if (saved) {
  await pipeline.runCheckpoint(saved.checkpoint)
}

Redis

import { redis } from './redis'

const approvalId = `approval-${Date.now()}`

await pipeline.run(
  input,
  {
    async onStepComplete(e) {
      if (needsApproval(e)) {
        await redis.set(
          approvalId,
          e.checkpoint,
          'EX',
          3600 // Expire after 1 hour
        )
        e.stopPipeline()
      }
    },
  },
)

// Resume on approval
const checkpoint = await redis.get(approvalId)
if (checkpoint) {
  await pipeline.runCheckpoint(checkpoint)
  await redis.del(approvalId)
}

Use Cases

Long-Running Workflows

Save progress periodically:
const pipeline = stepkit<{ batchId: string }>()
  .step('load-batch', async ({ batchId }) => ({ items: await loadItems(batchId) }))
  .step('process-items', async ({ items }) => ({ processed: await processAll(items) }))
  .step('validate', async ({ processed }) => ({ validated: await validate(processed) }))
  .step('save-results', async ({ validated }) => ({ saved: await save(validated) }))

await pipeline.run(
  { batchId: '123' },
  {
    async onStepComplete(e) {
      // Save checkpoint after each step
      await db.saveCheckpoint({
        workflowId: 'batch-123',
        stepName: e.stepName,
        checkpoint: e.checkpoint,
      })
    },
  },
)

Multi-Stage Approval

Require multiple approvals:
const pipeline = stepkit<{ request: string }>()
  .step('draft', async ({ request }) => ({ draft: await createDraft(request) }))
  .step('manager-review', async ({ draft }) => ({ reviewed: await review(draft) }))
  .step('legal-review', async ({ reviewed }) => ({ approved: await legal(reviewed) }))
  .step('publish', async ({ approved }) => ({ published: await publish(approved) }))

let managerCheckpoint = ''
let legalCheckpoint = ''

await pipeline.run(
  { request: 'New policy' },
  {
    onStepComplete: async (e) => {
      if (e.stepName === 'draft') {
        managerCheckpoint = e.checkpoint
        e.stopPipeline() // Wait for manager
      } else if (e.stepName === 'manager-review') {
        legalCheckpoint = e.checkpoint
        e.stopPipeline() // Wait for legal
      }
    },
  },
)

// After manager approval
await pipeline.runCheckpoint(managerCheckpoint)

// After legal approval
await pipeline.runCheckpoint(legalCheckpoint)

Logging with Resume

When resuming, logs indicate the starting point:
await pipeline.runCheckpoint(checkpoint, { log: { stopwatch: true } })
Output:
🚀 Resuming pipeline from checkpoint step: double

⏭️  Step: add-one (resume-skip)
⏭️  Step: double (resume-skip)

📍 Step: finish
✅ finish completed in 2ms
   Output: sum

⏱️  Performance Summary:
↪️ Resumed from checkpoint: double
┌──────────────────────────────────────────────────────┐
│ ⏭️  add-one                                   skipped │
│ ⏭️  double                                    skipped │
│ ✅ finish                                         2ms │
└──────────────────────────────────────────────────────┘

Type Safety

Checkpoints maintain type safety:
import { type Checkpoint } from 'stepkit'

type MyContext = { a: number; b: string }

const pipeline = stepkit<MyContext>()
  .step('process', ({ a, b }) => ({ c: a + b.length }))

let checkpoint: string = ''

await pipeline.run(
  { a: 1, b: 'test' },
  {
    onStepComplete: (e) => {
      checkpoint = e.checkpoint
      // e.context is typed as MyContext & { c: number }
    },
  },
)

Build docs developers (and LLMs) love