
Overview

Stepkit provides patterns for controlling how data flows through a pipeline: context transformation, nested pipelines, error handling with retries, timeouts and abort signals, and stopping a pipeline early. This guide walks through each pattern with a practical example.

Context Transformation

Use transform to replace the context with a cleaned-up object: drop intermediate or sensitive fields and keep only what the next steps need.
const cleaner = stepkit<{ token: string }>()
  .step('fetch-user', async ({ token }) => ({
    user: await getUser(token),
    token, // still present for now
    debugInfo: { fetchedAt: Date.now() },
  }))
  .step('fetch-settings', async ({ user }) => ({
    rawSettings: await getSettings(user.id),
    transient: 'will-be-removed',
  }))
  // Replace the entire context to remove clutter and sensitive data
  .transform('clean-context', ({ user, rawSettings }) => ({
    userId: user.id,
    email: user.email,
    theme: rawSettings.theme ?? 'system',
    isPro: rawSettings.plan === 'pro',
  }))
  .step('use-clean', ({ userId, theme, isPro }) => ({
    profileReady: true,
    message: `${isPro ? 'Pro' : 'Free'} user ${userId} prefers ${theme} theme`,
  }))

await cleaner.run({ token: 'secret' })

Why Transform Context?

Security

Remove sensitive data like tokens, passwords, or API keys from the context

Clarity

Keep only relevant fields for downstream steps, making the pipeline easier to understand

Performance

Reduce memory usage by discarding large intermediate objects

Type Safety

Define exactly what data flows to the next steps
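
To make the merge-versus-replace distinction concrete, the sketch below models it in plain TypeScript without the library. `applyStep` and `applyTransform` are hypothetical helpers, not Stepkit functions. The assumption, taken from the example above, is that a step's output is merged into the context while a transform's output replaces it.

```typescript
// Hypothetical model of a step: its output is merged into the existing context.
function applyStep<C extends object, O extends object>(ctx: C, fn: (ctx: C) => O): C & O {
  return { ...ctx, ...fn(ctx) }
}

// Hypothetical model of a transform: its output replaces the context entirely.
function applyTransform<C extends object, O extends object>(ctx: C, fn: (ctx: C) => O): O {
  return fn(ctx)
}

const afterStep = applyStep({ token: 'secret' }, () => ({
  user: { id: 'u1', email: 'a@example.com' },
}))
const afterTransform = applyTransform(afterStep, ({ user }) => ({
  userId: user.id,
  email: user.email,
}))

console.log(Object.keys(afterStep))      // [ 'token', 'user' ]
console.log(Object.keys(afterTransform)) // [ 'userId', 'email' ]
```

After the transform, `token` is no longer reachable from the context, which is the property the Security point relies on.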

Nested Pipelines

Compose smaller pipelines into larger ones for better organization and reusability.
// Session sub-pipeline: load session and permissions
const sessionPipeline = stepkit<{ sessionId: string }>()
  .step('fetch-session', async ({ sessionId }) => ({ session: await getSession(sessionId) }))
  .step('fetch-permissions', async ({ session }) => ({
    permissions: await getPermissions(session.userId),
  }))

// Main pipeline composes the session pipeline and continues
const main = stepkit<{ sessionId: string }>()
  .step('load-session', sessionPipeline)
  .step('use-permissions', ({ permissions }) => ({ canPublish: permissions.includes('publish') }))

await main.run({ sessionId: 'abc123' })

  • Nested pipelines merge outputs using the wrapping step’s mergePolicy (default: override).
  • Nested step names are prefixed for typing, e.g. load-session/fetch-session appears in StepNames and StepOutput.
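
The two notes above can be illustrated with a small plain-TypeScript model. Both helpers are hypothetical and are not part of Stepkit. The sketch assumes that "override" means a nested output replaces a same-named key already in the context, and that prefixing joins the wrapping step name and the nested step name with a slash, as in the load-session/fetch-session example.

```typescript
// Hypothetical model of an "override" merge: on a key collision,
// the nested pipeline's value wins over the value already in the context.
function mergeOverride<P extends object, N extends object>(parent: P, nested: N): P & N {
  return { ...parent, ...nested }
}

// Hypothetical model of name prefixing: nested step names are qualified
// with the name of the wrapping step.
function prefixStepNames(wrapper: string, nestedNames: string[]): string[] {
  return nestedNames.map((name) => `${wrapper}/${name}`)
}

const merged = mergeOverride(
  { sessionId: 'abc123', permissions: [] as string[] },
  { session: { userId: 'u1' }, permissions: ['publish'] },
)
console.log(merged.permissions) // [ 'publish' ]

const qualified = prefixStepNames('load-session', ['fetch-session', 'fetch-permissions'])
console.log(qualified)
// [ 'load-session/fetch-session', 'load-session/fetch-permissions' ]
```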

Error Handling & Retries

Handle failures gracefully and retry transient errors automatically.
const fetchWithRetry = stepkit()
  .step(
    {
      name: 'fetch-resource',
      onError: 'continue',
      retries: 2,
      retryDelayMs: 250,
      shouldRetry: (err) => /429|timeout/i.test(String(err?.message ?? err)),
    },
    async () => {
      // imagine a flaky network call
      const ok = Math.random() > 0.5
      if (!ok) throw new Error('429: too many requests')
      return { data: { id: '42' } }
    },
  )
  .step('continue-anyway', ({ data }) => ({ hasData: !!data }))

await fetchWithRetry.run({})

Error Handling Options

onError: 'continue'

The pipeline continues even if this step fails. Failed steps don’t merge outputs into context.

retries

Number of times to retry the step before failing. Defaults to 0 (no retries).

retryDelayMs

Delay in milliseconds between retry attempts. Useful for rate-limited APIs.

shouldRetry

Function that receives the error and returns true if the step should be retried. Use this to only retry specific error types.
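
For readers who want to see how these options interact, here is a minimal plain-TypeScript sketch of the same semantics. `withRetry` and `continueOnError` are hypothetical helpers written for illustration; they are not Stepkit's implementation, and details such as whether the delay applies before the first retry are assumptions.

```typescript
interface RetryOptions {
  retries?: number                        // extra attempts after the first; 0 means no retries
  retryDelayMs?: number                   // pause between attempts
  shouldRetry?: (err: unknown) => boolean // return false to give up immediately
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms))

// Hypothetical helper: runs fn, retrying on failure according to the options.
async function withRetry<T>(fn: () => Promise<T>, opts: RetryOptions = {}): Promise<T> {
  const { retries = 0, retryDelayMs = 0, shouldRetry = () => true } = opts
  for (let attempt = 0; ; attempt++) {
    try {
      return await fn()
    } catch (err) {
      // Give up when attempts are exhausted or the error is not retryable.
      if (attempt >= retries || !shouldRetry(err)) throw err
      await sleep(retryDelayMs)
    }
  }
}

// Hypothetical model of onError: 'continue': a failure yields no output to merge.
async function continueOnError<T>(fn: () => Promise<T>): Promise<T | undefined> {
  try {
    return await fn()
  } catch {
    return undefined
  }
}

// Usage: fails twice with a retryable error, then succeeds on the third attempt.
let calls = 0
const flaky = async () => {
  calls++
  if (calls < 3) throw new Error('429: too many requests')
  return { data: { id: '42' } }
}

const retried = withRetry(flaky, {
  retries: 2,
  retryDelayMs: 1,
  shouldRetry: (err) => /429|timeout/i.test(String((err as Error)?.message ?? err)),
})
```

With retries: 2 the function is attempted at most three times in total, so the usage above resolves on the final allowed attempt.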

Timeouts & Abort Signals

Guard slow steps and support cancelling entire pipelines.
const ac = new AbortController()

const guarded = stepkit()
  .step(
    { name: 'third-party-api-request', timeout: 1500, onError: 'continue' },
    async () => {
      // Simulate an external API that may be slow
      await new Promise((r) => setTimeout(r, 2000))
      return { thirdPartyOk: true }
    },
  )
  .step('after', ({ thirdPartyOk }) => ({
    status: thirdPartyOk ? 'used-third-party' : 'skipped-third-party',
  }))

// ac.abort() would cancel; pass the signal at run time
await guarded.run({}, { signal: ac.signal })

Combine timeout with onError: 'continue' to make slow external dependencies optional rather than blocking your entire pipeline.
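
As a rough picture of what a per-step timeout and an abort signal do, the sketch below implements both with standard promises and AbortSignal. `withTimeout` and `abortableDelay` are hypothetical helpers, not Stepkit functions, and the library may implement these features differently.

```typescript
// Hypothetical helper: rejects if work does not settle within ms milliseconds.
function withTimeout<T>(work: Promise<T>, ms: number): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const timer = setTimeout(() => reject(new Error(`timed out after ${ms}ms`)), ms)
    // Whichever happens first settles the outer promise; clear the timer either way.
    work.then(
      (value) => { clearTimeout(timer); resolve(value) },
      (err) => { clearTimeout(timer); reject(err) },
    )
  })
}

// Hypothetical helper: resolves after ms, or rejects early if the signal aborts.
function abortableDelay(ms: number, signal: AbortSignal): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    if (signal.aborted) {
      reject(new Error('aborted'))
      return
    }
    const timer = setTimeout(resolve, ms)
    signal.addEventListener(
      'abort',
      () => {
        clearTimeout(timer)
        reject(new Error('aborted'))
      },
      { once: true },
    )
  })
}
```

Note that a timeout built this way only stops waiting for the work; the underlying operation keeps running unless it also observes an abort signal.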

Stopping Pipelines Early

Use e.stopPipeline() in the onStepComplete callback to end execution after a specific step.
await stepkit<{ n: number }>()
  .step('s1', ({ n }) => ({ n: n + 1 }))
  .step('s2', ({ n }) => ({ n: n + 1 }))
  .step('s3', ({ n }) => ({ n: n + 1 }))
  .run(
    { n: 0 },
    {
      onStepComplete: (e) => {
        if (e.stepName === 's2') e.stopPipeline()
      },
    },
  )
// => { n: 2 }
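
The control flow behind this can be pictured as a loop that checks a stop flag after each step. The runner below is a hypothetical, synchronous model written for illustration; it is not how Stepkit is implemented, and only the `stepName` and `stopPipeline` members of the event are taken from the example above.

```typescript
type Step<C> = { name: string; run: (ctx: C) => Partial<C> }

interface StepCompleteEvent {
  stepName: string
  stopPipeline: () => void
}

// Hypothetical runner: executes steps in order and checks a stop flag after each one.
function runSteps<C extends object>(
  steps: Step<C>[],
  initial: C,
  onStepComplete?: (e: StepCompleteEvent) => void,
): C {
  let ctx = initial
  let stopped = false
  for (const step of steps) {
    ctx = { ...ctx, ...step.run(ctx) }
    onStepComplete?.({ stepName: step.name, stopPipeline: () => { stopped = true } })
    if (stopped) break // remaining steps never run
  }
  return ctx
}

const inc = ({ n }: { n: number }) => ({ n: n + 1 })
const result = runSteps(
  [{ name: 's1', run: inc }, { name: 's2', run: inc }, { name: 's3', run: inc }],
  { n: 0 },
  (e) => { if (e.stepName === 's2') e.stopPipeline() },
)
console.log(result) // { n: 2 }
```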

Common Patterns

1. Fetch → Transform → Process
   Load data, clean it with transform, then process the clean data.

2. Parallel → Merge → Continue
   Run multiple data fetches in parallel, merge results, then continue with enriched context.

3. Try → Fallback
   Attempt an operation with retries and timeout, fall back to default if it fails.

4. Compose → Reuse
   Build reusable sub-pipelines for common operations like authentication or data validation.
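
The Parallel → Merge → Continue and Try → Fallback patterns have no example in this guide, so here is a minimal plain-TypeScript sketch of the underlying idea. It uses only standard promises; `fetchProfile`, `fetchOrders`, `loadEnriched`, and `tryOrFallback` are hypothetical names, and this does not show how Stepkit itself expresses parallel steps.

```typescript
// Hypothetical data sources standing in for real network calls.
const fetchProfile = async (id: string) => ({ name: `user-${id}` })
const fetchOrders = async (id: string) => ({ orderCount: id.length })

// Parallel → Merge: start both fetches at once, then combine their results.
async function loadEnriched(id: string) {
  const [profile, orders] = await Promise.all([fetchProfile(id), fetchOrders(id)])
  return { id, ...profile, ...orders }
}

// Try → Fallback: use the default value when the operation fails.
async function tryOrFallback<T>(attempt: () => Promise<T>, fallback: T): Promise<T> {
  try {
    return await attempt()
  } catch {
    return fallback
  }
}

// Continue: downstream logic works with the enriched context or the fallback.
const enriched = loadEnriched('42')
const theme = tryOrFallback<string>(async () => {
  throw new Error('settings service unavailable')
}, 'system')
```

Here `enriched` resolves to an object with the id, name, and orderCount fields, and `theme` resolves to the fallback because the attempt always throws.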

AI Workflows

Apply these patterns to AI-powered workflows

Human Approval

Learn about checkpoints and resumable pipelines
