What are Pipelines?
A pipeline in stepkit is a sequence of steps that process and transform data. Each pipeline is built using the stepkit() builder function and executed with the .run() method.
Creating a Pipeline
Pipelines are created by calling stepkit() with an optional initial input type:
import { stepkit } from 'stepkit'
// Pipeline with typed input
const pipeline = stepkit <{ userId : string }>()
. step ( 'fetch-user' , ({ userId }) => {
return { userName: 'John Doe' }
})
. step ( 'fetch-settings' , ({ userName }) => {
return { theme: 'dark' , language: 'en' }
})
// Execute the pipeline
const result = await pipeline . run ({ userId: '123' })
// result: { userId: '123', userName: 'John Doe', theme: 'dark', language: 'en' }
Pipeline Configuration
You can configure pipeline behavior globally using PipelineConfig:
import type { PipelineConfig } from 'stepkit'
const config : PipelineConfig = {
log: true , // Enable logging
onStepComplete : ( event ) => {
console . log ( `Step ${ event . stepName } completed in ${ event . duration } ms` )
},
onError : ( stepName , error ) => {
console . error ( `Step ${ stepName } failed:` , error )
},
signal: abortController . signal // Support for cancellation
}
const pipeline = stepkit ( config )
Configuration Options
Enable logging and performance tracking. Can be a boolean or a detailed LogConfig object.
onStepComplete
(event: StepCompleteEvent) => void | Promise<void>
Callback fired after each step completes successfully. Receives event with step name, duration, context, and checkpoint data.
onError
(stepName: string, error: Error) => void
Global error handler for pipeline failures.
AbortSignal for cancelling pipeline execution.
Stepkit provides built-in logging and performance tracking:
const pipeline = stepkit <{ value : number }>({
log: {
logFn: console . log ,
errorLogFn: console . error ,
stopwatch: {
showStepDuration: true ,
showSummary: true ,
showTotal: true
}
}
})
. step ( 'double' , ({ value }) => ({ doubled: value * 2 }))
. step ( 'add-ten' , ({ doubled }) => ({ result: doubled + 10 }))
await pipeline . run ({ value: 5 })
This outputs:
🚀 Starting pipeline with input: { value: 5 }
📍 Step: double
✅ double completed in 2ms
Output: doubled
📍 Step: add-ten
✅ add-ten completed in 1ms
Output: result
⏱️ Performance Summary:
┌──────────────────────────────────────────────────────────┐
│ ✅ double 2ms │
│ ✅ add-ten 1ms │
└──────────────────────────────────────────────────────────┘
📊 Statistics:
Average: 1.5ms
Slowest: double (2ms)
Fastest: add-ten (1ms)
⏰ Total Pipeline Time: 3ms
✨ Pipeline completed successfully
Running Pipelines
Basic Execution
const result = await pipeline . run ({ userId: '123' })
Runtime Options
You can override configuration at runtime:
const result = await pipeline . run (
{ userId: '123' },
{
log: false , // Disable logging for this run
onStepComplete : ( event ) => {
// Save checkpoint to database
saveCheckpoint ( event . checkpoint )
}
}
)
Checkpoints and Resumption
Stepkit supports checkpointing to resume pipelines from any step:
let checkpoint : string
const pipeline = stepkit <{ a : number }>()
. step ( 'add-one' , ({ a }) => ({ a: a + 1 }))
. step ( 'double' , ({ a }) => ({ a: a * 2 }))
. step ( 'finish' , ({ a }) => ({ sum: a + 10 }))
// Run and capture checkpoint
await pipeline . run (
{ a: 1 },
{
onStepComplete : ( event ) => {
if ( event . stepName === 'double' ) {
checkpoint = event . checkpoint
}
}
}
)
// Resume from checkpoint
const resumed = await pipeline . runCheckpoint ( checkpoint )
Checkpoint Structure
Checkpoints are serializable JSON strings containing:
type Checkpoint < T > = {
stepName : string // Name of the step that completed
output : T // Context at that point in the pipeline
}
Overriding Checkpoint Data
You can modify context when resuming:
const resumed = await pipeline . runCheckpoint ({
checkpoint: checkpointStr ,
overrideData: { b: 10 } // Add/modify context values
})
Early Stopping
Pipelines can be stopped early using the stopPipeline callback:
const pipeline = stepkit <{ n : number }>()
. step ( 's1' , ({ n }) => ({ n: n + 1 }))
. step ( 's2' , ({ n }) => ({ n: n + 1 }))
. step ( 's3' , ({ n }) => ({ n: n + 1 }))
const result = await pipeline . run (
{ n: 0 },
{
onStepComplete : ( event ) => {
if ( event . stepName === 's2' ) {
event . stopPipeline () // Stop after s2
}
}
}
)
// result: { n: 2 } - s3 never executed
Pipeline Composition
Pipelines can be nested and composed:
const fetchUserBase = stepkit <{ userId : string }>()
. step ( 'fetch-data' , ({ userId }) => ({ plan: 'premium' }))
const premiumFlow = stepkit <{ userId : string ; plan : string }>()
. step ( 'fetch-premium-features' , () => ({ features: [ 'a' , 'b' ] }))
// Use sub-pipeline in a step
const fullPipeline = fetchUserBase
. step ( 'premium-logic' , premiumFlow )
. step ( 'finalize' , ({ features }) => ({ done: true }))
When composing pipelines, only the new keys added by the sub-pipeline are merged into the parent context. Existing keys are not overwritten.
Best Practices
Use Descriptive Step Names
Name your steps clearly to make logs and debugging easier: // Good
. step ( 'fetch-user-profile' , ... )
. step ( 'validate-permissions' , ... )
// Avoid
. step (() => ... ) // Anonymous
. step ( 'step1' , ... ) // Generic
Enable Logging During Development
Turn on logging to understand pipeline flow: const pipeline = stepkit ({ log: true })
Use Checkpoints for Long-Running Pipelines
Save checkpoints to enable recovery from failures: await pipeline . run ( input , {
onStepComplete : async ( event ) => {
await db . saveCheckpoint ( event . stepName , event . checkpoint )
}
})