Skip to main content

Overview

GLYPH provides streaming capabilities for processing LLM output as it arrives, enabling:
  • Early tool detection: Know the tool name before response completes
  • Early rejection: Stop on unknown tools mid-stream
  • Incremental validation: Check constraints as tokens arrive
  • Event-based parsing: Process values as they’re parsed
  • Latency savings: Reject bad payloads without waiting for completion

StreamingValidator

Validate GLYPH tool calls incrementally as tokens arrive.

Creating a Validator

NewStreamingValidator
func(registry *ToolRegistry) *StreamingValidator
Creates a new streaming validator with a tool registry.
registry := glyph.NewToolRegistry()

// Describe the "search" tool: a required query of at least one character and
// an optional result count bounded to 1..100.
searchTool := &glyph.ToolSchema{
    Name:        "search",
    Description: "Search for information",
    Args: map[string]glyph.ArgSchema{
        "query": {
            Type:     "string",
            Required: true,
            MinLen:   glyph.MinInt(1),
        },
        "max_results": {
            Type:     "int",
            Required: false,
            Min:      glyph.MinFloat64(1),
            Max:      glyph.MaxFloat64(100),
        },
    },
}
registry.Register(searchTool)

// Build the streaming validator on top of the populated registry.
validator := glyph.NewStreamingValidator(registry)

Tool Registry

NewToolRegistry
func() *ToolRegistry
Creates a new tool registry.
registry := glyph.NewToolRegistry()
Register
func(r *ToolRegistry, tool *ToolSchema)
Registers a tool schema.
registry.Register(&glyph.ToolSchema{
    Name:        "calculate",
    Description: "Evaluate mathematical expressions",
    Args: map[string]glyph.ArgSchema{
        "expression": {
            Type:     "string",
            Required: true,
        },
        "precision": {
            Type:     "int",
            Required: false,
            Min:      glyph.MinFloat64(0),
            Max:      glyph.MaxFloat64(15),
        },
    },
})

Tool Schema Definition

// ToolSchema describes a single tool that can be registered with a
// ToolRegistry via Register.
type ToolSchema struct {
    // Name is the tool's name, e.g. "search" or "calculate".
    Name        string
    // Description is a human-readable summary of what the tool does.
    Description string
    // Args holds the schema of each argument, keyed by argument name.
    Args        map[string]ArgSchema
}

// ArgSchema describes the type of a single tool argument and the constraints
// placed on its value.
//
// NOTE(review): the pointer-typed constraints are presumably optional, with
// nil meaning "no constraint" — confirm against the library source.
type ArgSchema struct {
    Type     string         // "string", "int", "float", "bool"
    Required bool           // Whether argument is required
    Min      *float64       // Minimum value (for numbers)
    Max      *float64       // Maximum value (for numbers)
    MinLen   *int           // Minimum string length
    MaxLen   *int           // Maximum string length
    Pattern  *regexp.Regexp // Regex pattern (for strings)
    Enum     []string       // Allowed values (for strings)
}

Streaming Validation

Processing Tokens

Start
func(v *StreamingValidator)
Begins timing for the validation session.
validator.Start()
PushToken
func(v *StreamingValidator, token string) *StreamValidationResult
Processes a token from the LLM stream.
// Tokens in the order an LLM might emit them for a single tool call.
tokens := []string{
    `{`, `action=`, `"search"`, ` `,
    `query=`, `"AI news"`, ` `,
    `max_results=`, `10`,
    `}`,
}

// Push tokens one at a time, keeping the most recent result for inspection.
var result *glyph.StreamValidationResult
for i := range tokens {
    result = validator.PushToken(tokens[i])

    if validator.ShouldStop() {
        // The validator detected a fatal error: stop consuming the LLM stream.
        break
    }
}

// Report only when the object was fully parsed without errors.
if ok := result.Complete && result.Valid; ok {
    fmt.Printf("Tool: %s\n", result.ToolName)
    fmt.Printf("Detected at token %d\n",
        result.ToolDetectedAtToken)
}
ShouldStop
func(v *StreamingValidator) bool
Returns true if validation detected a fatal error warranting early stop.
if validator.ShouldStop() {
    // Cancel LLM stream
    cancelStream()
}
Reset
func(v *StreamingValidator)
Clears validator state for reuse.
validator.Reset()

Validation Result

// StreamValidationResult is the validation state returned by PushToken as
// tokens are processed.
type StreamValidationResult struct {
    // Complete is true when the object is fully parsed.
    Complete    bool
    // Valid is true when no errors have been encountered.
    Valid       bool
    // ToolName is the detected tool name; it may be empty if not yet parsed.
    ToolName    string
    // ToolAllowed reports whether the tool is in the allow list; nil if unknown.
    ToolAllowed *bool
    // Errors holds the accumulated validation errors.
    Errors      []StreamValidationError
    // NOTE(review): presumably the argument values parsed so far, keyed by
    // argument name — confirm.
    Fields      map[string]interface{}
    // NOTE(review): presumably the number of tokens and characters processed
    // so far — confirm.
    TokenCount  int
    CharCount   int
    // Timeline lists significant events that occurred during validation.
    Timeline    []TimelineEvent
    
    // Timing metrics
    // Each pair records the token at which an event happened and the matching
    // time. NOTE(review): the durations are presumably measured from Start —
    // confirm.
    ToolDetectedAtToken int
    ToolDetectedAtTime  time.Duration
    FirstErrorAtToken   int
    FirstErrorAtTime    time.Duration
    CompleteAtToken     int
    CompleteAtTime      time.Duration
}
Complete
bool
True when object is fully parsed.
Valid
bool
True when no errors encountered.
ToolName
string
Detected tool name (may be empty if not yet parsed).
ToolAllowed
*bool
Whether tool is in allow list (nil if unknown).
Errors
[]StreamValidationError
Accumulated validation errors.
for _, err := range result.Errors {
    fmt.Printf("%s: %s\n", err.Code, err.Message)
}
Timeline
[]TimelineEvent
Significant events during validation.
for _, event := range result.Timeline {
    fmt.Printf("%s at token %d: %s\n",
        event.Event, event.Token, event.Detail)
}

Incremental Parser

Event-based parser for streaming GLYPH input.

Creating a Parser

NewIncrementalParser
func(handler ParseHandler, opts IncrementalParserOptions) *IncrementalParser
Creates a new incremental parser.
// handler prints a one-line trace for object boundaries, keys and values, and
// ignores every other event type. It always returns nil.
handler := func(ev glyph.ParseEvent) error {
    switch kind := ev.Type; kind {
    case glyph.EventStartObject:
        fmt.Printf("Start object: %s\n", ev.TypeName)
    case glyph.EventEndObject:
        fmt.Println("End object")
    case glyph.EventKey:
        fmt.Printf("Key: %s\n", ev.Key)
    case glyph.EventValue:
        fmt.Printf("Value: %v\n", ev.Value)
    }
    return nil
}

// Create the parser with the library's default options.
opts := glyph.DefaultIncrementalParserOptions()
parser := glyph.NewIncrementalParser(handler, opts)

Parser Options

// IncrementalParserOptions sets the size and nesting limits used by an
// IncrementalParser. DefaultIncrementalParserOptions returns a ready-made
// value.
type IncrementalParserOptions struct {
    MaxDepth    int // Maximum nesting depth (default: 128)
    MaxKeyLen   int // Maximum key length (default: 4096)
    MaxValueLen int // Maximum value length (default: 1MB)
}

Feeding Data

Feed
func(p *IncrementalParser, data []byte) (int, error)
Adds more input data to the parser.
input := []byte(`Team{id=^t:ARS name="Arsenal"}`)

// Hand the input to the parser in fixed-size pieces (simulating streaming);
// the final piece may be shorter than chunkSize.
chunkSize := 10
for rest := input; len(rest) > 0; {
    n := chunkSize
    if n > len(rest) {
        n = len(rest)
    }

    consumed, err := parser.Feed(rest[:n])
    if err != nil {
        return err
    }
    fmt.Printf("Consumed %d bytes\n", consumed)

    rest = rest[n:]
}
End
func(p *IncrementalParser) error
Signals end of input.
if err := parser.End(); err != nil {
    return err
}
Reset
func(p *IncrementalParser)
Clears parser state for reuse.
parser.Reset()

Parse Events

// ParseEventType identifies the kind of a ParseEvent delivered to a parse
// handler.
type ParseEventType uint8

// Kinds of parse event. The first constant carries the type and iota so that
// every constant in the block is a distinct ParseEventType value.
//
// NOTE(review): numbering is assumed to start at zero with EventStartObject —
// confirm against the library source before relying on the numeric values.
const (
    EventStartObject ParseEventType = iota
    EventEndObject
    EventStartList
    EventEndList
    EventKey
    EventValue
    EventStartSum
    EventEndSum
    EventError
    EventNeedMore
)

// ParseEvent is the value passed to a parse handler for each event the
// incremental parser emits.
type ParseEvent struct {
    // Type is the kind of event; handlers switch on it.
    Type     ParseEventType
    // Path locates the event within the input; the examples pass it along
    // with Value when processing EventValue.
    Path     []PathElement
    // Key is the key read by handlers on EventKey.
    Key      string
    // Value is the parsed value read by handlers on EventValue; the examples
    // check it for nil before use.
    Value    *GValue
    // TypeName is the object's type name, read by handlers on EventStartObject.
    TypeName string
    // NOTE(review): presumably the variant tag for sum events — confirm.
    Tag      string
    // Error is the error read by handlers on EventError.
    Error    error
    // NOTE(review): presumably the input position of the event — confirm.
    Pos      Position
}

Streaming Dictionary

Persist key dictionaries across multiple frames for compression.

Creating a Session

NewStreamSession
func(opts SessionOptions) *StreamSession
Creates a streaming session with dictionary learning.
session := glyph.NewStreamSession(glyph.SessionOptions{
    SessionID:   12345,
    LearnFrames: 10, // Learn keys from first 10 frames
    DictOptions: glyph.DefaultStreamDictOptions(),
})

Session Operations

LearnKeys
func(s *StreamSession, v *GValue)
Extracts and learns keys from a value during learning phase.
value := glyph.Parse(input).Value
session.LearnKeys(value)
EncodeKey
func(s *StreamSession, key string) (uint16, bool)
Encodes a key using the session dictionary.
idx, ok := session.EncodeKey("name")
if ok {
    fmt.Printf("Key 'name' encoded as %d\n", idx)
}
DecodeKey
func(s *StreamSession, idx uint16) string
Decodes a key index.
key := session.DecodeKey(idx)
fmt.Printf("Index %d = %s\n", idx, key)

Example: Early Tool Detection

// validateToolCall validates a GLYPH tool call as its tokens arrive on stream.
//
// It returns an error as soon as a detected tool is reported as not allowed,
// as soon as the validator signals that it should stop, or, once the stream
// is closed, when the final result is not valid. It returns nil otherwise.
// Returning early leaves any remaining tokens unread on stream.
func validateToolCall(stream <-chan string) error {
    registry := glyph.DefaultToolRegistry()
    validator := glyph.NewStreamingValidator(registry)
    validator.Start()

    // result.Errors is cumulative and the tool name stays set once detected,
    // so remember what has already been reported to avoid repeating it on
    // every subsequent token.
    toolReported := false
    errorsLogged := 0

    for token := range stream {
        result := validator.PushToken(token)

        // Tool detected - check if allowed
        if result.ToolName != "" && result.ToolAllowed != nil {
            if !*result.ToolAllowed {
                return fmt.Errorf(
                    "unknown tool %q detected at token %d",
                    result.ToolName,
                    result.ToolDetectedAtToken,
                )
            }
            if !toolReported {
                fmt.Printf("Tool %q allowed\n", result.ToolName)
                toolReported = true
            }
        }

        // Log only the constraint violations added since the previous token.
        for i := errorsLogged; i < len(result.Errors); i++ {
            log.Printf("Validation error: %s\n", result.Errors[i].Message)
        }
        errorsLogged = len(result.Errors)

        // Early stop on fatal errors
        if validator.ShouldStop() {
            return fmt.Errorf("validation failed")
        }
    }

    // The stream ended without a fatal error; check the final verdict.
    result := validator.GetResult()
    if !result.Valid {
        return fmt.Errorf("invalid tool call")
    }

    return nil
}

Example: Incremental Parsing

// streamParse reads GLYPH input from reader in 4 KiB chunks and feeds it to an
// incremental parser, handling values as soon as they are parsed.
//
// It returns the first error produced by the reader (other than io.EOF), by
// the parser's Feed or End calls, or carried by an EventError event; it
// returns nil when the whole input was parsed successfully.
func streamParse(reader io.Reader) error {
    handler := func(event glyph.ParseEvent) error {
        switch event.Type {
        case glyph.EventStartObject:
            fmt.Printf("Start: %s\n", event.TypeName)

        case glyph.EventKey:
            fmt.Printf("Key: %s\n", event.Key)

        case glyph.EventValue:
            // Process value immediately
            if event.Value != nil {
                processValue(event.Path, event.Value)
            }

        case glyph.EventError:
            return event.Error
        }
        return nil
    }

    parser := glyph.NewIncrementalParser(
        handler,
        glyph.DefaultIncrementalParserOptions(),
    )

    // Read and feed data in chunks
    buf := make([]byte, 4096)
    for {
        n, readErr := reader.Read(buf)

        // A Read may return n > 0 together with an error (including io.EOF),
        // so consume the bytes before looking at the error; otherwise the
        // final chunk could be dropped.
        if n > 0 {
            if _, err := parser.Feed(buf[:n]); err != nil {
                return err
            }
        }

        if readErr == io.EOF {
            break
        }
        if readErr != nil {
            return readErr
        }
    }

    // Signal end of input to the parser.
    return parser.End()
}

Example: Streaming Dictionary

// streamingSession demonstrates a dictionary-learning stream session: keys are
// learned from the first frames, later frames are encoded with the learned
// dictionary, and the dictionary is saved for session resumption.
func streamingSession() {
    const (
        learnFrames = 5   // frames used to learn keys
        totalFrames = 100 // frames sent overall
    )

    opts := glyph.SessionOptions{
        SessionID:   12345,
        LearnFrames: learnFrames,
    }
    session := glyph.NewStreamSession(opts)

    // Learning phase: learn keys from each frame before encoding and sending it.
    frameNo := 0
    for ; frameNo < learnFrames; frameNo++ {
        value := generateFrame(frameNo)
        session.LearnKeys(value)
        send(glyph.EncodeDictFrame(value, session))
    }

    // After learning: dictionary frozen
    fmt.Printf("Dictionary has %d keys\n", session.Dict().Len())

    // Remaining frames are encoded with the learned dictionary.
    for ; frameNo < totalFrames; frameNo++ {
        value := generateFrame(frameNo)
        send(glyph.EncodeDictFrame(value, session))
    }

    // Save dictionary for session resumption
    saveToStorage(session.SaveDict())
}

Build docs developers (and LLMs) love