
Overview

Stepkit allows you to execute multiple functions in parallel within a single step. This is useful for gathering data from multiple sources concurrently, improving pipeline performance.

Basic Parallel Execution

Pass multiple functions to a single .step() call to run them in parallel:
const pipeline = stepkit<{ idea: string }>()
  .step(
    'gather-market-signals',
    async ({ idea }) => ({ marketSize: await fetchMarketSize(idea) }),
    async ({ idea }) => ({ competitors: await fetchCompetitors(idea) }),
  )

await pipeline.run({ idea: 'AI-powered plant waterer' })
All functions receive the same input context and their outputs are merged into the pipeline context.
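
The "same input, merged output" behavior can be pictured with plain promises. The following is a minimal sketch, not Stepkit's actual implementation: `runParallel`, `Context`, `StepFn`, and `demo` are hypothetical names introduced only for illustration.

```typescript
// Hypothetical helper that mimics the described semantics with Promise.all.
type Context = Record<string, unknown>;
type StepFn = (ctx: Context) => Promise<Context>;

async function runParallel(ctx: Context, ...fns: StepFn[]): Promise<Context> {
  // Every function is called with the same context before any result is awaited.
  const outputs = await Promise.all(fns.map((fn) => fn(ctx)));
  // Outputs are merged left to right on top of the incoming context.
  return outputs.reduce<Context>((acc, out) => ({ ...acc, ...out }), { ...ctx });
}

// Stand-ins for the fetch calls in the example above.
async function demo(): Promise<Context> {
  return runParallel(
    { idea: "AI-powered plant waterer" },
    async ({ idea }) => ({ marketSize: `market for ${String(idea)}` }),
    async () => ({ competitors: ["A", "B"] }),
  );
}
```

Under these assumptions, `demo()` resolves to an object that still contains `idea` alongside `marketSize` and `competitors`.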

Parallel Modes

Control how parallel execution handles failures using the parallelMode configuration:
parallelMode: 'all' | 'settled' (default: 'all')
Determines how parallel step execution handles failures:
  • 'all': All parallel functions must succeed (uses Promise.all)
  • 'settled': Continue merging successful outputs even if some functions fail (uses Promise.allSettled)

Mode: all (default)

If any function fails, the entire step fails:
const pipeline = stepkit()
  .step(
    { name: 'fetch-data', parallelMode: 'all' }, // default behavior
    async () => ({ users: await fetchUsers() }),
    async () => ({ posts: await fetchPosts() }),
  )

Mode: settled

Continue merging successful outputs even if some functions fail:
const pipeline = stepkit<{ idea: string }>()
  .step(
    { name: 'gather-signals', parallelMode: 'settled' },
    async ({ idea }) => ({ marketSize: await fetchMarketSize(idea) }),
    async ({ idea }) => ({ competitors: await fetchCompetitors(idea) }), // might fail
    async ({ idea }) => ({ trends: await fetchTrends(idea) }),
  )
  .step('evaluate', ({ marketSize, competitors, trends }) => {
    // competitors might be undefined if that parallel function failed
    return { score: calculateScore(marketSize, competitors ?? [], trends) }
  })
When using parallelMode: 'settled', downstream steps should handle potentially undefined values from failed parallel functions.
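
To see why a key can go missing, here is a rough sketch of settled-style merging built directly on `Promise.allSettled`. It is an illustration under assumptions, not Stepkit's code: `runSettled`, `StepContext`, `SettledFn`, and `settledDemo` are hypothetical names, and the failing function simply throws.

```typescript
// Hypothetical helper: merge only the outputs of functions that fulfilled.
type StepContext = Record<string, unknown>;
type SettledFn = (ctx: StepContext) => Promise<StepContext>;

async function runSettled(ctx: StepContext, ...fns: SettledFn[]): Promise<StepContext> {
  const results = await Promise.allSettled(fns.map((fn) => fn(ctx)));
  let merged: StepContext = { ...ctx };
  for (const result of results) {
    // Rejected results contribute nothing, so their keys stay absent.
    if (result.status === "fulfilled") {
      merged = { ...merged, ...result.value };
    }
  }
  return merged;
}

async function settledDemo(): Promise<StepContext> {
  const ctx = await runSettled(
    { idea: "AI-powered plant waterer" },
    async () => ({ marketSize: 1000 }),
    async () => {
      throw new Error("competitor API down"); // simulated failure
    },
    async () => ({ trends: ["up"] }),
  );
  // Downstream code supplies a default for the key that never arrived.
  const competitors = (ctx.competitors as string[] | undefined) ?? [];
  return { ...ctx, competitors };
}
```

Supplying the default at the point of use, as in the `evaluate` step above, keeps the fallback next to the code that depends on it.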

Merge Policies

Control how outputs are merged when parallel functions return overlapping keys:
mergePolicy: 'override' | 'error' | 'warn' | 'skip' (default: 'override')
Determines how to handle key collisions when merging step outputs:
  • 'override': Later values overwrite earlier ones (default)
  • 'error': Throw an error on key collision
  • 'warn': Log a warning and override
  • 'skip': Keep the first value, skip the duplicate
const pipeline = stepkit()
  .step(
    { name: 'fetch', mergePolicy: 'error' },
    async () => ({ data: 'from-source-1' }),
    async () => ({ data: 'from-source-2' }), // throws: key collision on 'data'
  )
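
The four policies can be compared side by side with a small standalone function. This is a sketch of the semantics as described above, not the library's implementation: `mergeOutputs` is a hypothetical helper, and it assumes collisions are checked only between the parallel outputs themselves.

```typescript
type MergePolicy = "override" | "error" | "warn" | "skip";

// Hypothetical helper: merge outputs in order, applying one collision policy.
function mergeOutputs(
  outputs: Record<string, unknown>[],
  policy: MergePolicy = "override",
): Record<string, unknown> {
  const merged: Record<string, unknown> = {};
  for (const output of outputs) {
    for (const [key, value] of Object.entries(output)) {
      const collides = Object.prototype.hasOwnProperty.call(merged, key);
      if (collides && policy === "error") {
        throw new Error(`key collision on '${key}'`);
      }
      if (collides && policy === "skip") continue; // keep the first value
      if (collides && policy === "warn") {
        console.warn(`key collision on '${key}', overriding`);
      }
      merged[key] = value; // 'override' and 'warn' let the later value win
    }
  }
  return merged;
}

const sources = [{ data: "from-source-1" }, { data: "from-source-2" }];
const overridden = mergeOutputs(sources, "override"); // later value wins
const skipped = mergeOutputs(sources, "skip"); // first value wins
```

With `'error'`, the same `sources` array makes `mergeOutputs` throw instead of returning a value.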

Mixed Parallel and Sequential Execution

Combine parallel execution with sequential steps:
const pipeline = stepkit<{ userId: string }>()
  .step('fetch-user', async ({ userId }) => {
    return { user: await getUser(userId) }
  })
  // Run these in parallel after fetch-user completes
  .step(
    'fetch-data',
    async ({ user }) => ({ orders: await fetchOrders(user.id) }),
    async ({ user }) => ({ alerts: await fetchAlerts(user.id) }),
  )
  .step('process', ({ orders, alerts }) => {
    return { orderCount: orders.length, alertCount: alerts.length }
  })

Parallel Execution with Sub-Pipelines

You can also nest pipelines as parallel functions:
const fetchOrders = stepkit<{ userId: string }>()
  .step('get-orders', async ({ userId }) => ({ orders: await getOrders(userId) }))
  .step('enrich', async ({ orders }) => ({ orders: await enrichOrders(orders) }))

const fetchAlerts = stepkit<{ userId: string }>()
  .step('get-alerts', async ({ userId }) => ({ alerts: await getAlerts(userId) }))

const main = stepkit<{ userId: string }>()
  .step('fetch-user', async ({ userId }) => ({ user: await getUser(userId) }))
  .step(
    'load-data',
    fetchOrders,
    fetchAlerts,
  )

await main.run({ userId: '123' })

Type Safety

Parallel outputs are properly typed and merged:
const pipeline = stepkit<{ id: string }>()
  .step(
    'fetch',
    async ({ id }) => ({ name: 'John' }),
    async ({ id }) => ({ age: 30 }),
  )
  .step('use', ({ name, age }) => {
    // Both name and age are available and typed
    return { greeting: `${name} is ${age} years old` }
  })

type Output = StepOutput<typeof pipeline>
// { id: string; name: string; age: number; greeting: string }

Performance Considerations

  • Parallel execution uses Promise.all() or Promise.allSettled() under the hood
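  • All parallel functions are started together, before any result is awaited
  • In all mode the step completes when every function succeeds and fails as soon as one rejects; in settled mode it completes once every function has settled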
  • Use parallel execution for I/O-bound operations that can run independently
  • CPU-bound operations may not benefit from parallel execution in Node.js
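
The points above about start order and completion can be observed with plain promises. This is a small sketch independent of Stepkit: `sleep` and `traceParallel` are hypothetical names, and the timers stand in for I/O calls.

```typescript
// Resolves after the given delay; a stand-in for an I/O-bound call.
const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

async function traceParallel(): Promise<string[]> {
  const events: string[] = [];
  const task = (name: string, ms: number) => async () => {
    events.push(`start ${name}`); // runs synchronously when the task is called
    await sleep(ms);
    events.push(`end ${name}`);
  };
  // Both tasks are invoked before either is awaited, so both "start" events
  // are recorded first; the shorter timer then finishes before the longer one.
  await Promise.all([task("slow", 30)(), task("fast", 10)()]);
  return events;
}
```

Because both waits overlap, the total time is governed by the slowest task rather than the sum of the two. A CPU-bound loop in place of `sleep` would block the single JavaScript thread and give no such overlap.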
