What are Steps?
Steps are the building blocks of a pipeline. Each step is a function that receives the current context and returns new data to merge into the context.
Step Functions
A step function has this signature:
type StepFunction<TContext, TOutput> = (
context: TContext
) => TOutput | Promise<TOutput> | void | Promise<void>
Basic Step
const pipeline = stepkit<{ userId: string }>()
.step('fetch-user', ({ userId }) => {
// Return new data to add to context
return { userName: 'John Doe', email: '[email protected]' }
})
.step('fetch-settings', ({ userName }) => {
// Previous step's output is available
return { theme: 'dark' }
})
Void Returns
Steps can return void if they only perform side effects:
const pipeline = stepkit<{ userId: string }>()
.step('log-user', ({ userId }) => {
console.log('Processing user:', userId)
// No return value - treated as {}
})
.step('fetch-data', ({ userId }) => {
return { data: 'some data' }
})
Naming Steps
Steps can be named or anonymous:
// Named step
.step('fetch-user', ({ userId }) => ({ userName: 'John' }))
// Anonymous step (auto-generated name like "step-1")
.step(({ userId }) => ({ userName: 'John' }))
Use named steps for better debugging and logging. Anonymous steps make it harder to track pipeline execution.
Step Configuration
Steps can be configured with additional options:
const pipeline = stepkit<{ shouldRun: boolean }>()
.step(
{
name: 'risky-operation',
condition: ({ shouldRun }) => shouldRun,
timeout: 5000,
onError: 'continue',
retries: 3,
retryDelayMs: 1000,
log: true,
mergePolicy: 'override'
},
({ data }) => {
return { result: 'processed' }
}
)
Configuration Options
Parallel Steps
You can run multiple functions in parallel within a single step:
const pipeline = stepkit<{ userId: string }>()
.step(
'fetch-all-data',
({ userId }) => ({ userName: 'John' }),
({ userId }) => ({ email: '[email protected]' }),
({ userId }) => ({ avatar: 'url' })
)
All three functions run in parallel, and their outputs are merged:
// Result: { userId: '123', userName: 'John', email: '[email protected]', avatar: 'url' }
Parallel Mode
// All must succeed (default)
.step(
{ name: 'critical-data', parallelMode: 'all' },
fetchUser,
fetchSettings,
fetchPermissions
)
// Continue even if some fail
.step(
{ name: 'optional-data', parallelMode: 'settled' },
fetchOptionalData1,
fetchOptionalData2,
fetchOptionalData3
)
Conditional Steps
Steps can be executed conditionally:
const pipeline = stepkit<{ isPremium: boolean }>()
.step('init', () => ({ base: 'data' }))
.step(
{
name: 'premium-step',
condition: ({ isPremium }) => isPremium
},
() => ({ premiumFeatures: ['a', 'b'] })
)
.step('final', ({ premiumFeatures }) => {
// premiumFeatures is optional since the step might be skipped
return { hasPremium: !!premiumFeatures }
})
Async Conditions
Conditions can be async:
.step(
{
name: 'admin-only',
condition: async ({ userId }) => {
const user = await fetchUser(userId)
return user.role === 'admin'
}
},
() => ({ adminPanel: true })
)
Unlike regular steps that merge output into context, transform steps replace the entire context:
const pipeline = stepkit<{ rawData: string }>()
.step('parse', ({ rawData }) => {
const parsed = JSON.parse(rawData)
return { parsed }
})
.transform('reshape', ({ parsed }) => ({
// Completely replace context
id: parsed.id,
name: parsed.name
}))
// rawData and parsed are gone, only id and name exist
.step('process', ({ id, name }) => {
return { processed: true }
})
Transform must return a value. Returning void, null, or undefined will throw an error.
- Data reshaping: When you need to change the structure of your context
- Cleanup: When you want to remove unnecessary data
- Type narrowing: When you want to enforce a specific shape
// Before transform: { userId: string, tempData: any, cache: Map, ... }
.transform('cleanup', ({ userId }) => ({ userId }))
// After transform: { userId: string }
Branch Steps
Branching allows conditional execution of sub-pipelines:
const pipeline = stepkit<{ plan: string }>()
.step('init', () => ({ initialized: true }))
.branchOn(
{
when: ({ plan }) => plan === 'premium',
then: (builder) =>
builder.step('fetch-premium', () => ({ features: ['a', 'b'] }))
},
{
when: ({ plan }) => plan === 'basic',
then: (builder) =>
builder.step('fetch-basic', () => ({ features: ['c'] }))
},
{
default: (builder) =>
builder.step('fetch-free', () => ({ features: [] }))
}
)
Prebuilt Branch Pipelines
You can pass prebuilt pipelines to branches:
const premiumFlow = stepkit<{ userId: string; plan: string }>()
.step('fetch-premium-features', () => ({ features: ['a', 'b'] }))
.step('fetch-premium-limits', () => ({ limits: { api: 10000 } }))
const pipeline = stepkit<{ userId: string; plan: string }>()
.branchOn(
{
when: ({ plan }) => plan === 'premium',
then: premiumFlow // Use prebuilt pipeline
},
{
default: (builder) => builder.step('basic', () => ({ features: [] }))
}
)
Nested Pipelines
You can use entire pipelines as steps:
const fetchUserPipeline = stepkit<{ userId: string }>()
.step('fetch-profile', ({ userId }) => ({ name: 'John' }))
.step('fetch-avatar', ({ userId }) => ({ avatar: 'url' }))
const mainPipeline = stepkit<{ userId: string }>()
.step('validate', ({ userId }) => ({ valid: true }))
.step('fetch-user', fetchUserPipeline) // Nested pipeline
.step('process', ({ name, avatar }) => ({ done: true }))
Step Icons
When logging is enabled, different step types show different icons:
- 📍 Step: Regular step
- 🔄 Transform: Context transformation
- 🔀 Branch: Conditional branching
Best Practices
Keep Steps Focused
Each step should do one thing well. Break complex operations into multiple steps.
Use Named Steps
Always name your steps for better debugging and logging.
Handle Errors Gracefully
Use onError configuration for non-critical steps to prevent pipeline failures.
Leverage Type Safety
Let TypeScript infer types from your step outputs for maximum safety.