Understanding Context
Context is the data that flows through your pipeline. Each step receives the current context and can add new properties to it.
Basic Context Flow
Context starts with your input and grows as each step adds data:
const pipeline = stepkit<{ userId: string }>()
  .step('fetch-user', ({ userId }) => {
    return { userName: 'John Doe' }
  })
  .step('fetch-settings', ({ userId, userName }) => {
    // Both userId and userName are available
    return { theme: 'dark' }
  })
  .step('finalize', ({ userId, userName, theme }) => {
    // All previous data is available
    return { done: true }
  })

await pipeline.run({ userId: '123' })
// Final context: { userId: '123', userName: 'John Doe', theme: 'dark', done: true }
Mutations do not leak between steps. Each step works with a fresh clone of the context, so changing a value in place inside one step has no effect on later steps.
Context Merging
By default, step outputs are merged into the existing context:
const pipeline = stepkit<{ a: number }>()
  .step('step1', ({ a }) => ({ b: 2 })) // Context: { a: 1, b: 2 }
  .step('step2', ({ a, b }) => ({ c: 3 })) // Context: { a: 1, b: 2, c: 3 }
  .step('step3', ({ a, b, c }) => ({ d: 4 })) // Context: { a: 1, b: 2, c: 3, d: 4 }

await pipeline.run({ a: 1 })
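The default merge can be modeled with plain object spread. The following is a minimal sketch of that idea, not stepkit's implementation; `runSteps` and the `Step` type are hypothetical names introduced only for illustration.

```typescript
// Hypothetical model of default context merging; not stepkit's actual code.
type Context = Record<string, unknown>
type Step = (context: Context) => Context

// Fold each step's output into the accumulated context, in declaration order.
const runSteps = (input: Context, steps: Step[]): Context =>
  steps.reduce<Context>((context, step) => ({ ...context, ...step(context) }), input)

const finalContext = runSteps({ a: 1 }, [
  () => ({ b: 2 }),
  () => ({ c: 3 }),
  () => ({ d: 4 }),
])

console.log(finalContext) // { a: 1, b: 2, c: 3, d: 4 }
```

Because later spreads win, a step that returns an existing key overwrites it in this model, which matches the default `override` policy described next.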
Merge Policies
When a step returns a key that already exists, you can control the behavior with merge policies:
type MergePolicy = 'override' | 'error' | 'warn' | 'skip'
Override (Default)
Overwrite existing values:
const pipeline = stepkit<{ value: number }>()
  .step('first', () => ({ count: 1 }))
  .step(
    { name: 'second', mergePolicy: 'override' },
    () => ({ count: 2 }) // Overwrites count = 1
  )

await pipeline.run({ value: 10 })
// Result: { value: 10, count: 2 }
Error
Throw an error on collision:
const pipeline = stepkit<{ value: number }>()
  .step('first', () => ({ count: 1 }))
  .step(
    { name: 'second', mergePolicy: 'error' },
    () => ({ count: 2 }) // Throws: "Context key collision: 'count' already exists"
  )
Warn
Log a warning but overwrite:
const pipeline = stepkit({ log: true })
  .step('first', () => ({ count: 1 }))
  .step(
    { name: 'second', mergePolicy: 'warn' },
    () => ({ count: 2 }) // Logs warning, then overwrites
  )
// Output: ⚠️ Key collision on 'count'
Skip
Keep existing value, ignore new value:
const pipeline = stepkit<{ value: number }>()
  .step('first', () => ({ count: 1 }))
  .step(
    { name: 'second', mergePolicy: 'skip' },
    () => ({ count: 2 }) // Ignored, count remains 1
  )

await pipeline.run({ value: 10 })
// Result: { value: 10, count: 1 }
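To compare the four policies side by side, here is a rough sketch of how a collision-aware merge could be written. `mergeWithPolicy` is a hypothetical helper, and the messages only mirror the ones quoted in the examples above; stepkit's internal logic may differ.

```typescript
// Hypothetical sketch of the four merge policies; not stepkit's implementation.
type MergePolicy = 'override' | 'error' | 'warn' | 'skip'
type Context = Record<string, unknown>

const mergeWithPolicy = (
  context: Context,
  output: Context,
  policy: MergePolicy = 'override'
): Context => {
  const merged: Context = { ...context }
  for (const [key, value] of Object.entries(output)) {
    const collides = Object.prototype.hasOwnProperty.call(context, key)
    if (collides && policy === 'error') {
      throw new Error(`Context key collision: '${key}' already exists`)
    }
    if (collides && policy === 'skip') continue // keep the existing value
    if (collides && policy === 'warn') console.warn(`Key collision on '${key}'`)
    merged[key] = value // 'override' and 'warn' both overwrite
  }
  return merged
}

const base: Context = { value: 10, count: 1 }
console.log(mergeWithPolicy(base, { count: 2 }, 'override')) // { value: 10, count: 2 }
console.log(mergeWithPolicy(base, { count: 2 }, 'skip')) // { value: 10, count: 1 }
```

Keys that do not collide are added under every policy; the policy only decides what happens to keys that already exist.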
Context Transformation
Unlike regular steps, transform() replaces the entire context instead of merging:
const pipeline = stepkit<{ rawData: string }>()
  .step('parse', ({ rawData }) => {
    const parsed = JSON.parse(rawData)
    return { parsed, metadata: 'extra' }
  })
  .transform('reshape', ({ parsed }) => ({
    // Replace entire context
    id: parsed.id,
    name: parsed.name
  }))
  .step('process', ({ id, name }) => {
    // Only id and name exist - rawData, parsed, metadata are gone
    return { processed: true }
  })
When Context is Replaced
const result = await stepkit<{ a: number }>()
  .step('step1', () => ({ b: 2, c: 3 })) // Context: { a: 1, b: 2, c: 3 }
  .transform('reshape', ({ b }) => ({ b: b * 2 })) // Context: { b: 4 }
  .step('step2', ({ b }) => ({ d: 5 })) // Context: { b: 4, d: 5 }
  .run({ a: 1 })
// Result: { b: 4, d: 5 }
// Note: 'a' and 'c' are gone after transform
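The difference between merging and replacing can be sketched in a few lines of plain TypeScript. This is an illustrative model only; `runStages` and the `Stage` type are hypothetical and do not come from stepkit.

```typescript
// Hypothetical model: a 'step' merges its output, a 'transform' replaces the context.
type Ctx = Record<string, unknown>
type Stage = { kind: 'step' | 'transform'; fn: (context: Ctx) => Ctx }

const runStages = (input: Ctx, stages: Stage[]): Ctx =>
  stages.reduce<Ctx>(
    (context, stage) =>
      stage.kind === 'transform'
        ? stage.fn(context) // replace: keys not returned are dropped
        : { ...context, ...stage.fn(context) }, // merge: earlier keys survive
    input
  )

const output = runStages({ a: 1 }, [
  { kind: 'step', fn: () => ({ b: 2, c: 3 }) },
  { kind: 'transform', fn: ({ b }) => ({ b: (b as number) * 2 }) },
  { kind: 'step', fn: () => ({ d: 5 }) },
])

console.log(output) // { b: 4, d: 5 }
```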
Conditional Context
When steps have conditions, their outputs become optional in the type system:
const pipeline = stepkit<{ isPremium: boolean }>()
  .step('init', () => ({ base: 'data' }))
  .step(
    {
      name: 'premium-step',
      condition: ({ isPremium }) => isPremium
    },
    () => ({ premium: 'feature' })
  )
  .step('final', ({ premium }) => {
    // TypeScript knows premium might be undefined
    return { hasPremium: !!premium }
  })
Type Safety with Optional Context
type Context1 = { base: string } // After init
type Context2 = { base: string, premium?: string } // After conditional step
type Context3 = { base: string, premium?: string, hasPremium: boolean } // Final
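One way to picture how a conditional step's output becomes optional is a small type helper built on `Partial`. This is a sketch under that assumption; `WithOptional` is a hypothetical name, and stepkit's real type definitions may be structured differently.

```typescript
// Hypothetical type helper: a conditional step contributes its keys as optional.
type WithOptional<Ctx, Out> = Ctx & Partial<Out>

type AfterInit = { isPremium: boolean; base: string }
type AfterPremiumStep = WithOptional<AfterInit, { premium: string }>

// 'premium' is typed as string | undefined, so it must be narrowed before use.
const summarize = (context: AfterPremiumStep): string =>
  context.premium !== undefined ? `premium: ${context.premium}` : `base: ${context.base}`

console.log(summarize({ isPremium: false, base: 'data' })) // base: data
console.log(summarize({ isPremium: true, base: 'data', premium: 'feature' })) // premium: feature
```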
Error Handling and Context
When onError is set to 'continue' or 'skip-remaining', outputs become optional:
const pipeline = stepkit<{ shouldFail: boolean }>()
  .step('step1', () => ({ value: 1 }))
  .step(
    { name: 'risky-step', onError: 'continue' },
    ({ shouldFail }) => {
      if (shouldFail) throw new Error('Failed')
      return { result: 'success' }
    }
  )
  .step('step3', ({ result }) => {
    // result is optional (it does not exist if risky-step failed)
    return { final: result ?? 'no result' }
  })

await pipeline.run({ shouldFail: true })
// Result: { shouldFail: true, value: 1, final: 'no result' }
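The reason `result` is optional can be seen in a simplified model of `onError: 'continue'`: a step that throws contributes nothing, and the pipeline carries on with the context it already had. `runContinuing` below is a hypothetical helper written for illustration, not stepkit's error handling.

```typescript
// Hypothetical model of onError: 'continue'; not stepkit's implementation.
type Ctx = Record<string, unknown>
type StepFn = (context: Ctx) => Ctx

const runContinuing = (context: Ctx, step: StepFn): Ctx => {
  try {
    return { ...context, ...step(context) }
  } catch {
    return context // the failed step adds no keys
  }
}

const risky: StepFn = ({ shouldFail }) => {
  if (shouldFail) throw new Error('Failed')
  return { result: 'success' }
}

const afterRisky = runContinuing({ shouldFail: true, value: 1 }, risky)
const settled = {
  ...afterRisky,
  final: (afterRisky['result'] as string | undefined) ?? 'no result',
}

console.log(settled) // { shouldFail: true, value: 1, final: 'no result' }
```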
Context in Parallel Steps
When running multiple functions in parallel, their outputs are merged:
const pipeline = stepkit<{ userId: string }>()
  .step(
    'fetch-all',
    ({ userId }) => ({ userName: 'John' }),
    ({ userId }) => ({ email: '[email protected]' }),
    ({ userId }) => ({ avatar: 'url' })
  )
// All outputs merged: { userId: '123', userName: 'John', email: '...', avatar: 'url' }
Parallel Merge Conflicts
If parallel functions return the same key, merge policy applies:
const pipeline = stepkit<{ id: string }>()
  .step(
    { name: 'parallel', mergePolicy: 'override' },
    () => ({ count: 1 }),
    () => ({ count: 2 }),
    () => ({ count: 3 }) // Last one wins with override
  )
// Result: { id: '123', count: 3 }
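The documented result (`count: 3`) is consistent with outputs being merged in declaration order rather than completion order. The sketch below models that assumption with `Promise.all`; `runParallel` is a hypothetical helper, and whether stepkit merges exactly this way is not confirmed by the text above.

```typescript
// Hypothetical model of a parallel step: run all functions, then merge in declaration order.
type Ctx = Record<string, unknown>
type ParallelFn = (context: Ctx) => Ctx | Promise<Ctx>

const runParallel = async (context: Ctx, fns: ParallelFn[]): Promise<Ctx> => {
  // Promise.all returns results in input order, regardless of which function settles first.
  const outputs = await Promise.all(fns.map((fn) => fn(context)))
  return outputs.reduce<Ctx>((merged, output) => ({ ...merged, ...output }), context)
}

const parallelResult = runParallel({ id: '123' }, [
  () => Promise.resolve({ count: 1 }),
  () => ({ count: 2 }),
  () => Promise.resolve({ count: 3 }), // declared last, so it wins in an override-style merge
])

parallelResult.then((result) => console.log(result)) // { id: '123', count: 3 }
```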
Branch Context Flow
Branches receive the current context and merge back only new or changed keys:
const pipeline = stepkit<{ userId: string }>()
  .step('fetch-data', ({ userId }) => {
    return { plan: 'premium', credits: 150 }
  })
  .branchOn(
    {
      when: ({ plan }) => plan === 'premium',
      then: (builder) =>
        builder
          .step('premium-user', ({ userId }) => ({ userName: 'Jane' }))
          .step('premium-features', () => ({ features: ['a', 'b'] }))
    },
    {
      default: (builder) =>
        builder.step('basic-user', ({ userId }) => ({ userName: 'John' }))
    }
  )
  .step('after-branch', ({ userName, features }) => {
    return { processed: true }
  })

await pipeline.run({ userId: '123' })
// Result: { userId: '123', plan: 'premium', credits: 150, userName: 'Jane', features: ['a', 'b'], processed: true }
Context Patch Computation
Only new or changed keys from branches are merged back:
// Context before branch: { userId: '123', plan: 'premium' }
// Branch adds: { userName: 'Jane', features: ['a', 'b'] }
// Context after branch: { userId: '123', plan: 'premium', userName: 'Jane', features: ['a', 'b'] }
From source code (builder.ts:716-719):
const subContext = await built.runWithRuntime(context, nestedRuntime)
return computePatch(
  context as unknown as Record<string, unknown>,
  subContext as unknown as Record<string, unknown>
)
The computePatch utility (utils.ts:59-74) compares before and after:
export const computePatch = (
  base: Record<string, unknown>,
  next: Record<string, unknown>
): Record<string, unknown> => {
  const patch: Record<string, unknown> = {}
  const nextEntries = Object.entries(next)
  for (const [key, value] of nextEntries) {
    if (!Object.prototype.hasOwnProperty.call(base, key)) {
      patch[key] = value
      continue
    }
    const prev = (base as any)[key]
    if (!deepEqual(prev, value)) patch[key] = value
  }
  return patch
}
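To see what the patch contains in practice, the following self-contained sketch condenses the `computePatch` listing above and runs it on sample data. The `deepEqual` here is a stand-in based on JSON comparison, since stepkit's own `deepEqual` is not shown; it is a simplification that is sensitive to key order and ignores values JSON cannot represent.

```typescript
// Stand-in for stepkit's deepEqual (not shown in the source); JSON comparison is a simplification.
const deepEqual = (a: unknown, b: unknown): boolean => JSON.stringify(a) === JSON.stringify(b)

// Condensed from the computePatch listing: keep keys that are new or whose value changed.
const computePatch = (
  base: Record<string, unknown>,
  next: Record<string, unknown>
): Record<string, unknown> => {
  const patch: Record<string, unknown> = {}
  for (const [key, value] of Object.entries(next)) {
    const isNew = !Object.prototype.hasOwnProperty.call(base, key)
    if (isNew || !deepEqual(base[key], value)) patch[key] = value
  }
  return patch
}

const before = { userId: '123', plan: 'premium', credits: 150 }
const after = { userId: '123', plan: 'premium', credits: 140, userName: 'Jane' }

console.log(computePatch(before, after)) // { credits: 140, userName: 'Jane' }

// Only `next` is iterated, so keys missing from `next` never appear in the patch.
console.log(computePatch(before, { userId: '123' })) // {}
```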
Nested Pipeline Context
When using pipelines as steps, the same patch logic applies:
const subPipeline = stepkit<{ userId: string }>()
  .step('fetch', ({ userId }) => ({ name: 'John' }))
  .step('fetch-email', () => ({ email: '[email protected]' }))

const mainPipeline = stepkit<{ userId: string }>()
  .step('init', () => ({ timestamp: Date.now() }))
  .step('fetch-user', subPipeline) // Adds name and email
  .step('finalize', ({ name, email, timestamp }) => ({ done: true }))

await mainPipeline.run({ userId: '123' })
// Result: { userId: '123', timestamp: 1234567890, name: 'John', email: '[email protected]', done: true }
Deep Cloning
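Stepkit deep clones context between steps so that in-place mutations in one step do not reach the next. As the deepClone source in this section shows, arrays, Dates, and plain objects are copied, while any other object that fails isPlainObject (for example, a class instance) is returned as-is and is therefore still shared by reference: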
const pipeline = stepkit<{ data: { count: number } }>()
  .step('mutate-attempt', ({ data }) => {
    data.count = 999 // This mutation is isolated
    return { modified: true }
  })
  .step('check', ({ data }) => {
    console.log(data.count) // Still original value
    return {}
  })
From source code (utils.ts:7-15):
export const deepClone = <T>(value: T): T => {
  if (value === null || typeof value !== 'object') return value
  if (Array.isArray(value)) return value.map((v) => deepClone(v)) as unknown as T
  if (value instanceof Date) return new Date(value.getTime()) as unknown as T
  if (!isPlainObject(value)) return value
  const result: Record<string, unknown> = {}
  for (const [k, v] of Object.entries(value as Record<string, unknown>)) result[k] = deepClone(v)
  return result as unknown as T
}
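The listing above returns non-plain objects unchanged, which has a practical consequence worth seeing in isolation. The sketch below repeats `deepClone` so it can run on its own and pairs it with an assumed `isPlainObject` based on a prototype check; stepkit's actual `isPlainObject` is not shown and may differ.

```typescript
// Assumed stand-in for stepkit's isPlainObject, which is not shown in the listing.
const isPlainObject = (value: unknown): boolean => {
  if (value === null || typeof value !== 'object') return false
  const proto = Object.getPrototypeOf(value)
  return proto === Object.prototype || proto === null
}

// Same logic as the deepClone listing above.
const deepClone = <T>(value: T): T => {
  if (value === null || typeof value !== 'object') return value
  if (Array.isArray(value)) return value.map((v) => deepClone(v)) as unknown as T
  if (value instanceof Date) return new Date(value.getTime()) as unknown as T
  if (!isPlainObject(value)) return value
  const result: Record<string, unknown> = {}
  for (const [k, v] of Object.entries(value as Record<string, unknown>)) result[k] = deepClone(v)
  return result as unknown as T
}

class Counter {
  count = 0
}

const original = { settings: { theme: 'dark' }, counter: new Counter() }
const copy = deepClone(original)

copy.settings.theme = 'light'
copy.counter.count = 5

console.log(original.settings.theme) // dark (plain objects are copied)
console.log(original.counter.count) // 5 (class instances are shared by reference)
```

Under these assumptions, keeping only plain objects, arrays, and Dates in the context gives full isolation between steps.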
Best Practices
Context is cloned between steps, so mutations don’t persist:
// Bad
.step('mutate', ({ data }) => {
  data.count++ // Won't affect next step
})
// Good
.step('increment', ({ data }) => ({
  data: { ...data, count: data.count + 1 }
}))
Use Transform for Clean Breaks
When steps are conditional or have error handlers, handle values that may be undefined:
.step('use-optional', ({ maybeValue }) => ({
  result: maybeValue ?? 'default'
}))
Choose the Right Merge Policy
Use override for most cases
Use error during development to catch conflicts
Use skip when earlier values have priority