The Workflow DevKit uses a sophisticated serialization system to transfer data across execution boundaries—from client to workflow, workflow to step, and back again. This page explores the serialization format, special type handling, and the reducer/reviver pattern.

Why Serialization Matters

Workflows execute in isolated environments:
  • Client code runs in your application (Node.js, Edge, etc.)
  • Workflow code runs in a deterministic VM sandbox
  • Step code runs in the full Node.js runtime
Data must be serialized when crossing these boundaries, and each boundary has different requirements and constraints.

Serialization Format

The Workflow DevKit uses a prefix-based format system that allows self-describing payloads:
// Format: [4-byte prefix][serialized payload]
export const SerializationFormat = {
  DEVALUE_V1: 'devl',
} as const;
Why prefixes?
  • Self-describing: The World layer doesn’t need to know the format in advance
  • Gradual migration: Old runs keep working when new formats are introduced
  • Composability: Encryption can wrap any format (e.g., “encr” wrapping “devl”)
  • Debugging: Raw data inspection immediately reveals the format
Encoding with format prefix:
export function encodeWithFormatPrefix(
  format: SerializationFormatType,
  payload: Uint8Array
): Uint8Array {
  const prefixBytes = formatEncoder.encode(format); // "devl" → [100, 101, 118, 108]
  const result = new Uint8Array(4 + payload.length);
  result.set(prefixBytes, 0);
  result.set(payload, 4);
  return result;
}
Decoding:
export function decodeFormatPrefix(data: Uint8Array): {
  format: SerializationFormatType;
  payload: Uint8Array;
} {
  const prefixBytes = data.subarray(0, 4);
  const format = formatDecoder.decode(prefixBytes); // [100, 101, 118, 108] → "devl"
  const payload = data.subarray(4);
  return { format, payload };
}
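As a self-contained usage sketch (using the Web TextEncoder/TextDecoder APIs in place of the module's own formatEncoder/formatDecoder), a payload round-trips through the prefix like this:

```typescript
const formatEncoder = new TextEncoder();
const formatDecoder = new TextDecoder();

// Prepend a 4-byte ASCII format prefix to a payload.
function encodeWithFormatPrefix(format: string, payload: Uint8Array): Uint8Array {
  const prefixBytes = formatEncoder.encode(format); // e.g. "devl" → [100, 101, 118, 108]
  const result = new Uint8Array(4 + payload.length);
  result.set(prefixBytes, 0);
  result.set(payload, 4);
  return result;
}

// Split a prefixed buffer back into { format, payload }.
function decodeFormatPrefix(data: Uint8Array): { format: string; payload: Uint8Array } {
  return {
    format: formatDecoder.decode(data.subarray(0, 4)),
    payload: data.subarray(4),
  };
}

const original = new TextEncoder().encode('{"answer":42}');
const encoded = encodeWithFormatPrefix('devl', original);
const { format, payload } = decodeFormatPrefix(encoded);
// format is "devl"; payload holds the original bytes
```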

The Devalue Library

The current format (devl) uses the devalue library, which provides:
  • Circular reference support: Objects that reference themselves
  • Rich type support: Date, RegExp, Map, Set, typed arrays, etc.
  • Extensibility: Custom reducers/revivers for framework-specific types
  • Compact output: More efficient than JSON.stringify
Basic usage:
import { stringify, parse } from 'devalue';

const obj = { date: new Date(), set: new Set([1, 2, 3]) };
const serialized = stringify(obj);
const deserialized = parse(serialized);

Special Type Handling

The Workflow DevKit extends devalue to handle workflow-specific types:
export interface SerializableSpecial {
  ArrayBuffer: string; // base64
  BigInt: string;
  Date: string; // ISO string
  Error: Record<string, any>;
  Headers: [string, string][];
  Map: [any, any][];
  ReadableStream: { name: string; type?: 'bytes' };
  Request: { method: string; url: string; headers: Headers; body: ReadableStream };
  Response: { status: number; headers: Headers; body: ReadableStream };
  Set: any[];
  StepFunction: { stepId: string; closureVars?: Record<string, any> };
  URL: string;
  Uint8Array: string; // base64
  WritableStream: { name: string };
  // ... more types
}

Reducers and Revivers

The Workflow DevKit uses a reducer/reviver pattern to customize serialization:
  • Reducers: Convert complex objects to serializable forms during stringify()
  • Revivers: Reconstruct complex objects from serialized forms during parse()

Common Reducers

From serialization.ts, the getCommonReducers() function handles types used across all boundaries:
function getCommonReducers(global: Record<string, any> = globalThis) {
  return {
    // Convert ArrayBuffer to base64 string
    ArrayBuffer: (value) => {
      if (!(value instanceof global.ArrayBuffer)) return false;
      const uint8 = new Uint8Array(value);
      return Buffer.from(uint8).toString('base64');
    },
    
    // Convert BigInt to string representation
    BigInt: (value) => 
      typeof value === 'bigint' && value.toString(),
    
    // Convert Date to ISO string
    Date: (value) => {
      if (!(value instanceof global.Date)) return false;
      const valid = !Number.isNaN(value.getDate());
      return valid ? value.toISOString() : '.';
    },
    
    // Handle errors across VM boundaries
    Error: (value) => {
      if (!types.isNativeError(value)) return false;
      return {
        name: value.name,
        message: value.message,
        stack: value.stack,
      };
    },
    
    // Convert Map to array of entries
    Map: (value) => 
      value instanceof global.Map && Array.from(value),
    
    // More reducers...
  };
}
Why check instanceof global.X? Different execution contexts (VM vs host) have different constructor functions. We check against the global parameter to handle both contexts.
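The cross-realm problem is easy to reproduce in Node.js with the built-in vm module: an object constructed inside a fresh context fails an instanceof check against the host realm's constructor, even though it is a perfectly good instance in its own realm.

```typescript
import * as vm from 'node:vm';

// A Date built inside a separate vm context has a different Date constructor,
// so instanceof against the host's Date fails even though it is a real Date.
const foreignDate = vm.runInNewContext('new Date()');

console.log(foreignDate instanceof Date); // false: different realm, different constructor
console.log(Object.prototype.toString.call(foreignDate)); // "[object Date]"
```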

Common Revivers

Revivers reverse the transformation:
function getCommonRevivers(global: Record<string, any> = globalThis) {
  function reviveArrayBuffer(value: string) {
    const buffer = Buffer.from(value, 'base64');
    const arrayBuffer = new global.ArrayBuffer(buffer.length);
    const uint8Array = new global.Uint8Array(arrayBuffer);
    uint8Array.set(buffer);
    return arrayBuffer;
  }
  
  return {
    ArrayBuffer: reviveArrayBuffer,
    BigInt: (value: string) => global.BigInt(value),
    Date: (value) => new global.Date(value),
    Error: (value) => {
      const error = new global.Error(value.message);
      error.name = value.name;
      error.stack = value.stack;
      return error;
    },
    Map: (value) => new global.Map(value),
    // More revivers...
  };
}
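Putting the two halves together, here is a simplified dispatch loop standing in for devalue's actual integration: each reducer is tried in turn, the first one that does not return false wins, and its name selects the matching reviver on the way back.

```typescript
// Simplified stand-in for devalue's dispatch: the reducer's name is recorded
// in the serialized form and selects the reviver during parse.
type Tagged = { $type: string; value: unknown };

const reducers: Record<string, (v: unknown) => unknown | false> = {
  BigInt: (v) => typeof v === 'bigint' && v.toString(),
  Date: (v) => (v instanceof Date ? v.toISOString() : false),
  Map: (v) => (v instanceof Map ? Array.from(v) : false),
};

const revivers: Record<string, (v: any) => unknown> = {
  BigInt: (v) => BigInt(v),
  Date: (v) => new Date(v),
  Map: (v) => new Map(v),
};

function reduce(value: unknown): unknown {
  for (const [name, reducer] of Object.entries(reducers)) {
    const reduced = reducer(value);
    if (reduced !== false) return { $type: name, value: reduced } as Tagged;
  }
  return value; // plain JSON-safe value passes through untouched
}

function revive(value: unknown): unknown {
  const tagged = value as Tagged;
  if (tagged && typeof tagged === 'object' && '$type' in tagged) {
    return revivers[tagged.$type](tagged.value);
  }
  return value;
}

const revivedBigInt = revive(reduce(123n));
const revivedMap = revive(reduce(new Map([['a', 1]]))) as Map<string, number>;
// revivedBigInt is 123n again; revivedMap.get('a') is 1
```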

Boundary-Specific Serialization

Different execution boundaries need different reducer/reviver sets:

External Boundary (Client ↔ Workflow)

This boundary is handled by getExternalReducers() and getExternalRevivers().

Client → Workflow (arguments):
export function getExternalReducers(
  global: Record<string, any>,
  ops: Promise<void>[],
  runId: string
): Reducers {
  return {
    ...getCommonReducers(global),
    
    // Handle ReadableStream by piping to server storage
    ReadableStream: (value) => {
      if (!(value instanceof global.ReadableStream)) return false;
      
      const streamId = defaultUlid();
      const name = `strm_${streamId}`;
      const type = getStreamType(value); // 'bytes' or undefined
      
      // Pipe stream to server storage
      const writable = new WorkflowServerWritableStream(name, runId);
      if (type === 'bytes') {
        ops.push(value.pipeTo(writable));
      } else {
        ops.push(
          value
            .pipeThrough(getSerializeStream(...))
            .pipeTo(writable)
        );
      }
      
      return { name, type };
    },
  };
}
Workflow → Client (return value):
export function getExternalRevivers(
  global: Record<string, any>,
  ops: Promise<void>[],
  runId: string
): Revivers {
  return {
    ...getCommonRevivers(global),
    
    // Reconstruct ReadableStream from server storage
    ReadableStream: (value) => {
      const readable = new WorkflowServerReadableStream(value.name);
      
      if (value.type === 'bytes') {
        return readable; // Already bytes
      } else {
        // Deserialize objects from stream
        return readable.pipeThrough(
          getDeserializeStream(getExternalRevivers(...))
        );
      }
    },
  };
}

Workflow Boundary (Workflow ↔ Step)

This boundary is handled by getWorkflowReducers() and getWorkflowRevivers().

Workflow → Step (arguments):
export function getWorkflowReducers(
  global: Record<string, any>
): Reducers {
  return {
    ...getCommonReducers(global),
    
    // Streams in workflow are just handles (names)
    ReadableStream: (value) => {
      if (!(value instanceof global.ReadableStream)) return false;
      
      const name = value[STREAM_NAME_SYMBOL];
      if (!name) {
        throw new Error('ReadableStream name is not set');
      }
      
      return { name, type: value[STREAM_TYPE_SYMBOL] };
    },
  };
}
Step → Workflow (return value): in getStepReducers(), steps can create new streams:
function getStepReducers(
  global: Record<string, any>,
  ops: Promise<void>[],
  runId: string
): Reducers {
  return {
    ...getCommonReducers(global),
    
    ReadableStream: (value) => {
      if (!(value instanceof global.ReadableStream)) return false;
      
      // Check if already has a name (already being piped to server)
      let name = value[STREAM_NAME_SYMBOL];
      
      if (!name) {
        // New stream - create storage and pipe to it
        const streamId = defaultUlid();
        name = `strm_${streamId}`;
        const type = getStreamType(value);
        
        const writable = new WorkflowServerWritableStream(name, runId);
        if (type === 'bytes') {
          ops.push(value.pipeTo(writable));
        } else {
          ops.push(
            value
              .pipeThrough(getSerializeStream(...))
              .pipeTo(writable)
          );
        }
      }
      
      return { name, type };
    },
  };
}

Step Function Serialization

Step functions can be passed as arguments and serialized.

Reducer:
StepFunction: (value) => {
  if (typeof value !== 'function') return false;
  
  const stepId = (value as any).stepId;
  if (typeof stepId !== 'string') return false;
  
  // Check for closure variables
  const closureVarsFn = (value as any).__closureVarsFn;
  if (closureVarsFn && typeof closureVarsFn === 'function') {
    const closureVars = closureVarsFn();
    return { stepId, closureVars };
  }
  
  return { stepId };
},
Reviver (in workflow context):
StepFunction: (value) => {
  const stepId = value.stepId;
  const closureVars = value.closureVars;
  
  if (!useStep) {
    throw new Error('WORKFLOW_USE_STEP not found on global object');
  }
  
  if (closureVars) {
    // Create wrapper that provides closure variables
    return useStep(stepId, () => closureVars);
  }
  
  return useStep(stepId);
},
Reviver (in step context):
StepFunction: (value) => {
  const stepId = value.stepId;
  const closureVars = value.closureVars;
  
  const stepFn = getStepFunction(stepId);
  if (!stepFn) {
    throw new Error(`Step function "${stepId}" not found`);
  }
  
  if (closureVars) {
    // Wrap to provide closure variables via AsyncLocalStorage
    return ((...args: any[]) => {
      const currentContext = contextStorage.getStore();
      const newContext = { ...currentContext, closureVars };
      return contextStorage.run(newContext, () => stepFn(...args));
    });
  }
  
  return stepFn;
},
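The AsyncLocalStorage wrapping in the step-context reviver can be sketched in isolation. Here, contextStorage and the step function are local stand-ins for the real module state, not the actual implementation:

```typescript
import { AsyncLocalStorage } from 'node:async_hooks';

// Stand-in for the module-level context store used by the real reviver.
const contextStorage = new AsyncLocalStorage<{ closureVars?: Record<string, unknown> }>();

// Wrap a step function so it runs with the revived closure variables in scope.
function wrapWithClosureVars(
  stepFn: (...args: unknown[]) => unknown,
  closureVars: Record<string, unknown>
) {
  return (...args: unknown[]) => {
    const currentContext = contextStorage.getStore();
    const newContext = { ...currentContext, closureVars };
    return contextStorage.run(newContext, () => stepFn(...args));
  };
}

// A step reads closure variables from the ambient context, not its arguments.
const step = () => contextStorage.getStore()?.closureVars?.userId;
const wrapped = wrapWithClosureVars(step, { userId: 'u_123' });

const result = wrapped(); // 'u_123'
```

Because the context is established per call via contextStorage.run(), concurrent invocations of the same step with different closure variables cannot observe each other's values.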

Stream Serialization

Streams are serialized by piping data to server storage and passing stream names across boundaries:

Server-Side Stream Storage

WorkflowServerWritableStream:
export class WorkflowServerWritableStream extends WritableStream<Uint8Array> {
  constructor(name: string, runId: string) {
    const world = getWorld();
    let buffer: Uint8Array[] = [];
    
    const flush = async () => {
      if (buffer.length === 0) return;
      
      const chunksToFlush = buffer.slice();
      
      // Batch write for efficiency
      if (world.writeToStreamMulti && chunksToFlush.length > 1) {
        await world.writeToStreamMulti(name, runId, chunksToFlush);
      } else {
        for (const chunk of chunksToFlush) {
          await world.writeToStream(name, runId, chunk);
        }
      }
      
      buffer = [];
    };
    
    super({
      async write(chunk) {
        buffer.push(chunk);
        scheduleFlush(); // Flush after 10ms
      },
      async close() {
        await flush();
        await world.closeStream(name, runId);
      },
    });
  }
}
WorkflowServerReadableStream:
export class WorkflowServerReadableStream extends ReadableStream<Uint8Array> {
  constructor(name: string, startIndex?: number) {
    let reader: ReadableStreamDefaultReader<Uint8Array> | undefined;
    
    super({
      type: 'bytes',
      pull: async (controller) => {
        if (!reader) {
          const world = getWorld();
          const stream = await world.readFromStream(name, startIndex);
          reader = stream.getReader();
        }
        
        const result = await reader.read();
        if (result.done) {
          controller.close();
        } else {
          controller.enqueue(result.value);
        }
      },
    });
  }
}

Stream Framing

For non-byte streams (object streams), chunks are framed with length prefixes:
// Frame format: [4-byte big-endian length][format-prefixed payload]
const FRAME_HEADER_SIZE = 4;

export function getSerializeStream(): TransformStream<any, Uint8Array> {
  return new TransformStream({
    transform(chunk, controller) {
      const serialized = stringify(chunk, reducers);
      const payload = encoder.encode(serialized);
      const prefixed = encodeWithFormatPrefix('devl', payload);
      
      // Write length-prefixed frame
      const frame = new Uint8Array(4 + prefixed.length);
      new DataView(frame.buffer).setUint32(0, prefixed.length, false);
      frame.set(prefixed, 4);
      controller.enqueue(frame);
    },
  });
}
Why framing? Allows the deserializer to find chunk boundaries even when multiple chunks are concatenated or split across transport reads.
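The reading side is not shown above; a minimal deframer (an illustrative sketch, not the library's actual implementation) walks the buffer using the same 4-byte big-endian header:

```typescript
const FRAME_HEADER_SIZE = 4;

// Build one length-prefixed frame around a payload.
function frame(payload: Uint8Array): Uint8Array {
  const out = new Uint8Array(FRAME_HEADER_SIZE + payload.length);
  new DataView(out.buffer).setUint32(0, payload.length, false); // big-endian length
  out.set(payload, FRAME_HEADER_SIZE);
  return out;
}

// Split a buffer of concatenated frames back into individual payloads.
function* readFrames(data: Uint8Array): Generator<Uint8Array> {
  let offset = 0;
  while (offset + FRAME_HEADER_SIZE <= data.length) {
    const view = new DataView(data.buffer, data.byteOffset + offset, FRAME_HEADER_SIZE);
    const length = view.getUint32(0, false);
    offset += FRAME_HEADER_SIZE;
    yield data.subarray(offset, offset + length);
    offset += length;
  }
}

const enc = new TextEncoder();
const combined = new Uint8Array([...frame(enc.encode('one')), ...frame(enc.encode('two'))]);
const payloads = [...readFrames(combined)].map((p) => new TextDecoder().decode(p));
// payloads recovers ['one', 'two'] even though the frames arrived concatenated
```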

Request/Response in VM

The workflow VM provides stub implementations of Request and Response that defer body parsing to steps:
class Response {
  constructor(body?: any, init?: ResponseInit) {
    // Store the original BodyInit for serialization
    if (body !== null && body !== undefined) {
      // Create a "fake" ReadableStream that stores the original body
      this.body = Object.create(vmGlobalThis.ReadableStream.prototype, {
        [BODY_INIT_SYMBOL]: {
          value: body,
          writable: false,
        },
      });
    } else {
      this.body = null;
    }
  }
  
  async json() {
    return resJson(this); // Step function
  }
  
  async text() {
    return resText(this); // Step function
  }
}
Why fake streams? Parsing response bodies is async work that would break determinism during replay. By storing the raw BodyInit and deferring parsing to a step, we ensure consistent replay behavior.
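The symbol-keyed trick can be seen in isolation: the object satisfies instanceof checks through its prototype without any real stream machinery, while the raw body stays retrievable. BODY_INIT_SYMBOL here is a local stand-in for the real symbol:

```typescript
// Local stand-in for the symbol used by the VM's Response stub.
const BODY_INIT_SYMBOL = Symbol('bodyInit');

// A "fake" ReadableStream: prototype makes it pass instanceof checks, and the
// original BodyInit is stashed under a symbol for later parsing in a step.
const fakeBody: any = Object.create(ReadableStream.prototype, {
  [BODY_INIT_SYMBOL]: { value: '{"ok":true}', writable: false },
});

console.log(fakeBody instanceof ReadableStream); // true
console.log(fakeBody[BODY_INIT_SYMBOL]); // '{"ok":true}'
```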

Error Handling

Serialization errors are wrapped with helpful context:
function formatSerializationError(context: string, error: unknown): string {
  let message = `Failed to serialize ${context}`;
  
  if (error instanceof DevalueError && error.path) {
    message += ` at path "${error.path}"`;
  }
  
  message += '. Ensure you\'re passing serializable types.';
  
  if (error instanceof DevalueError && error.value !== undefined) {
    runtimeLogger.error('Serialization failed', {
      context,
      problematicValue: error.value,
    });
  }
  
  return message;
}
Example error:
Failed to serialize workflow arguments at path "user.avatar". 
Ensure you're passing serializable types (plain objects, arrays, 
primitives, Date, RegExp, Map, Set).

Custom Class Serialization

Classes can implement custom serialization using special symbols:
import { WORKFLOW_SERIALIZE, WORKFLOW_DESERIALIZE } from '@workflow/serde';

class User {
  static classId = 'User';
  
  constructor(public name: string, public email: string) {}
  
  static [WORKFLOW_SERIALIZE](instance: User) {
    return { name: instance.name, email: instance.email };
  }
  
  static [WORKFLOW_DESERIALIZE](data: any) {
    return new User(data.name, data.email);
  }
}
Reducer:
Instance: (value) => {
  if (value === null || typeof value !== 'object') return false;
  
  const cls = value.constructor;
  const serialize = cls[WORKFLOW_SERIALIZE];
  const classId = cls.classId;
  
  if (typeof serialize !== 'function' || typeof classId !== 'string') {
    return false;
  }
  
  const data = serialize.call(cls, value);
  return { classId, data };
},
Reviver:
Instance: (value) => {
  const cls = getSerializationClass(value.classId, global);
  if (!cls) {
    throw new Error(`Class "${value.classId}" not found`);
  }
  
  const deserialize = cls[WORKFLOW_DESERIALIZE];
  return deserialize.call(cls, value.data);
},
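The full round trip can be sketched end to end. The symbols below are local Symbol stand-ins for the real exports of @workflow/serde, and classRegistry is a hypothetical stand-in for getSerializationClass():

```typescript
// Local Symbol stand-ins for the real exports of @workflow/serde.
const WORKFLOW_SERIALIZE = Symbol('WORKFLOW_SERIALIZE');
const WORKFLOW_DESERIALIZE = Symbol('WORKFLOW_DESERIALIZE');

class User {
  static classId = 'User';

  constructor(public name: string, public email: string) {}

  static [WORKFLOW_SERIALIZE](instance: User) {
    return { name: instance.name, email: instance.email };
  }

  static [WORKFLOW_DESERIALIZE](data: { name: string; email: string }) {
    return new User(data.name, data.email);
  }
}

// Hypothetical registry standing in for getSerializationClass().
const classRegistry: Record<string, any> = { User };

function reduceInstance(value: any): { classId: string; data: unknown } | false {
  if (value === null || typeof value !== 'object') return false;
  const cls = value.constructor;
  if (typeof cls?.[WORKFLOW_SERIALIZE] !== 'function' || typeof cls.classId !== 'string') {
    return false;
  }
  return { classId: cls.classId, data: cls[WORKFLOW_SERIALIZE](value) };
}

function reviveInstance(value: { classId: string; data: any }) {
  const cls = classRegistry[value.classId];
  if (!cls) throw new Error(`Class "${value.classId}" not found`);
  return cls[WORKFLOW_DESERIALIZE](value.data);
}

const reduced = reduceInstance(new User('Ada', 'ada@example.com'));
const revived = reviveInstance(reduced as { classId: string; data: any }) as User;
// revived is a fresh User instance with the same fields
```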

Performance Considerations

Binary format: Using Uint8Array with format prefixes reduces string encoding overhead compared to JSON.

Stream buffering: The WorkflowServerWritableStream batches writes every 10ms to reduce database round-trips:
const STREAM_FLUSH_INTERVAL_MS = 10;

const scheduleFlush = () => {
  if (flushTimer) return;
  flushTimer = setTimeout(() => {
    flushPromise = flush();
  }, STREAM_FLUSH_INTERVAL_MS);
};
Batch operations: When multiple chunks are buffered, writeToStreamMulti sends them in a single operation.

Conclusion

The Workflow DevKit’s serialization system provides:
  • Rich type support: Handles complex types like streams, requests, errors, and custom classes
  • Cross-boundary data transfer: Seamless serialization across client, workflow, and step contexts
  • Format evolution: Prefix-based format system allows gradual migration
  • Performance: Binary encoding and batched stream writes reduce overhead
  • Debugging: Helpful error messages with path information
This serialization layer is fundamental to the Workflow DevKit’s ability to provide durable, resumable workflows with a natural JavaScript developer experience.
