Skip to main content

Overview

Middleware builders provide a fluent interface for creating reusable logic that wraps task execution or resource initialization. Middleware can modify inputs, handle errors, add cross-cutting concerns like retry, timeout, caching, and more.
import { r } from "@bluelibs/runner";

// Task middleware
const loggingMiddleware = r.middleware.task("app.middleware.logging")
  .run(async ({ task, input, next, logger }) => {
    await logger.info(`Starting task: ${task.id}`);
    const result = await next();
    await logger.info(`Task completed: ${task.id}`);
    return result;
  })
  .build();

// Resource middleware
const timingMiddleware = r.middleware.resource("app.middleware.timing")
  .run(async ({ resource, next, logger }) => {
    const start = Date.now();
    const result = await next();
    await logger.info(`${resource.id} initialized in ${Date.now() - start}ms`);
    return result;
  })
  .build();

Task Middleware

Task middleware wraps task execution and can modify inputs, outputs, or behavior.

Methods

configSchema() / schema()

Defines the validation schema for middleware configuration.
.configSchema<TConfig>(schema: IValidationSchema<TConfig>)
.schema<TConfig>(schema: IValidationSchema<TConfig>) // Alias
schema
IValidationSchema<TConfig>
required
Validation schema for middleware configuration.
.configSchema<{ maxRetries: number; delayMs: number }>({
  parse: (config) => {
    if (config.maxRetries < 1) {
      throw new Error('maxRetries must be at least 1');
    }
    return config;
  }
})
Returns: New builder with updated config type

run()

Defines the middleware execution logic.
.run(
  fn: (context: TaskMiddlewareContext<TConfig, TInput, TOutput, TDeps>) => Promise<TOutput>
)
fn
Function
required
The middleware function. Receives a context object with:
  • task - The task definition being executed
  • input - The validated task input
  • config - The middleware configuration
  • deps - Resolved middleware dependencies
  • runtime - Runtime instance
  • logger - Scoped logger
  • journal - Execution journal for middleware communication
  • next() - Function to call the next middleware or task
.run(async ({ task, input, config, next, logger, journal }) => {
  await logger.debug(`Executing ${task.id}`);
  
  try {
    const result = await next();
    return result;
  } catch (error) {
    await logger.error(`Task failed: ${error.message}`);
    throw error;
  }
})
Returns: New builder with run function set (required before build())

dependencies()

Defines dependencies to inject into the middleware.
.dependencies<TDeps>(
  deps: TDeps | ((config: TConfig) => TDeps),
  options?: { override?: boolean }
)
deps
DependencyMap | ((config) => DependencyMap)
required
Object mapping dependency keys to resources.
.dependencies((config) => ({
  cache: config.enableCache ? cacheResource : null,
  logger: globals.resources.logger,
}))
Returns: New builder with updated dependencies type

everywhere()

Configures whether this middleware applies to all tasks automatically.
.everywhere(flag: boolean | ((task: ITask) => boolean))
flag
boolean | Function
required
When true, applies to all tasks. Can also be a predicate function to filter tasks.
// Apply to all tasks
.everywhere(true)

// Apply to tasks with specific tag
.everywhere((task) => task.tags.includes(apiTag))
Returns: New builder with everywhere setting

tags(), meta(), throws()

Same as task builder. See Task Builder for details.

build()

Builds and returns the final middleware definition.
.build(): ITaskMiddleware<TConfig, TInput, TOutput, TDeps>
Returns: Immutable task middleware definition

Task Middleware Type Signature

interface TaskMiddlewareFluentBuilder<
  C = any,
  In = void,
  Out = void,
  D extends DependencyMapType = {}
> {
  id: string;
  configSchema<TNew>(schema: IValidationSchema<TNew>): TaskMiddlewareFluentBuilder<TNew, In, Out, D>;
  schema<TNew>(schema: IValidationSchema<TNew>): TaskMiddlewareFluentBuilder<TNew, In, Out, D>;
  dependencies<TNewDeps>(
    deps: TNewDeps | ((config: C) => TNewDeps),
    options?: { override?: boolean }
  ): TaskMiddlewareFluentBuilder<C, In, Out, D & TNewDeps>;
  run(fn: ITaskMiddlewareDefinition<C, In, Out, D>["run"]): TaskMiddlewareFluentBuilder<C, In, Out, D>;
  meta<TNewMeta extends IMiddlewareMeta>(m: TNewMeta): TaskMiddlewareFluentBuilder<C, In, Out, D>;
  tags<TNewTags extends TagType[]>(t: TNewTags, options?: { override?: boolean }): TaskMiddlewareFluentBuilder<C, In, Out, D>;
  throws(list: ThrowsList): TaskMiddlewareFluentBuilder<C, In, Out, D>;
  everywhere(flag: boolean | ((task: ITask) => boolean)): TaskMiddlewareFluentBuilder<C, In, Out, D>;
  build(): ITaskMiddleware<C, In, Out, D>;
}

Task Middleware Examples

Simple Logging Middleware

const logMiddleware = r.middleware.task("app.middleware.log")
  .run(async ({ task, input, next, logger }) => {
    await logger.info(`[${task.id}] Starting`, { input });
    const result = await next();
    await logger.info(`[${task.id}] Completed`, { result });
    return result;
  })
  .build();

Retry Middleware with Config

const retryMiddleware = r.middleware.task("app.middleware.retry")
  .configSchema<{ maxAttempts: number; delayMs: number }>({
    parse: (v) => v
  })
  .run(async ({ config, next, logger }) => {
    let lastError;
    
    for (let attempt = 1; attempt <= config.maxAttempts; attempt++) {
      try {
        return await next();
      } catch (error) {
        lastError = error;
        await logger.warn(`Attempt ${attempt} failed, retrying...`);
        if (attempt < config.maxAttempts) {
          await sleep(config.delayMs);
        }
      }
    }
    
    throw lastError;
  })
  .build();

// Use it:
const task = r.task("myTask")
  .middleware([retryMiddleware.with({ maxAttempts: 3, delayMs: 1000 })])
  .run(async () => { /* ... */ })
  .build();

Timeout Middleware

const timeoutMiddleware = r.middleware.task("app.middleware.timeout")
  .configSchema<{ ms: number }>({ parse: (v) => v })
  .run(async ({ config, next, journal }) => {
    const controller = new AbortController();
    const timeoutId = setTimeout(() => controller.abort(), config.ms);
    
    // Store abort controller in journal for task to use
    journal.set(abortControllerKey, controller);
    
    try {
      const result = await next();
      clearTimeout(timeoutId);
      return result;
    } catch (error) {
      clearTimeout(timeoutId);
      if (controller.signal.aborted) {
        throw new TimeoutError(`Task exceeded ${config.ms}ms`);
      }
      throw error;
    }
  })
  .build();

Middleware with Dependencies

const cacheMiddleware = r.middleware.task("app.middleware.cache")
  .configSchema<{ ttl: number }>({ parse: (v) => v })
  .dependencies(() => ({
    cacheService: cacheResource,
  }))
  .run(async ({ task, input, config, deps, next, logger }) => {
    const cacheKey = `${task.id}:${JSON.stringify(input)}`;
    
    // Check cache
    const cached = await deps.cacheService.get(cacheKey);
    if (cached) {
      await logger.debug('Cache hit');
      return cached;
    }
    
    // Execute task
    const result = await next();
    
    // Store in cache
    await deps.cacheService.set(cacheKey, result, config.ttl);
    return result;
  })
  .build();

Middleware Applied Everywhere

const metricsMiddleware = r.middleware.task("app.middleware.metrics")
  .dependencies(() => ({ metrics: metricsResource }))
  .everywhere(true) // Apply to all tasks
  .run(async ({ task, deps, next }) => {
    const start = Date.now();
    try {
      const result = await next();
      deps.metrics.recordSuccess(task.id, Date.now() - start);
      return result;
    } catch (error) {
      deps.metrics.recordFailure(task.id, Date.now() - start);
      throw error;
    }
  })
  .build();

Resource Middleware

Resource middleware wraps resource initialization and disposal.

Methods

Resource middleware has similar methods to task middleware:
  • configSchema() / schema()
  • run()
  • dependencies()
  • everywhere()
  • tags(), meta(), throws()
  • build()
The key difference is in the run() context:
.run(async ({ resource, config, deps, runtime, logger, next }) => {
  // `next()` initializes the resource
  const value = await next();
  return value;
})

Resource Middleware Type Signature

interface ResourceMiddlewareFluentBuilder<
  C = any,
  In = void,
  Out = void,
  D extends DependencyMapType = {}
> {
  id: string;
  configSchema<TNew>(schema: IValidationSchema<TNew>): ResourceMiddlewareFluentBuilder<TNew, In, Out, D>;
  schema<TNew>(schema: IValidationSchema<TNew>): ResourceMiddlewareFluentBuilder<TNew, In, Out, D>;
  dependencies<TNewDeps>(
    deps: TNewDeps | ((config: C) => TNewDeps),
    options?: { override?: boolean }
  ): ResourceMiddlewareFluentBuilder<C, In, Out, D & TNewDeps>;
  run(fn: IResourceMiddlewareDefinition<C, In, Out, D>["run"]): ResourceMiddlewareFluentBuilder<C, In, Out, D>;
  meta<TNewMeta extends IMiddlewareMeta>(m: TNewMeta): ResourceMiddlewareFluentBuilder<C, In, Out, D>;
  tags<TNewTags extends TagType[]>(t: TNewTags, options?: { override?: boolean }): ResourceMiddlewareFluentBuilder<C, In, Out, D>;
  throws(list: ThrowsList): ResourceMiddlewareFluentBuilder<C, In, Out, D>;
  everywhere(flag: boolean | ((resource: IResource) => boolean)): ResourceMiddlewareFluentBuilder<C, In, Out, D>;
  build(): IResourceMiddleware<C, In, Out, D>;
}

Resource Middleware Examples

Timing Middleware

const timingMiddleware = r.middleware.resource("app.middleware.timing")
  .run(async ({ resource, next, logger }) => {
    const start = Date.now();
    await logger.info(`Initializing ${resource.id}...`);
    const value = await next();
    await logger.info(`${resource.id} initialized in ${Date.now() - start}ms`);
    return value;
  })
  .build();

Retry Resource Initialization

const retryResourceMiddleware = r.middleware.resource("app.middleware.retry")
  .configSchema<{ maxAttempts: number }>({ parse: (v) => v })
  .run(async ({ config, next, logger }) => {
    let lastError;
    for (let i = 1; i <= config.maxAttempts; i++) {
      try {
        return await next();
      } catch (error) {
        lastError = error;
        await logger.warn(`Init attempt ${i} failed`);
        if (i < config.maxAttempts) {
          await sleep(1000 * i); // Exponential backoff
        }
      }
    }
    throw lastError;
  })
  .build();

Health Check Middleware

const healthCheckMiddleware = r.middleware.resource("app.middleware.health")
  .run(async ({ resource, next, logger }) => {
    const value = await next();
    
    // Verify resource health after init
    if (typeof value?.healthCheck === 'function') {
      const healthy = await value.healthCheck();
      if (!healthy) {
        throw new Error(`Resource ${resource.id} failed health check`);
      }
      await logger.info(`${resource.id} passed health check`);
    }
    
    return value;
  })
  .build();

Using Middleware

Attach to Specific Task/Resource

const myTask = r.task("myTask")
  .middleware([
    loggingMiddleware,
    retryMiddleware.with({ maxAttempts: 3, delayMs: 1000 }),
    timeoutMiddleware.with({ ms: 5000 }),
  ])
  .run(async () => { /* ... */ })
  .build();

Using Global Middleware

import { globals } from "@bluelibs/runner";

const task = r.task("myTask")
  .middleware([
    globals.middleware.task.retry.with({ maxAttempts: 3 }),
    globals.middleware.task.timeout.with({ ms: 5000 }),
    globals.middleware.task.cache.with({ ttl: 60000 }),
  ])
  .build();

Middleware Order

Middleware executes in order from first to last:
.middleware([
  middleware1, // Outer
  middleware2, // Middle
  middleware3, // Inner
])

// Execution flow:
// middleware1 before → middleware2 before → middleware3 before → task → middleware3 after → middleware2 after → middleware1 after

Build docs developers (and LLMs) love