Streaming
The GLYPH JavaScript SDK provides streaming validation for real-time LLM tool calls and the GS1 transport protocol for framed message delivery.
StreamingValidator
Validate GLYPH tool calls incrementally as tokens arrive from an LLM, enabling early detection and rejection of invalid requests.
Why Streaming Validation?
Benefits:
Early tool detection : Know the tool name before full response
Early rejection : Stop on unknown tools mid-stream
Incremental validation : Check constraints as tokens arrive
Latency savings : Reject bad payloads without waiting for completion
Cost savings : Stop token generation on invalid requests
Quick Start
import {
StreamingValidator ,
ToolRegistry ,
ValidationResult
} from 'glyph-js' ;
// 1. Create tool registry
const registry = new ToolRegistry ();
registry . register ({
name: 'search' ,
description: 'Search for information' ,
args: {
query: { type: 'string' , required: true , minLen: 1 },
max_results: { type: 'int' , min: 1 , max: 100 }
}
});
// 2. Create validator
const validator = new StreamingValidator ( registry );
validator . start ();
// 3. Process tokens as they arrive
const tokens = [ 'search' , '{' , 'query' , '=' , 'AI' , ' ' , 'max_results' , '=' , '10' , '}' ];
for ( const token of tokens ) {
const result = validator . pushToken ( token );
// Check if tool detected
if ( result . toolName ) {
console . log ( `Tool detected: ${ result . toolName } ` );
// Check if allowed
if ( ! result . toolAllowed ) {
console . error ( 'Unknown tool - stopping stream' );
break ;
}
}
// Check for errors
if ( result . errors . length > 0 ) {
console . error ( 'Validation errors:' , result . errors );
}
// Check if complete
if ( result . complete ) {
console . log ( 'Validation complete' );
if ( result . valid ) {
console . log ( 'Fields:' , result . fields );
}
break ;
}
}
Define allowed tools and their argument schemas:
import { ToolRegistry , ToolSchema , ArgSchema } from 'glyph-js' ;
const registry = new ToolRegistry ();
// Register a tool
registry . register ({
name: 'calculate' ,
description: 'Evaluate a mathematical expression' ,
args: {
expression: {
type: 'string' ,
required: true ,
minLen: 1 ,
maxLen: 1000
},
precision: {
type: 'int' ,
min: 0 ,
max: 15
}
}
});
// Check if tool is allowed
if ( registry . isAllowed ( 'calculate' )) {
console . log ( 'Tool is registered' );
}
// Get tool schema
const schema = registry . get ( 'calculate' );
console . log ( schema ?. description );
ArgSchema
Define argument constraints:
Argument type: 'string', 'int', 'float', 'bool', 'null'
Whether argument is required
Minimum value (for numbers)
Maximum value (for numbers)
Minimum length (for strings)
Maximum length (for strings)
Regex pattern (for strings)
Allowed values (for strings)
Validation Result
The validator returns a detailed result on each token:
import { ValidationResult } from 'glyph-js' ;
const result : ValidationResult = validator . pushToken ( token );
Whether parsing is complete
Whether validation passed (no errors)
Current parser state: 'waiting', 'in_object', 'complete', 'error'
Detected tool name (null if not yet detected)
Whether tool is in registry (null if not yet detected)
List of validation errors
fields
Record<string, FieldValue>
Parsed argument fields
Number of tokens processed
Number of characters processed
Timeline of significant events
Token number when tool was detected
Milliseconds when tool was detected
Token number of first error
Token number when parsing completed
Validator Limits
Configure limits to prevent DoS attacks:
import {
StreamingValidator ,
ValidatorLimits ,
DEFAULT_MAX_BUFFER ,
DEFAULT_MAX_FIELDS ,
DEFAULT_MAX_ERRORS
} from 'glyph-js' ;
const limits : ValidatorLimits = {
maxBufferSize: 1024 * 1024 , // 1MB (default)
maxFieldCount: 1000 , // 1000 fields (default)
maxErrorCount: 100 // 100 errors (default)
};
const validator = new StreamingValidator ( registry , limits );
// Or set limits after construction
validator . withLimits ({
maxBufferSize: 512 * 1024 , // 512KB
maxFieldCount: 500
});
Error Codes
Validation errors include specific codes:
import { ErrorCode , ValidationError } from 'glyph-js' ;
for ( const error of result . errors ) {
switch ( error . code ) {
case ErrorCode . UnknownTool :
console . error ( 'Unknown tool:' , error . message );
break ;
case ErrorCode . MissingRequired :
console . error ( 'Missing field:' , error . field );
break ;
case ErrorCode . ConstraintMin :
console . error ( 'Value too small:' , error . field );
break ;
case ErrorCode . ConstraintMax :
console . error ( 'Value too large:' , error . field );
break ;
case ErrorCode . ConstraintLen :
console . error ( 'Length constraint:' , error . field );
break ;
case ErrorCode . ConstraintPattern :
console . error ( 'Pattern mismatch:' , error . field );
break ;
case ErrorCode . ConstraintEnum :
console . error ( 'Invalid enum value:' , error . field );
break ;
case ErrorCode . LimitExceeded :
console . error ( 'Limit exceeded:' , error . message );
break ;
}
}
Early Stopping
Detect when to stop streaming:
// Check if stream should stop
if ( validator . shouldStop ()) {
console . log ( 'Stopping stream due to fatal error' );
// Stop LLM token generation
}
// Check specific conditions
if ( result . toolName && ! result . toolAllowed ) {
console . log ( 'Unknown tool detected - rejecting' );
}
if ( result . errors . some ( e => e . code === ErrorCode . LimitExceeded )) {
console . log ( 'Limit exceeded - rejecting' );
}
Validator Reuse
Reset validator for multiple requests:
const validator = new StreamingValidator ( registry );
// Validate first request
validator . start ();
for ( const token of request1Tokens ) {
validator . pushToken ( token );
}
// Reset for second request
validator . reset ();
validator . start ();
for ( const token of request2Tokens ) {
validator . pushToken ( token );
}
Default Registry
Use the built-in default registry:
import { defaultToolRegistry } from 'glyph-js' ;
const registry = defaultToolRegistry ();
// Includes: search, calculate, browse, execute, read_file, write_file
const validator = new StreamingValidator ( registry );
GS1 Stream Protocol
GS1 (GLYPH Stream v1) is a framing protocol for reliable GLYPH message delivery over streams.
Frame Types
import { FrameKind } from 'cowrie-glyph/stream' ;
// Available frame types
type FrameKind =
| 'doc' // Full document
| 'patch' // Document patch
| 'row' // Tabular row
| 'ui' // UI event
| 'ack' // Acknowledgment
| 'err' // Error
| 'ping' // Keepalive ping
| 'pong' ; // Keepalive pong
Frame Construction
Create frames using helper functions:
import {
docFrame ,
patchFrame ,
rowFrame ,
uiFrame ,
ackFrame ,
errFrame ,
pingFrame ,
pongFrame
} from 'cowrie-glyph/stream' ;
// Document frame
const doc = docFrame ({
sid: 1 ,
target: { prefix: 'doc' , value: 'main' },
payload: 'Team@(^t:ARS Arsenal EPL)'
});
// Patch frame
const patch = patchFrame ({
sid: 2 ,
target: { prefix: 'doc' , value: 'main' },
payload: '@patch += /teams/-1 Team@(^t:LIV Liverpool EPL)'
});
// Row frame (tabular streaming)
const row = rowFrame ({
sid: 3 ,
target: { prefix: 'table' , value: 'players' },
payload: '^p:1 "Saka" Arsenal 14'
});
// UI event frame
const ui = uiFrame ({
sid: 4 ,
payload: 'progress{percent=50 message="Processing..."}'
});
// Acknowledgment
const ack = ackFrame ({ sid: 5 , ackSid: 1 });
// Error
const err = errFrame ({
sid: 6 ,
payload: 'error{code=404 message="Not found"}'
});
Frame Encoding
Encode frames for transmission:
import { encodeFrame , encodeFrames } from 'cowrie-glyph/stream' ;
// Encode single frame
const encoded = encodeFrame ( doc );
console . log ( encoded );
// [doc#1 @doc:main crc:a3f5b2c1]\n
// Encode multiple frames
const frames = [ doc , patch , row ];
const batch = encodeFrames ( frames );
console . log ( batch );
Frame Decoding
Decode frames from stream:
import { decodeFrame , decodeFrames } from 'cowrie-glyph/stream' ;
// Decode single frame
const frame = decodeFrame ( '[doc#1 @doc:main crc:a3f5b2c1] \n Payload data \n ' );
console . log ( frame . kind ); // 'doc'
console . log ( frame . sid ); // 1
console . log ( frame . payload ); // 'Payload data'
// Decode multiple frames
const batch = decodeFrames ( streamData );
for ( const frame of batch ) {
console . log ( `Frame # ${ frame . sid } : ${ frame . kind } ` );
}
Stream Reader
Process frames incrementally:
import { Reader , ReaderOptions } from 'cowrie-glyph/stream' ;
const options : ReaderOptions = {
verifyCRC: true , // Verify frame checksums
verifyBase: false // Verify state hashes
};
const reader = new Reader ( options );
// Feed data incrementally
reader . feed ( ' [
for ( const chunk of streamChunks ) {
const frames = reader . feed ( chunk );
for ( const frame of frames ) {
switch ( frame . kind ) {
case 'doc' :
console . log ( 'Received document:' , frame . payload );
break ;
case 'patch' :
console . log ( 'Received patch:' , frame . payload );
break ;
case 'ui' :
console . log ( 'UI event:' , frame . payload );
break ;
case 'err' :
console . error ( 'Error:' , frame . payload );
break ;
}
}
}
Stream Cursor
Track stream state and handle frames:
import { StreamCursor , FrameHandler } from 'cowrie-glyph/stream' ;
const handler : FrameHandler = {
onDoc : ( frame ) => {
console . log ( 'Document received:' , frame . payload );
},
onPatch : ( frame ) => {
console . log ( 'Patch received:' , frame . payload );
},
onRow : ( frame ) => {
console . log ( 'Row received:' , frame . payload );
},
onUI : ( frame ) => {
console . log ( 'UI event:' , frame . payload );
},
onError : ( frame ) => {
console . error ( 'Error:' , frame . payload );
}
};
const cursor = new StreamCursor ( handler );
// Process frames
for ( const frame of frames ) {
cursor . handle ( frame );
}
UI Events
Send UI events during processing:
import {
progress ,
log ,
logInfo ,
logWarn ,
logError ,
metric ,
counter ,
artifact ,
emitUI
} from 'cowrie-glyph/stream' ;
// Progress indicator
const prog = progress ( 50 , 'Processing data...' );
const frame1 = uiFrame ({ sid: 1 , payload: emitUI ( prog ) });
// Log messages
const info = logInfo ( 'Operation started' );
const warn = logWarn ( 'High memory usage' );
const error = logError ( 'Connection failed' );
// Metrics
const met = metric ( 'latency' , 125.5 , 'ms' );
const cnt = counter ( 'requests' , 1000 );
// Artifacts
const art = artifact ( 'report' , 'application/pdf' , 'https://example.com/report.pdf' );
CRC Verification
Frames include CRC checksums for integrity:
import { computeCRC , verifyCRC } from 'cowrie-glyph/stream' ;
// Compute CRC for payload
const crc = computeCRC ( payload );
console . log ( crc . toString ( 16 )); // "a3f5b2c1"
// Verify frame CRC
const valid = verifyCRC ( frame );
if ( ! valid ) {
console . error ( 'CRC mismatch - corrupted frame' );
}
Complete Example
Streaming validation with GS1 transport:
import {
StreamingValidator ,
ToolRegistry ,
defaultToolRegistry
} from 'glyph-js' ;
import {
Reader ,
uiFrame ,
errFrame ,
emitProgress ,
encodeFrame
} from 'cowrie-glyph/stream' ;
// Setup
const registry = defaultToolRegistry ();
const validator = new StreamingValidator ( registry );
const reader = new Reader ({ verifyCRC: true });
let sid = 1 ;
// Process LLM stream
for await ( const chunk of llmStream ) {
// Validate incrementally
const result = validator . pushToken ( chunk );
// Send progress
if ( result . tokenCount % 10 === 0 ) {
const progress = uiFrame ({
sid: sid ++ ,
payload: emitProgress ( result . tokenCount , 'Validating...' )
});
await send ( encodeFrame ( progress ));
}
// Early rejection
if ( validator . shouldStop ()) {
const error = errFrame ({
sid: sid ++ ,
payload: `error{code=INVALID_TOOL message=" ${ result . errors [ 0 ]. message } "}`
});
await send ( encodeFrame ( error ));
break ;
}
// Complete
if ( result . complete && result . valid ) {
console . log ( 'Valid tool call:' , result . fields );
// Execute tool...
}
}
Next Steps
Core Types Learn about GValue and Schema
Parsing Parse GLYPH format strings