Overview
Stepkit provides patterns for data processing, including context transformation, nested pipelines, error handling, timeouts, and early termination. This guide covers practical patterns for managing data flow in your pipelines.
Context Transformation
Use transform to clean the context by removing intermediate or sensitive fields, keeping only what the next steps need.
const cleaner = stepkit<{ token: string }>()
  .step('fetch-user', async ({ token }) => ({
    user: await getUser(token),
    token, // still present for now
    debugInfo: { fetchedAt: Date.now() },
  }))
  .step('fetch-settings', async ({ user }) => ({
    rawSettings: await getSettings(user.id),
    transient: 'will-be-removed',
  }))
  // Replace the entire context to remove clutter and sensitive data
  .transform('clean-context', ({ user, rawSettings }) => ({
    userId: user.id,
    email: user.email,
    theme: rawSettings.theme ?? 'system',
    isPro: rawSettings.plan === 'pro',
  }))
  .step('use-clean', ({ userId, theme, isPro }) => ({
    profileReady: true,
    message: `${isPro ? 'Pro' : 'Free'} user ${userId} prefers ${theme} theme`,
  }))

await cleaner.run({ token: 'secret' })
Why Transform Context?
Security: Remove sensitive data like tokens, passwords, or API keys from the context.
Clarity: Keep only relevant fields for downstream steps, making the pipeline easier to understand.
Performance: Reduce memory usage by discarding large intermediate objects.
Type Safety: Define exactly what data flows to the next steps.
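To make the difference between merging and replacing concrete, here is a minimal sketch in plain TypeScript. The helpers mergeOutput and replaceContext are hypothetical names invented for illustration; they are not Stepkit APIs and only mimic the behavior described above, where a regular step adds to the context and a transform replaces it.

```typescript
// Hypothetical helpers for illustration only; these are not Stepkit APIs.
type Context = Record<string, unknown>

// A regular step's output is merged into the existing context.
function mergeOutput<C extends Context, O extends Context>(ctx: C, output: O): C & O {
  return { ...ctx, ...output }
}

// A transform's output replaces the context outright.
function replaceContext<O extends Context>(_ctx: Context, output: O): O {
  return output
}

const before = { token: 'secret', user: { id: 'u1', email: 'a@example.com' } }

const merged = mergeOutput(before, { userId: before.user.id })
const replaced = replaceContext(before, { userId: before.user.id })

console.log('token' in merged) // true: the token is still in the context
console.log('token' in replaced) // false: only userId survives
```

After the replace, downstream code cannot reach the token even by accident, which is the security and clarity benefit listed above.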
Nested Pipelines
Compose smaller pipelines into larger ones for better organization and reusability.
// Session sub-pipeline: load session and permissions
const sessionPipeline = stepkit<{ sessionId: string }>()
  .step('fetch-session', async ({ sessionId }) => ({ session: await getSession(sessionId) }))
  .step('fetch-permissions', async ({ session }) => ({
    permissions: await getPermissions(session.userId),
  }))

// Main pipeline composes the session pipeline and continues
const main = stepkit<{ sessionId: string }>()
  .step('load-session', sessionPipeline)
  .step('use-permissions', ({ permissions }) => ({ canPublish: permissions.includes('publish') }))

await main.run({ sessionId: 'abc123' })
Nested pipelines merge their outputs into the parent context using the wrapping step’s mergePolicy (default: override).
Nested step names are prefixed with the wrapping step's name for typing, e.g. load-session/fetch-session appears in StepNames and StepOutput.
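The naming and merging rules above can be sketched in plain TypeScript. Everything here is hypothetical: Prefixed, prefixStepNames, and mergeOverride are illustrative names, not Stepkit exports, and the sketch assumes that "override" means keys from the nested output win over existing context keys.

```typescript
// Hypothetical illustration; not Stepkit's actual types or implementation.

// Template literal type that models "parent/child" step names.
type Prefixed<Parent extends string, Name extends string> = `${Parent}/${Name}`

function prefixStepNames<P extends string, N extends string>(
  parent: P,
  nested: readonly N[],
): Prefixed<P, N>[] {
  return nested.map((name) => `${parent}/${name}` as Prefixed<P, N>)
}

// Assumed meaning of the "override" policy: nested output keys replace
// same-named keys already present in the parent context.
function mergeOverride<C extends object, O extends object>(ctx: C, output: O): C & O {
  return { ...ctx, ...output }
}

const names = prefixStepNames('load-session', ['fetch-session', 'fetch-permissions'] as const)
console.log(names) // [ 'load-session/fetch-session', 'load-session/fetch-permissions' ]

const ctx = mergeOverride({ sessionId: 'abc123', permissions: [] as string[] }, { permissions: ['publish'] })
console.log(ctx.permissions) // [ 'publish' ]
```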
Error Handling & Retries
Handle failures gracefully and retry transient errors automatically.
const fetchWithRetry = stepkit()
  .step(
    {
      name: 'fetch-resource',
      onError: 'continue',
      retries: 2,
      retryDelayMs: 250,
      shouldRetry: (err) => /429|timeout/i.test(String(err?.message ?? err)),
    },
    async () => {
      // imagine a flaky network call
      const ok = Math.random() > 0.5
      if (!ok) throw new Error('429: too many requests')
      return { data: { id: '42' } }
    },
  )
  .step('continue-anyway', ({ data }) => ({ hasData: !!data }))

await fetchWithRetry.run({})
Error Handling Options
onError ('continue'): The pipeline continues even if this step fails. Failed steps don’t merge outputs into context.
retries: Number of times to retry the step before failing. Defaults to 0 (no retries).
retryDelayMs: Delay in milliseconds between retry attempts. Useful for rate-limited APIs.
shouldRetry: Function that receives the error and returns true if the step should be retried. Use this to only retry specific error types.
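To show how these four options interact, here is a rough sketch of a retry loop in plain TypeScript. It is not Stepkit's implementation: runWithRetry and RetryOptions are hypothetical names, the sketch assumes shouldRetry defaults to "always retry", and it models onError: 'continue' by returning undefined instead of throwing.

```typescript
// Hypothetical sketch of retry semantics; not Stepkit's implementation.
interface RetryOptions {
  retries?: number
  retryDelayMs?: number
  shouldRetry?: (err: unknown) => boolean
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms))

async function runWithRetry<T>(
  fn: () => Promise<T>,
  opts: RetryOptions = {},
): Promise<T | undefined> {
  // Assumed defaults: no retries, no delay, every error is retryable.
  const { retries = 0, retryDelayMs = 0, shouldRetry = () => true } = opts
  for (let attempt = 0; ; attempt++) {
    try {
      return await fn()
    } catch (err) {
      const canRetry = attempt < retries && shouldRetry(err)
      // Models onError: 'continue': give up quietly and contribute no output.
      if (!canRetry) return undefined
      await sleep(retryDelayMs)
    }
  }
}
```

With retries: 2 the function is called at most three times: the initial attempt plus two retries. An error that shouldRetry rejects ends the loop immediately, regardless of how many retries remain.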
Timeouts & Abort Signals
Guard slow steps and support cancelling entire pipelines.
const ac = new AbortController()

const guarded = stepkit()
  .step(
    { name: 'third-party-api-request', timeout: 1500, onError: 'continue' },
    async () => {
      // Simulate an external API that may be slow
      await new Promise((r) => setTimeout(r, 2000))
      return { thirdPartyOk: true }
    },
  )
  .step('after', ({ thirdPartyOk }) => ({
    status: thirdPartyOk ? 'used-third-party' : 'skipped-third-party',
  }))

// ac.abort() would cancel; pass the signal at run time
await guarded.run({}, { signal: ac.signal })
Combine timeout with onError: 'continue' to make slow external dependencies optional rather than blocking your entire pipeline.
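One common way to build this kind of guard outside a pipeline library is Promise.race against a timer. The sketch below is only an illustration of that general technique, not how Stepkit implements timeout; withTimeout and optional are hypothetical helper names. Note that in this sketch the underlying work is not cancelled when the timer wins, it is only ignored.

```typescript
// Hypothetical helpers showing the general technique; not Stepkit internals.
async function withTimeout<T>(work: Promise<T>, timeoutMs: number): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error(`timed out after ${timeoutMs}ms`)), timeoutMs)
  })
  try {
    // Whichever settles first wins: the real work or the timer's rejection.
    return await Promise.race([work, timeout])
  } finally {
    clearTimeout(timer)
  }
}

// Treats a slow or failing dependency as optional, similar in spirit to
// combining a timeout with onError: 'continue'.
async function optional<T>(work: Promise<T>, timeoutMs: number): Promise<T | undefined> {
  try {
    return await withTimeout(work, timeoutMs)
  } catch {
    return undefined
  }
}
```

A caller can then branch on the result, for example `(await optional(callThirdParty(), 1500)) ? 'used-third-party' : 'skipped-third-party'`, where callThirdParty stands in for any promise-returning function.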
Stopping Pipelines Early
Use e.stopPipeline() in the onStepComplete callback to end execution after a specific step.
await stepkit<{ n: number }>()
  .step('s1', ({ n }) => ({ n: n + 1 }))
  .step('s2', ({ n }) => ({ n: n + 1 }))
  .step('s3', ({ n }) => ({ n: n + 1 }))
  .run(
    { n: 0 },
    {
      onStepComplete: (e) => {
        if (e.stepName === 's2') e.stopPipeline()
      },
    },
  )
// => { n: 2 }
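The result is { n: 2 } because s3 never runs. A minimal sketch of a step loop that honors a stop request might look like the following. It is a simplified, synchronous model written for illustration; runSteps, Step, and StepCompleteEvent are hypothetical names and do not reflect Stepkit's internals.

```typescript
// Hypothetical, simplified model of early stopping; not Stepkit's implementation.
type Ctx = { n: number }
type Step = { name: string; run: (ctx: Ctx) => Partial<Ctx> }

interface StepCompleteEvent {
  stepName: string
  stopPipeline: () => void
}

function runSteps(
  steps: Step[],
  initial: Ctx,
  onStepComplete?: (e: StepCompleteEvent) => void,
): Ctx {
  let ctx = initial
  const state = { stopped: false }
  for (const step of steps) {
    // Merge the step's output into the context.
    ctx = { ...ctx, ...step.run(ctx) }
    // Let the callback request a stop after this step.
    onStepComplete?.({
      stepName: step.name,
      stopPipeline: () => {
        state.stopped = true
      },
    })
    if (state.stopped) break
  }
  return ctx
}

const increment = (name: string): Step => ({ name, run: ({ n }) => ({ n: n + 1 }) })
const steps = [increment('s1'), increment('s2'), increment('s3')]

const stoppedEarly = runSteps(steps, { n: 0 }, (e) => {
  if (e.stepName === 's2') e.stopPipeline()
})
console.log(stoppedEarly) // { n: 2 }
```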
Common Patterns
Fetch → Transform → Process
Load data, clean it with transform, then process the clean data
Parallel → Merge → Continue
Run multiple data fetches in parallel, merge results, then continue with enriched context
Try → Fallback
Attempt an operation with retries and timeout, fall back to default if it fails
Compose → Reuse
Build reusable sub-pipelines for common operations like authentication or data validation
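As a rough illustration of the Parallel → Merge → Continue shape, here is a sketch in plain TypeScript using Promise.all. It does not use any Stepkit API, since this guide does not show one for parallel steps. fetchProfile, fetchOrders, and loadDashboard are hypothetical stand-ins that return hard-coded data.

```typescript
// Hypothetical stand-ins for real data sources; replace with actual calls.
type Profile = { name: string }
type Order = { id: string; total: number }

async function fetchProfile(userId: string): Promise<{ profile: Profile }> {
  return { profile: { name: `user-${userId}` } }
}

async function fetchOrders(_userId: string): Promise<{ orders: Order[] }> {
  return { orders: [{ id: 'o1', total: 20 }, { id: 'o2', total: 5 }] }
}

async function loadDashboard(userId: string) {
  // Parallel: start both fetches at once and wait for both to finish.
  const [profileOut, ordersOut] = await Promise.all([
    fetchProfile(userId),
    fetchOrders(userId),
  ])
  // Merge: combine both outputs into a single context object.
  const ctx = { userId, ...profileOut, ...ordersOut }
  // Continue: later logic works with the enriched context.
  const orderTotal = ctx.orders.reduce((sum, order) => sum + order.total, 0)
  return { ...ctx, orderTotal }
}
```

Because the two fetches do not depend on each other, running them together keeps total latency close to that of the slower call rather than the sum of both.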
AI Workflows: Apply these patterns to AI-powered workflows.
Human Approval: Learn about checkpoints and resumable pipelines.