
What are Pipelines?

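A pipeline in stepkit is a sequence of named steps that process and transform data. Each step receives the accumulated context and returns an object whose keys are merged into that context. Pipelines are built with the stepkit() builder function and executed with the .run() method.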

Creating a Pipeline

Pipelines are created by calling stepkit(), optionally with a type parameter that describes the initial input:
import { stepkit } from 'stepkit'

// Pipeline with typed input
const pipeline = stepkit<{ userId: string }>()
  .step('fetch-user', ({ userId }) => {
    return { userName: 'John Doe' }
  })
  .step('fetch-settings', ({ userName }) => {
    return { theme: 'dark', language: 'en' }
  })

// Execute the pipeline
const result = await pipeline.run({ userId: '123' })
// result: { userId: '123', userName: 'John Doe', theme: 'dark', language: 'en' }
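The result above contains both the original input and every key returned by a step. A minimal sketch of that accumulation, assuming each step's output is shallow-merged into the running context (the runSteps helper and the Context and Step types are hypothetical, not stepkit's API):

```typescript
// Hypothetical types and helper for illustration; not stepkit's API.
type Context = Record<string, unknown>
type Step = { name: string; fn: (ctx: Context) => Context }

// Run steps in order, shallow-merging each step's output into the context.
function runSteps(input: Context, steps: Step[]): Context {
  let ctx: Context = { ...input }
  for (const step of steps) {
    ctx = { ...ctx, ...step.fn(ctx) }
  }
  return ctx
}

const result = runSteps({ userId: '123' }, [
  { name: 'fetch-user', fn: () => ({ userName: 'John Doe' }) },
  { name: 'fetch-settings', fn: () => ({ theme: 'dark', language: 'en' }) },
])
// result: { userId: '123', userName: 'John Doe', theme: 'dark', language: 'en' }
```

Because outputs are merged rather than replacing the context, later steps in this sketch can still read userId even though fetch-user did not return it.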

Pipeline Configuration

You can configure pipeline behavior for every run by passing a PipelineConfig object to stepkit():
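import { stepkit } from 'stepkit'
import type { PipelineConfig } from 'stepkit'

// Controller whose signal cancels the pipeline when abort() is called
const abortController = new AbortController()

const config: PipelineConfig = {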
  log: true,  // Enable logging
  onStepComplete: (event) => {
    console.log(`Step ${event.stepName} completed in ${event.duration}ms`)
  },
  onError: (stepName, error) => {
    console.error(`Step ${stepName} failed:`, error)
  },
  signal: abortController.signal  // Support for cancellation
}

const pipeline = stepkit(config)
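How the signal option interrupts a run is not spelled out here. A minimal sketch, assuming the signal is checked before each step starts (runWithSignal and StepFn are hypothetical names, not stepkit's API):

```typescript
// Hypothetical runner for illustration; assumes the signal is checked before each step.
type Context = Record<string, unknown>
type StepFn = (ctx: Context) => Context

function runWithSignal(input: Context, steps: StepFn[], signal?: AbortSignal): Context {
  let ctx: Context = { ...input }
  for (const step of steps) {
    // Stop before starting the next step once abort() has been called.
    if (signal?.aborted) throw new Error('Pipeline aborted')
    ctx = { ...ctx, ...step(ctx) }
  }
  return ctx
}

const abortController = new AbortController()
const ran: string[] = []
const steps: StepFn[] = [
  () => { ran.push('first'); return { first: true } },
  () => {
    ran.push('second')
    abortController.abort() // cancel while the pipeline is running
    return { second: true }
  },
  () => { ran.push('third'); return { third: true } }, // never runs
]

let failure: Error | undefined
try {
  runWithSignal({}, steps, abortController.signal)
} catch (err) {
  failure = err as Error
}
// ran: ['first', 'second']; failure?.message: 'Pipeline aborted'
```

In this sketch a step that is already running is not interrupted; a long-running step would have to observe the signal itself to stop mid-step.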

Configuration Options

log
boolean | LogConfig
Enable logging and performance tracking. Pass true for default logging, or a LogConfig object for detailed control.

onStepComplete
(event: StepCompleteEvent) => void | Promise<void>
Callback fired after each step completes successfully. It receives an event containing the step name, duration, context, and checkpoint data.

onError
(stepName: string, error: Error) => void
Pipeline-level error handler, called with the name of the failing step and the error.

signal
AbortSignal
An AbortSignal for cancelling pipeline execution.
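The two callbacks fire at different points. A minimal sketch of that ordering, assuming onStepComplete runs after each successful step and onError runs for the failing step before the error propagates (runWithCallbacks and the StepEvent shape are hypothetical; stepkit's StepCompleteEvent has more fields, such as the checkpoint):

```typescript
// Hypothetical shapes for illustration; stepkit's real types may differ.
type Context = Record<string, unknown>
interface StepEvent {
  stepName: string
  duration: number // milliseconds
  context: Context
}
interface Callbacks {
  onStepComplete?: (event: StepEvent) => void
  onError?: (stepName: string, error: Error) => void
}

function runWithCallbacks(
  input: Context,
  steps: Array<[string, (ctx: Context) => Context]>,
  cb: Callbacks,
): Context {
  let ctx: Context = { ...input }
  for (const [name, fn] of steps) {
    const start = Date.now()
    try {
      ctx = { ...ctx, ...fn(ctx) }
    } catch (err) {
      // Report the failing step, then rethrow (an assumption) so the caller sees the failure.
      cb.onError?.(name, err instanceof Error ? err : new Error(String(err)))
      throw err
    }
    cb.onStepComplete?.({ stepName: name, duration: Date.now() - start, context: ctx })
  }
  return ctx
}

const completed: string[] = []
let failedStep = ''
try {
  runWithCallbacks({ value: 1 }, [
    ['ok', (ctx) => ({ doubled: (ctx.value as number) * 2 })],
    ['boom', () => { throw new Error('bad input') }],
  ], {
    onStepComplete: (e) => completed.push(e.stepName),
    onError: (stepName) => { failedStep = stepName },
  })
} catch {
  // The error is rethrown after onError has run.
}
// completed: ['ok']; failedStep: 'boom'
```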

Logging and Performance

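Stepkit provides built-in logging and performance tracking. Passing a LogConfig object lets you choose the log functions and which timing information is shown: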
const pipeline = stepkit<{ value: number }>({
  log: {
    logFn: console.log,
    errorLogFn: console.error,
    stopwatch: {
      showStepDuration: true,
      showSummary: true,
      showTotal: true
    }
  }
})
  .step('double', ({ value }) => ({ doubled: value * 2 }))
  .step('add-ten', ({ doubled }) => ({ result: doubled + 10 }))

await pipeline.run({ value: 5 })
This produces output similar to the following (durations will vary):
🚀 Starting pipeline with input: { value: 5 }

📍 Step: double
✅ double completed in 2ms
   Output: doubled

📍 Step: add-ten
✅ add-ten completed in 1ms
   Output: result

⏱️  Performance Summary:
┌──────────────────────────────────────────────────────────┐
│ ✅ double                                            2ms │
│ ✅ add-ten                                           1ms │
└──────────────────────────────────────────────────────────┘

📊 Statistics:
   Average: 1.5ms
   Slowest: double (2ms)
   Fastest: add-ten (1ms)

⏰ Total Pipeline Time: 3ms

✨ Pipeline completed successfully
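The statistics follow from simple arithmetic over the per-step durations: (2 + 1) / 2 = 1.5ms average, with double as the slowest step and add-ten as the fastest. A sketch of that summary, assuming total time is the sum of step durations (stepkit's logger may measure total time separately, including overhead between steps; summarize is a hypothetical helper):

```typescript
// Step durations in milliseconds, taken from the sample output above.
type Timing = { step: string; ms: number }

const durations: Timing[] = [
  { step: 'double', ms: 2 },
  { step: 'add-ten', ms: 1 },
]

// Hypothetical helper: derives the summary statistics from per-step timings.
function summarize(entries: Timing[]) {
  if (entries.length === 0) throw new Error('no steps to summarize')
  const total = entries.reduce((sum, e) => sum + e.ms, 0)
  const slowest = entries.reduce((a, b) => (b.ms > a.ms ? b : a))
  const fastest = entries.reduce((a, b) => (b.ms < a.ms ? b : a))
  return { total, average: total / entries.length, slowest, fastest }
}

const stats = summarize(durations)
// stats.total: 3, stats.average: 1.5, stats.slowest.step: 'double', stats.fastest.step: 'add-ten'
```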

Running Pipelines

Basic Execution

const result = await pipeline.run({ userId: '123' })

Runtime Options

You can override the pipeline's configuration for a single run by passing an options object as the second argument to .run():
const result = await pipeline.run(
  { userId: '123' },
  {
    log: false,  // Disable logging for this run
    onStepComplete: (event) => {
      // Save checkpoint to a database (saveCheckpoint is your own persistence function)
      saveCheckpoint(event.checkpoint)
    }
  }
)
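A minimal sketch of how runtime options could take precedence over pipeline-level config, assuming a key-by-key shallow merge where runtime values win (RunOptions and resolveOptions are hypothetical, not stepkit's API):

```typescript
// Hypothetical option shape; only the fields used here.
interface RunOptions {
  log?: boolean
  onStepComplete?: (stepName: string) => void
}

// Assumption: runtime options override pipeline-level config key by key.
function resolveOptions(pipelineConfig: RunOptions, runtime: RunOptions = {}): RunOptions {
  return { ...pipelineConfig, ...runtime }
}

const pipelineConfig: RunOptions = { log: true }
const resolved = resolveOptions(pipelineConfig, {
  log: false,
  onStepComplete: (name) => console.log(`saved checkpoint for ${name}`),
})
// resolved.log: false; resolved.onStepComplete is the runtime callback
// pipelineConfig itself is unchanged, so the next run without overrides logs again
```

With a plain spread, a key explicitly set to undefined in the runtime options would also override the pipeline value; a real implementation might skip undefined keys.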

Checkpoints and Resumption

Each onStepComplete event carries a checkpoint that can later be passed to runCheckpoint() to resume the pipeline after that step:
let checkpoint: string | undefined

const pipeline = stepkit<{ a: number }>()
  .step('add-one', ({ a }) => ({ a: a + 1 }))
  .step('double', ({ a }) => ({ a: a * 2 }))
  .step('finish', ({ a }) => ({ sum: a + 10 }))

// Run and capture checkpoint
await pipeline.run(
  { a: 1 },
  {
    onStepComplete: (event) => {
      if (event.stepName === 'double') {
        checkpoint = event.checkpoint
      }
    }
  }
)

// Resume from checkpoint (only if one was captured)
if (checkpoint) {
  const resumed = await pipeline.runCheckpoint(checkpoint)
}
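Conceptually, resuming means restoring the saved context and running only the steps after the checkpointed one. A minimal synchronous sketch of that idea using the same three steps, assuming checkpoints are JSON strings of { stepName, output } (runFrom and resume are hypothetical helpers, not stepkit's implementation):

```typescript
// Hypothetical sketch of checkpoint/resume; not stepkit's implementation.
type Context = Record<string, unknown>
type Step = { name: string; fn: (ctx: Context) => Context }
interface Checkpoint { stepName: string; output: Context }

const steps: Step[] = [
  { name: 'add-one', fn: (c) => ({ a: (c.a as number) + 1 }) },
  { name: 'double', fn: (c) => ({ a: (c.a as number) * 2 }) },
  { name: 'finish', fn: (c) => ({ sum: (c.a as number) + 10 }) },
]

// Run steps from `startIndex`, emitting a serialized checkpoint after each one.
function runFrom(
  input: Context,
  startIndex: number,
  onCheckpoint: (stepName: string, checkpoint: string) => void = () => {},
): Context {
  let ctx: Context = { ...input }
  for (const step of steps.slice(startIndex)) {
    ctx = { ...ctx, ...step.fn(ctx) }
    const cp: Checkpoint = { stepName: step.name, output: ctx }
    onCheckpoint(step.name, JSON.stringify(cp))
  }
  return ctx
}

// Resume with the step after the one recorded in the checkpoint.
function resume(serialized: string): Context {
  const cp = JSON.parse(serialized) as Checkpoint
  const index = steps.findIndex((s) => s.name === cp.stepName)
  if (index === -1) throw new Error(`Unknown step: ${cp.stepName}`)
  return runFrom(cp.output, index + 1)
}

const saved = new Map<string, string>()
const full = runFrom({ a: 1 }, 0, (name, cp) => saved.set(name, cp))
const fromDouble = saved.get('double')
const resumed = fromDouble !== undefined ? resume(fromDouble) : undefined
// full and resumed: { a: 4, sum: 14 }; resuming re-ran only 'finish'
```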

Checkpoint Structure

Checkpoints are serialized as JSON strings. Once parsed, a checkpoint has this structure:
type Checkpoint<T> = {
  stepName: string  // Name of the step that completed
  output: T         // Context at that point in the pipeline
}
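Checkpoints loaded back from storage arrive as untyped strings, so it can help to validate them before resuming. A minimal sketch, assuming the output field is a plain object holding the context (isCheckpoint and parseCheckpoint are hypothetical helpers):

```typescript
// Mirrors the documented structure; the helpers below are hypothetical.
type Checkpoint<T> = {
  stepName: string
  output: T
}

function isCheckpoint(value: unknown): value is Checkpoint<Record<string, unknown>> {
  if (typeof value !== 'object' || value === null) return false
  const v = value as { stepName?: unknown; output?: unknown }
  return typeof v.stepName === 'string' && typeof v.output === 'object' && v.output !== null
}

// Parse a stored checkpoint string, rejecting anything without the expected shape.
function parseCheckpoint(serialized: string): Checkpoint<Record<string, unknown>> {
  const parsed: unknown = JSON.parse(serialized)
  if (!isCheckpoint(parsed)) throw new Error('Not a valid checkpoint')
  return parsed
}

const cp = parseCheckpoint('{"stepName":"double","output":{"a":4}}')
// cp.stepName: 'double'; cp.output.a: 4
```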

Overriding Checkpoint Data

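When resuming, you can pass an object instead of a string and use overrideData to add or change context values:
const resumed = await pipeline.runCheckpoint({
  checkpoint: checkpointStr,  // a checkpoint string captured earlier
  overrideData: { b: 10 }  // Add/modify context values
})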
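A minimal sketch of what "add or modify" could mean, assuming overrideData is shallow-merged over the checkpoint's saved output so overridden keys win and new keys are added (applyOverrides is a hypothetical helper, not stepkit's API):

```typescript
type Context = Record<string, unknown>

// Assumption: overrideData is shallow-merged over the checkpoint's saved output.
function applyOverrides(serialized: string, overrideData: Context = {}): Context {
  const { output } = JSON.parse(serialized) as { stepName: string; output: Context }
  return { ...output, ...overrideData }
}

const checkpointStr = JSON.stringify({ stepName: 'double', output: { a: 4 } })

const added = applyOverrides(checkpointStr, { b: 10 }) // new key added
const modified = applyOverrides(checkpointStr, { a: 100 }) // existing key replaced
// added: { a: 4, b: 10 }; modified: { a: 100 }
```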

Early Stopping

Pipelines can be stopped early by calling event.stopPipeline() inside onStepComplete:
const pipeline = stepkit<{ n: number }>()
  .step('s1', ({ n }) => ({ n: n + 1 }))
  .step('s2', ({ n }) => ({ n: n + 1 }))
  .step('s3', ({ n }) => ({ n: n + 1 }))

const result = await pipeline.run(
  { n: 0 },
  {
    onStepComplete: (event) => {
      if (event.stepName === 's2') {
        event.stopPipeline()  // Stop after s2
      }
    }
  }
)

// result: { n: 2 } - s3 never executed
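A minimal sketch of how such a stop callback can work, assuming stopPipeline() sets a flag that the runner checks after onStepComplete returns (runUntilStopped and StopEvent are hypothetical, not stepkit's implementation):

```typescript
// Hypothetical runner; stepkit's real StepCompleteEvent carries more fields.
type Context = Record<string, unknown>
interface StopEvent {
  stepName: string
  stopPipeline: () => void
}

function runUntilStopped(
  input: Context,
  steps: Array<[string, (ctx: Context) => Context]>,
  onStepComplete: (event: StopEvent) => void,
): Context {
  let ctx: Context = { ...input }
  const control = { stopped: false }
  for (const [stepName, fn] of steps) {
    ctx = { ...ctx, ...fn(ctx) }
    onStepComplete({ stepName, stopPipeline: () => { control.stopped = true } })
    if (control.stopped) break // skip all remaining steps
  }
  return ctx
}

const inc = (ctx: Context): Context => ({ n: (ctx.n as number) + 1 })
const executed: string[] = []
const result = runUntilStopped({ n: 0 }, [['s1', inc], ['s2', inc], ['s3', inc]], (event) => {
  executed.push(event.stepName)
  if (event.stepName === 's2') event.stopPipeline()
})
// result: { n: 2 }; executed: ['s1', 's2']
```

The context built so far is returned rather than discarded, which matches the documented result of { n: 2 }.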

Pipeline Composition

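Pipelines can be nested: pass an existing pipeline to .step() in place of a step function, and it runs as a sub-pipeline with the current context as its input: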
const fetchUserBase = stepkit<{ userId: string }>()
  .step('fetch-data', ({ userId }) => ({ plan: 'premium' }))

const premiumFlow = stepkit<{ userId: string; plan: string }>()
  .step('fetch-premium-features', () => ({ features: ['a', 'b'] }))

// Use sub-pipeline in a step
const fullPipeline = fetchUserBase
  .step('premium-logic', premiumFlow)
  .step('finalize', ({ features }) => ({ done: true }))
When composing pipelines, only the new keys added by the sub-pipeline are merged into the parent context. Existing keys are not overwritten.
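The merge rule can be sketched as a function that copies only keys the parent context does not already have. This illustrates the documented behavior; mergeSubPipelineResult is a hypothetical helper, not stepkit's internals:

```typescript
type Context = Record<string, unknown>

// Merge a sub-pipeline's result into the parent context, keeping existing keys.
function mergeSubPipelineResult(parent: Context, subResult: Context): Context {
  const merged: Context = { ...parent }
  for (const [key, value] of Object.entries(subResult)) {
    // Own-property check, so inherited names like 'constructor' are not treated as present.
    if (!Object.prototype.hasOwnProperty.call(merged, key)) merged[key] = value
  }
  return merged
}

const parent = { userId: '42', plan: 'premium' }
// A sub-pipeline returns its full context, including keys it received or changed.
const subResult = { userId: '42', plan: 'changed-by-sub', features: ['a', 'b'] }
const merged = mergeSubPipelineResult(parent, subResult)
// merged: { userId: '42', plan: 'premium', features: ['a', 'b'] }
```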

Best Practices

Name your steps clearly to make logs and debugging easier:
// Good
.step('fetch-user-profile', ...)
.step('validate-permissions', ...)

// Avoid
.step(() => ...) // Anonymous
.step('step1', ...) // Generic
Turn on logging during development to see how data flows through the pipeline:
const pipeline = stepkit({ log: true })
Save checkpoints to enable recovery from failures:
await pipeline.run(input, {
  onStepComplete: async (event) => {
    await db.saveCheckpoint(event.stepName, event.checkpoint)
  }
})
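The db object above is a placeholder for your own storage. A minimal in-memory stand-in with the same saveCheckpoint(stepName, checkpoint) call, useful in tests (InMemoryCheckpointStore, get, and latest are hypothetical; real failure recovery needs persistent storage, since in-memory data is lost if the process crashes):

```typescript
// Hypothetical in-memory stand-in for `db`; swap for a persistent store in production.
class InMemoryCheckpointStore {
  private readonly byStep = new Map<string, string>()
  private lastStep: string | undefined

  // Same call shape as db.saveCheckpoint(event.stepName, event.checkpoint) above.
  async saveCheckpoint(stepName: string, checkpoint: string): Promise<void> {
    this.byStep.set(stepName, checkpoint)
    this.lastStep = stepName
  }

  get(stepName: string): string | undefined {
    return this.byStep.get(stepName)
  }

  // Most recent checkpoint, e.g. to pass to pipeline.runCheckpoint() after a failure.
  latest(): string | undefined {
    return this.lastStep === undefined ? undefined : this.byStep.get(this.lastStep)
  }
}

const db = new InMemoryCheckpointStore()
```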
Always provide an input type for better type safety:
// Good
const typedPipeline = stepkit<{ userId: string }>()

// Less safe
const untypedPipeline = stepkit()
