Streaming

The GLYPH JavaScript SDK provides streaming validation for real-time LLM tool calls and the GS1 transport protocol for framed message delivery.

StreamingValidator

Validate GLYPH tool calls incrementally as tokens arrive from an LLM, enabling early detection and rejection of invalid requests.

Why Streaming Validation?

Benefits:
  • Early tool detection: Know the tool name before the full response arrives
  • Early rejection: Stop on unknown tools mid-stream
  • Incremental validation: Check constraints as tokens arrive
  • Latency savings: Reject bad payloads without waiting for completion
  • Cost savings: Stop token generation on invalid requests
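
To make the first two benefits concrete, here is a minimal sketch of prefix-based tool detection. It is not the SDK's StreamingValidator: the `ToolNameDetector` class and `Detection` type are hypothetical, and the sketch assumes only that the tool name is the text preceding the first `{`, as in the `search{...}` calls shown on this page.

```typescript
// Illustrative only: a tiny prefix detector, not the SDK's StreamingValidator.
type Detection =
  | { status: "pending" }
  | { status: "detected"; toolName: string; allowed: boolean; atToken: number };

class ToolNameDetector {
  private buffer = "";
  private tokenCount = 0;
  private result: Detection = { status: "pending" };
  private readonly allowedTools: ReadonlySet<string>;

  constructor(allowedTools: ReadonlySet<string>) {
    this.allowedTools = allowedTools;
  }

  // Feed one token and return the detection state after that token.
  push(token: string): Detection {
    if (this.result.status === "detected") return this.result;
    this.tokenCount += 1;
    this.buffer += token;
    const brace = this.buffer.indexOf("{");
    if (brace >= 0) {
      // Everything before the first "{" is treated as the tool name.
      const toolName = this.buffer.slice(0, brace).trim();
      this.result = {
        status: "detected",
        toolName,
        allowed: this.allowedTools.has(toolName),
        atToken: this.tokenCount,
      };
    }
    return this.result;
  }
}

const detector = new ToolNameDetector(new Set(["search", "calculate"]));
const states = ["sea", "rch", "{", "query"].map((t) => detector.push(t));
console.log(states[1].status); // "pending": no "{" seen yet
console.log(states[2]);        // detected "search" at token 3, allowed
```

The caller can reject the request as soon as `allowed` is false, which is the point at which the remaining tokens no longer need to be generated.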

Quick Start

import { 
  StreamingValidator, 
  ToolRegistry, 
  ValidationResult 
} from 'glyph-js';

// 1. Create tool registry
const registry = new ToolRegistry();
registry.register({
  name: 'search',
  description: 'Search for information',
  args: {
    query: { type: 'string', required: true, minLen: 1 },
    max_results: { type: 'int', min: 1, max: 100 }
  }
});

// 2. Create validator
const validator = new StreamingValidator(registry);
validator.start();

// 3. Process tokens as they arrive
const tokens = ['search', '{', 'query', '=', 'AI', ' ', 'max_results', '=', '10', '}'];

for (const token of tokens) {
  const result = validator.pushToken(token);
  
  // Check if tool detected
  if (result.toolName) {
    console.log(`Tool detected: ${result.toolName}`);
    
    // Check if allowed
    if (!result.toolAllowed) {
      console.error('Unknown tool - stopping stream');
      break;
    }
  }
  
  // Check for errors
  if (result.errors.length > 0) {
    console.error('Validation errors:', result.errors);
  }
  
  // Check if complete
  if (result.complete) {
    console.log('Validation complete');
    if (result.valid) {
      console.log('Fields:', result.fields);
    }
    break;
  }
}

ToolRegistry

Define allowed tools and their argument schemas:
import { ToolRegistry, ToolSchema, ArgSchema } from 'glyph-js';

const registry = new ToolRegistry();

// Register a tool
registry.register({
  name: 'calculate',
  description: 'Evaluate a mathematical expression',
  args: {
    expression: { 
      type: 'string', 
      required: true,
      minLen: 1,
      maxLen: 1000
    },
    precision: { 
      type: 'int', 
      min: 0, 
      max: 15 
    }
  }
});

// Check if tool is allowed
if (registry.isAllowed('calculate')) {
  console.log('Tool is registered');
}

// Get tool schema
const schema = registry.get('calculate');
console.log(schema?.description);

ArgSchema

Define argument constraints:
  • type (string, required): Argument type: 'string', 'int', 'float', 'bool', or 'null'
  • required (boolean, default: false): Whether the argument is required
  • min (number): Minimum value (for numbers)
  • max (number): Maximum value (for numbers)
  • minLen (number): Minimum length (for strings)
  • maxLen (number): Maximum length (for strings)
  • pattern (RegExp): Regex pattern (for strings)
  • enumValues (string[]): Allowed values (for strings)
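
As a rough illustration of how these constraints might be applied to a single parsed value, the sketch below mirrors the fields listed above in a local `ArgConstraints` interface. Both `ArgConstraints` and `checkArg` are hypothetical names; the SDK's own checks, error objects, and messages may differ.

```typescript
// Hypothetical local mirror of the constraint fields; not imported from the SDK.
interface ArgConstraints {
  type: "string" | "int" | "float" | "bool" | "null";
  required?: boolean;
  min?: number;
  max?: number;
  minLen?: number;
  maxLen?: number;
  pattern?: RegExp;
  enumValues?: string[];
}

// Returns a list of human-readable problems; an empty list means the value passes.
function checkArg(name: string, value: unknown, c: ArgConstraints): string[] {
  const errors: string[] = [];
  if (value === undefined) {
    if (c.required) errors.push(`${name}: missing required argument`);
    return errors;
  }
  if (c.type === "int" || c.type === "float") {
    if (typeof value !== "number" || (c.type === "int" && !Number.isInteger(value))) {
      return [`${name}: expected ${c.type}`];
    }
    if (c.min !== undefined && value < c.min) errors.push(`${name}: below min ${c.min}`);
    if (c.max !== undefined && value > c.max) errors.push(`${name}: above max ${c.max}`);
  } else if (c.type === "string") {
    if (typeof value !== "string") return [`${name}: expected string`];
    if (c.minLen !== undefined && value.length < c.minLen) errors.push(`${name}: shorter than ${c.minLen}`);
    if (c.maxLen !== undefined && value.length > c.maxLen) errors.push(`${name}: longer than ${c.maxLen}`);
    // Note: a RegExp with the "g" flag keeps state between test() calls.
    if (c.pattern && !c.pattern.test(value)) errors.push(`${name}: pattern mismatch`);
    if (c.enumValues && c.enumValues.indexOf(value) === -1) errors.push(`${name}: not an allowed value`);
  } else if (c.type === "bool") {
    if (typeof value !== "boolean") errors.push(`${name}: expected bool`);
  } else if (value !== null) {
    errors.push(`${name}: expected null`);
  }
  return errors;
}

console.log(checkArg("max_results", 500, { type: "int", min: 1, max: 100 }));
// [ 'max_results: above max 100' ]
console.log(checkArg("query", undefined, { type: "string", required: true }));
// [ 'query: missing required argument' ]
```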

Validation Result

The validator returns a detailed result on each token:
import { ValidationResult } from 'glyph-js';

const result: ValidationResult = validator.pushToken(token);
  • complete (boolean): Whether parsing is complete
  • valid (boolean): Whether validation passed (no errors)
  • state (ValidatorState): Current parser state: 'waiting', 'in_object', 'complete', 'error'
  • toolName (string | null): Detected tool name (null if not yet detected)
  • toolAllowed (boolean | null): Whether the tool is in the registry (null if not yet detected)
  • errors (ValidationError[]): List of validation errors
  • fields (Record<string, FieldValue>): Parsed argument fields
  • tokenCount (number): Number of tokens processed
  • charCount (number): Number of characters processed
  • timeline (TimelineEvent[]): Timeline of significant events
  • toolDetectedAtToken (number): Token number when the tool was detected
  • toolDetectedAtTime (number): Milliseconds when the tool was detected
  • firstErrorAtToken (number): Token number of the first error
  • completeAtToken (number): Token number when parsing completed

Validator Limits

Configure limits to prevent DoS attacks:
import { 
  StreamingValidator, 
  ValidatorLimits,
  DEFAULT_MAX_BUFFER,
  DEFAULT_MAX_FIELDS,
  DEFAULT_MAX_ERRORS 
} from 'glyph-js';

const limits: ValidatorLimits = {
  maxBufferSize: 1024 * 1024,  // 1MB (default)
  maxFieldCount: 1000,          // 1000 fields (default)
  maxErrorCount: 100            // 100 errors (default)
};

const validator = new StreamingValidator(registry, limits);

// Or set limits after construction
validator.withLimits({
  maxBufferSize: 512 * 1024,  // 512KB
  maxFieldCount: 500
});
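
The idea behind a buffer limit can be sketched with a small capped accumulator. This is a generic illustration, not the SDK's internals: `BoundedBuffer` is a hypothetical class, and it counts UTF-16 code units, whereas the unit the SDK uses for maxBufferSize is not stated here.

```typescript
// Illustrative cap on accumulated input; not the SDK's implementation.
class BoundedBuffer {
  private chars = 0;
  private readonly maxChars: number;

  constructor(maxChars: number) {
    this.maxChars = maxChars;
  }

  // Accepts the token only if the running total stays within the cap.
  tryAppend(token: string): boolean {
    if (this.chars + token.length > this.maxChars) return false;
    this.chars += token.length;
    return true;
  }

  get size(): number {
    return this.chars;
  }
}

const buffer = new BoundedBuffer(10);
console.log(buffer.tryAppend("hello")); // true
console.log(buffer.tryAppend("world")); // true, exactly at the cap
console.log(buffer.tryAppend("!"));     // false, the cap would be exceeded
```

Rejecting at the moment the cap would be crossed keeps memory use bounded regardless of how long a misbehaving stream runs.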

Error Codes

Validation errors include specific codes:
import { ErrorCode, ValidationError } from 'glyph-js';

for (const error of result.errors) {
  switch (error.code) {
    case ErrorCode.UnknownTool:
      console.error('Unknown tool:', error.message);
      break;
    case ErrorCode.MissingRequired:
      console.error('Missing field:', error.field);
      break;
    case ErrorCode.ConstraintMin:
      console.error('Value too small:', error.field);
      break;
    case ErrorCode.ConstraintMax:
      console.error('Value too large:', error.field);
      break;
    case ErrorCode.ConstraintLen:
      console.error('Length constraint:', error.field);
      break;
    case ErrorCode.ConstraintPattern:
      console.error('Pattern mismatch:', error.field);
      break;
    case ErrorCode.ConstraintEnum:
      console.error('Invalid enum value:', error.field);
      break;
    case ErrorCode.LimitExceeded:
      console.error('Limit exceeded:', error.message);
      break;
  }
}

Early Stopping

Detect when to stop streaming:
// Check if stream should stop
if (validator.shouldStop()) {
  console.log('Stopping stream due to fatal error');
  // Stop LLM token generation
}

// Check specific conditions
if (result.toolName && !result.toolAllowed) {
  console.log('Unknown tool detected - rejecting');
}

if (result.errors.some(e => e.code === ErrorCode.LimitExceeded)) {
  console.log('Limit exceeded - rejecting');
}
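
Stopping early only saves cost if the producer stops too. One way to get that is to pull tokens lazily, so that breaking out of the loop also closes the source. The sketch below uses a plain synchronous generator as a stand-in for an LLM stream; `fakeTokenSource`, `consumeUntil`, and the tool names are hypothetical, and a real client would more likely expose an async iterator consumed with `for await`.

```typescript
// Stand-in for an LLM stream: produces tokens one at a time and logs activity.
function* fakeTokenSource(tokens: string[], log: string[]): Generator<string> {
  try {
    for (const token of tokens) {
      log.push(`produced ${token}`);
      yield token;
    }
  } finally {
    // Runs when the consumer breaks out of its loop or the tokens run out.
    log.push("source closed");
  }
}

// Pulls tokens until shouldStop returns true, then stops pulling.
function consumeUntil(
  source: Iterable<string>,
  shouldStop: (token: string, index: number) => boolean
): string[] {
  const accepted: string[] = [];
  let index = 0;
  for (const token of source) {
    if (shouldStop(token, index)) break; // break also closes the generator
    accepted.push(token);
    index += 1;
  }
  return accepted;
}

const allowed = new Set(["search", "calculate"]);
const activity: string[] = [];
const accepted = consumeUntil(
  fakeTokenSource(["rm_rf", "{", "path", "=", "/", "}"], activity),
  (token, index) => index === 0 && !allowed.has(token)
);
console.log(accepted); // []
console.log(activity); // [ 'produced rm_rf', 'source closed' ]
```

Only the first token is ever produced: the remaining five are never requested from the source.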

Validator Reuse

Reset validator for multiple requests:
const validator = new StreamingValidator(registry);

// Validate first request
validator.start();
for (const token of request1Tokens) {
  validator.pushToken(token);
}

// Reset for second request
validator.reset();
validator.start();
for (const token of request2Tokens) {
  validator.pushToken(token);
}

Default Registry

Use the built-in default registry:
import { defaultToolRegistry } from 'glyph-js';

const registry = defaultToolRegistry();
// Includes: search, calculate, browse, execute, read_file, write_file

const validator = new StreamingValidator(registry);

GS1 Stream Protocol

GS1 (GLYPH Stream v1) is a framing protocol for reliable GLYPH message delivery over streams.

Frame Types

// FrameKind is exported by 'cowrie-glyph/stream'.
// Available frame types:
type FrameKind = 
  | 'doc'      // Full document
  | 'patch'    // Document patch
  | 'row'      // Tabular row
  | 'ui'       // UI event
  | 'ack'      // Acknowledgment
  | 'err'      // Error
  | 'ping'     // Keepalive ping
  | 'pong';    // Keepalive pong

Frame Construction

Create frames using helper functions:
import { 
  docFrame, 
  patchFrame, 
  rowFrame, 
  uiFrame,
  ackFrame,
  errFrame,
  pingFrame,
  pongFrame
} from 'cowrie-glyph/stream';

// Document frame
const doc = docFrame({
  sid: 1,
  target: { prefix: 'doc', value: 'main' },
  payload: 'Team@(^t:ARS Arsenal EPL)'
});

// Patch frame
const patch = patchFrame({
  sid: 2,
  target: { prefix: 'doc', value: 'main' },
  payload: '@patch += /teams/-1 Team@(^t:LIV Liverpool EPL)'
});

// Row frame (tabular streaming)
const row = rowFrame({
  sid: 3,
  target: { prefix: 'table', value: 'players' },
  payload: '^p:1 "Saka" Arsenal 14'
});

// UI event frame
const ui = uiFrame({
  sid: 4,
  payload: 'progress{percent=50 message="Processing..."}'
});

// Acknowledgment
const ack = ackFrame({ sid: 5, ackSid: 1 });

// Error
const err = errFrame({ 
  sid: 6, 
  payload: 'error{code=404 message="Not found"}' 
});

Frame Encoding

Encode frames for transmission:
import { encodeFrame, encodeFrames } from 'cowrie-glyph/stream';

// Encode single frame
const encoded = encodeFrame(doc);
console.log(encoded);
// [doc#1 @doc:main crc:a3f5b2c1]\n

// Encode multiple frames
const frames = [doc, patch, row];
const batch = encodeFrames(frames);
console.log(batch);

Frame Decoding

Decode frames from stream:
import { decodeFrame, decodeFrames } from 'cowrie-glyph/stream';

// Decode single frame
const frame = decodeFrame('[doc#1 @doc:main crc:a3f5b2c1]\nPayload data\n');
console.log(frame.kind);    // 'doc'
console.log(frame.sid);     // 1
console.log(frame.payload); // 'Payload data'

// Decode multiple frames
const batch = decodeFrames(streamData);
for (const frame of batch) {
  console.log(`Frame #${frame.sid}: ${frame.kind}`);
}

Stream Reader

Process frames incrementally:
import { Reader, ReaderOptions } from 'cowrie-glyph/stream';

const options: ReaderOptions = {
  verifyCRC: true,    // Verify frame checksums
  verifyBase: false   // Verify state hashes
};

const reader = new Reader(options);

// Feed data incrementally; each call returns the frames decoded from that chunk
for (const chunk of streamChunks) {
  const frames = reader.feed(chunk);
  
  for (const frame of frames) {
    switch (frame.kind) {
      case 'doc':
        console.log('Received document:', frame.payload);
        break;
      case 'patch':
        console.log('Received patch:', frame.payload);
        break;
      case 'ui':
        console.log('UI event:', frame.payload);
        break;
      case 'err':
        console.error('Error:', frame.payload);
        break;
    }
  }
}
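
Incremental feeding matters because a network chunk can end in the middle of a frame. The sketch below shows the general buffering technique on newline-delimited text: it holds back the trailing partial line until the rest arrives. `LineBuffer` is a hypothetical helper and not the SDK's Reader, which also has to handle payloads and CRC checks that are not modeled here.

```typescript
// Illustrative partial-line buffering; not the SDK's Reader.
class LineBuffer {
  private pending = "";

  // Returns every complete line contained in the data seen so far.
  feed(chunk: string): string[] {
    this.pending += chunk;
    const parts = this.pending.split("\n");
    // The last element is whatever follows the final newline (possibly "").
    this.pending = parts.pop() ?? "";
    return parts;
  }
}

const lines = new LineBuffer();
console.log(lines.feed("[doc#1 @doc"));   // []
console.log(lines.feed(":main]\nPay"));   // [ '[doc#1 @doc:main]' ]
console.log(lines.feed("load\n"));        // [ 'Payload' ]
```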

Stream Cursor

Track stream state and handle frames:
import { StreamCursor, FrameHandler } from 'cowrie-glyph/stream';

const handler: FrameHandler = {
  onDoc: (frame) => {
    console.log('Document received:', frame.payload);
  },
  onPatch: (frame) => {
    console.log('Patch received:', frame.payload);
  },
  onRow: (frame) => {
    console.log('Row received:', frame.payload);
  },
  onUI: (frame) => {
    console.log('UI event:', frame.payload);
  },
  onError: (frame) => {
    console.error('Error:', frame.payload);
  }
};

const cursor = new StreamCursor(handler);

// Process frames
for (const frame of frames) {
  cursor.handle(frame);
}

UI Events

Send UI events during processing:
import { 
  progress, 
  log, 
  logInfo,
  logWarn,
  logError,
  metric,
  counter,
  artifact,
  emitUI,
  uiFrame
} from 'cowrie-glyph/stream';

// Progress indicator
const prog = progress(50, 'Processing data...');
const frame1 = uiFrame({ sid: 1, payload: emitUI(prog) });

// Log messages
const info = logInfo('Operation started');
const warn = logWarn('High memory usage');
const error = logError('Connection failed');

// Metrics
const met = metric('latency', 125.5, 'ms');
const cnt = counter('requests', 1000);

// Artifacts
const art = artifact('report', 'application/pdf', 'https://example.com/report.pdf');

CRC Verification

Frames include CRC checksums for integrity:
import { computeCRC, verifyCRC } from 'cowrie-glyph/stream';

// Compute CRC for payload
const crc = computeCRC(payload);
console.log(crc.toString(16)); // "a3f5b2c1"

// Verify frame CRC
const valid = verifyCRC(frame);
if (!valid) {
  console.error('CRC mismatch - corrupted frame');
}
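
For readers unfamiliar with CRCs, the sketch below computes the widely used CRC-32 (reflected polynomial 0xEDB88320, as in zlib and PNG). That GS1 uses this exact variant is an assumption: the 8-hex-digit value above suggests a 32-bit checksum, but the polynomial and the bytes it covers are not specified here. The helper names `crc32`, `toHex8`, and `asciiBytes` are hypothetical.

```typescript
// Bitwise CRC-32 (IEEE 802.3 / zlib variant). Assumed, not confirmed, for GS1.
function crc32(bytes: Uint8Array): number {
  let crc = 0xffffffff;
  for (let i = 0; i < bytes.length; i++) {
    crc ^= bytes[i];
    for (let bit = 0; bit < 8; bit++) {
      // Shift right; apply the polynomial when the low bit was set.
      crc = (crc & 1) !== 0 ? (crc >>> 1) ^ 0xedb88320 : crc >>> 1;
    }
  }
  // Final inversion, forced back to an unsigned 32-bit value.
  return (crc ^ 0xffffffff) >>> 0;
}

// Formats a checksum as exactly eight lowercase hex digits.
function toHex8(value: number): string {
  return ("00000000" + value.toString(16)).slice(-8);
}

// Converts an ASCII-only string to bytes (non-ASCII input is not handled).
function asciiBytes(text: string): Uint8Array {
  const out = new Uint8Array(text.length);
  for (let i = 0; i < text.length; i++) out[i] = text.charCodeAt(i);
  return out;
}

// "123456789" is the conventional check input for CRC algorithms.
console.log(toHex8(crc32(asciiBytes("123456789")))); // "cbf43926"
```

Zero-padding matters when comparing against a header field: without it, a checksum with leading zero nibbles would print with fewer than eight digits.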

Complete Example

Streaming validation with GS1 transport:
import { 
  StreamingValidator, 
  ToolRegistry,
  defaultToolRegistry 
} from 'glyph-js';
import { 
  Reader,
  uiFrame,
  errFrame,
  emitProgress,
  encodeFrame
} from 'cowrie-glyph/stream';

// Setup (llmStream and send are placeholders for your LLM client and transport)
const registry = defaultToolRegistry();
const validator = new StreamingValidator(registry);
validator.start();
const reader = new Reader({ verifyCRC: true });

let sid = 1;

// Process LLM stream
for await (const chunk of llmStream) {
  // Validate incrementally
  const result = validator.pushToken(chunk);
  
  // Send progress
  if (result.tokenCount % 10 === 0) {
    const progress = uiFrame({
      sid: sid++,
      payload: emitProgress(result.tokenCount, 'Validating...')
    });
    await send(encodeFrame(progress));
  }
  
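  // Early rejection
  if (validator.shouldStop()) {
    // Guard against an empty error list before reading the message
    const reason = result.errors[0]?.message ?? 'validation failed';
    const error = errFrame({
      sid: sid++,
      payload: `error{code=INVALID_TOOL message="${reason}"}`
    });
    await send(encodeFrame(error));
    break;
  }
  
  // Complete: stop reading once the tool call has been fully parsed
  if (result.complete) {
    if (result.valid) {
      console.log('Valid tool call:', result.fields);
      // Execute tool...
    }
    break;
  }
}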

Next Steps

  • Core Types: Learn about GValue and Schema
  • Parsing: Parse GLYPH format strings
