Overview
GLYPH provides streaming capabilities for processing LLM output as it arrives, enabling:
- Early tool detection: Know the tool name before response completes
- Early rejection: Stop on unknown tools mid-stream
- Incremental validation: Check constraints as tokens arrive
- Event-based parsing: Process values as they’re parsed
- Latency savings: Reject bad payloads without waiting for completion
StreamValidator
Validate GLYPH tool calls incrementally as tokens arrive.
Creating a Validator
NewStreamingValidator
func(registry *ToolRegistry) *StreamingValidator
Creates a new streaming validator with a tool registry.registry := glyph.NewToolRegistry()
registry.Register(&glyph.ToolSchema{
Name: "search",
Description: "Search for information",
Args: map[string]glyph.ArgSchema{
"query": {
Type: "string",
Required: true,
MinLen: glyph.MinInt(1),
},
"max_results": {
Type: "int",
Required: false,
Min: glyph.MinFloat64(1),
Max: glyph.MaxFloat64(100),
},
},
})
validator := glyph.NewStreamingValidator(registry)
Creates a new tool registry.registry := glyph.NewToolRegistry()
Register
func(r *ToolRegistry, tool *ToolSchema)
Registers a tool schema.registry.Register(&glyph.ToolSchema{
Name: "calculate",
Description: "Evaluate mathematical expressions",
Args: map[string]glyph.ArgSchema{
"expression": {
Type: "string",
Required: true,
},
"precision": {
Type: "int",
Required: false,
Min: glyph.MinFloat64(0),
Max: glyph.MaxFloat64(15),
},
},
})
type ToolSchema struct {
Name string
Description string
Args map[string]ArgSchema
}
type ArgSchema struct {
Type string // "string", "int", "float", "bool"
Required bool // Whether argument is required
Min *float64 // Minimum value (for numbers)
Max *float64 // Maximum value (for numbers)
MinLen *int // Minimum string length
MaxLen *int // Maximum string length
Pattern *regexp.Regexp // Regex pattern (for strings)
Enum []string // Allowed values (for strings)
}
Streaming Validation
Processing Tokens
Start
func(v *StreamingValidator)
Begins timing for the validation session.
PushToken
func(v *StreamingValidator, token string) *StreamValidationResult
Processes a token from the LLM stream.// Simulate streaming tokens from LLM
tokens := []string{
`{`,
`action=`,
`"search"`,
` `,
`query=`,
`"AI news"`,
` `,
`max_results=`,
`10`,
`}`,
}
var result *glyph.StreamValidationResult
for _, token := range tokens {
result = validator.PushToken(token)
// Check for early stop condition
if validator.ShouldStop() {
// Stop LLM stream - unknown tool detected
break
}
}
if result.Complete && result.Valid {
fmt.Printf("Tool: %s\n", result.ToolName)
fmt.Printf("Detected at token %d\n",
result.ToolDetectedAtToken)
}
ShouldStop
func(v *StreamingValidator) bool
Returns true if validation detected a fatal error warranting early stop.if validator.ShouldStop() {
// Cancel LLM stream
cancelStream()
}
Reset
func(v *StreamingValidator)
Clears validator state for reuse.
Validation Result
type StreamValidationResult struct {
Complete bool
Valid bool
ToolName string
ToolAllowed *bool
Errors []StreamValidationError
Fields map[string]interface{}
TokenCount int
CharCount int
Timeline []TimelineEvent
// Timing metrics
ToolDetectedAtToken int
ToolDetectedAtTime time.Duration
FirstErrorAtToken int
FirstErrorAtTime time.Duration
CompleteAtToken int
CompleteAtTime time.Duration
}
True when object is fully parsed.
True when no errors encountered.
Detected tool name (may be empty if not yet parsed).
Whether tool is in allow list (nil if unknown).
Accumulated validation errors.for _, err := range result.Errors {
fmt.Printf("%s: %s\n", err.Code, err.Message)
}
Significant events during validation.for _, event := range result.Timeline {
fmt.Printf("%s at token %d: %s\n",
event.Event, event.Token, event.Detail)
}
Incremental Parser
Event-based parser for streaming GLYPH input.
Creating a Parser
NewIncrementalParser
func(handler ParseHandler, opts IncrementalParserOptions) *IncrementalParser
Creates a new incremental parser.handler := func(event glyph.ParseEvent) error {
switch event.Type {
case glyph.EventStartObject:
fmt.Printf("Start object: %s\n", event.TypeName)
case glyph.EventKey:
fmt.Printf("Key: %s\n", event.Key)
case glyph.EventValue:
fmt.Printf("Value: %v\n", event.Value)
case glyph.EventEndObject:
fmt.Println("End object")
}
return nil
}
parser := glyph.NewIncrementalParser(
handler,
glyph.DefaultIncrementalParserOptions(),
)
Parser Options
type IncrementalParserOptions struct {
MaxDepth int // Maximum nesting depth (default: 128)
MaxKeyLen int // Maximum key length (default: 4096)
MaxValueLen int // Maximum value length (default: 1MB)
}
Feeding Data
Feed
func(p *IncrementalParser, data []byte) (int, error)
Adds more input data to the parser.input := []byte(`Team{id=^t:ARS name="Arsenal"}`)
// Feed data in chunks (simulating streaming)
chunkSize := 10
for i := 0; i < len(input); i += chunkSize {
end := i + chunkSize
if end > len(input) {
end = len(input)
}
consumed, err := parser.Feed(input[i:end])
if err != nil {
return err
}
fmt.Printf("Consumed %d bytes\n", consumed)
}
End
func(p *IncrementalParser) error
Signals end of input.if err := parser.End(); err != nil {
return err
}
Reset
func(p *IncrementalParser)
Clears parser state for reuse.
Parse Events
type ParseEventType uint8
const (
EventStartObject
EventEndObject
EventStartList
EventEndList
EventKey
EventValue
EventStartSum
EventEndSum
EventError
EventNeedMore
)
type ParseEvent struct {
Type ParseEventType
Path []PathElement
Key string
Value *GValue
TypeName string
Tag string
Error error
Pos Position
}
Streaming Dictionary
Persist key dictionaries across multiple frames for compression.
Creating a Session
NewStreamSession
func(opts SessionOptions) *StreamSession
Creates a streaming session with dictionary learning.session := glyph.NewStreamSession(glyph.SessionOptions{
SessionID: 12345,
LearnFrames: 10, // Learn keys from first 10 frames
DictOptions: glyph.DefaultStreamDictOptions(),
})
Session Operations
LearnKeys
func(s *StreamSession, v *GValue)
Extracts and learns keys from a value during learning phase.value := glyph.Parse(input).Value
session.LearnKeys(value)
EncodeKey
func(s *StreamSession, key string) (uint16, bool)
Encodes a key using the session dictionary.idx, ok := session.EncodeKey("name")
if ok {
fmt.Printf("Key 'name' encoded as %d\n", idx)
}
DecodeKey
func(s *StreamSession, idx uint16) string
Decodes a key index.key := session.DecodeKey(idx)
fmt.Printf("Index %d = %s\n", idx, key)
func validateToolCall(stream <-chan string) error {
registry := glyph.DefaultToolRegistry()
validator := glyph.NewStreamingValidator(registry)
validator.Start()
for token := range stream {
result := validator.PushToken(token)
// Tool detected - check if allowed
if result.ToolName != "" && result.ToolAllowed != nil {
if !*result.ToolAllowed {
return fmt.Errorf(
"unknown tool %q detected at token %d",
result.ToolName,
result.ToolDetectedAtToken,
)
}
fmt.Printf("Tool %q allowed\n", result.ToolName)
}
// Check for constraint violations
if len(result.Errors) > 0 {
for _, err := range result.Errors {
log.Printf("Validation error: %s\n", err.Message)
}
}
// Early stop on fatal errors
if validator.ShouldStop() {
return fmt.Errorf("validation failed")
}
}
result := validator.GetResult()
if !result.Valid {
return fmt.Errorf("invalid tool call")
}
return nil
}
Example: Incremental Parsing
func streamParse(reader io.Reader) error {
var objects []*glyph.GValue
var currentPath []string
handler := func(event glyph.ParseEvent) error {
switch event.Type {
case glyph.EventStartObject:
fmt.Printf("Start: %s\n", event.TypeName)
case glyph.EventKey:
fmt.Printf("Key: %s\n", event.Key)
case glyph.EventValue:
// Process value immediately
if event.Value != nil {
processValue(event.Path, event.Value)
}
case glyph.EventError:
return event.Error
}
return nil
}
parser := glyph.NewIncrementalParser(
handler,
glyph.DefaultIncrementalParserOptions(),
)
// Read and feed data in chunks
buf := make([]byte, 4096)
for {
n, err := reader.Read(buf)
if err == io.EOF {
break
}
if err != nil {
return err
}
_, err = parser.Feed(buf[:n])
if err != nil {
return err
}
}
return parser.End()
}
Example: Streaming Dictionary
func streamingSession() {
session := glyph.NewStreamSession(glyph.SessionOptions{
SessionID: 12345,
LearnFrames: 5,
})
// First 5 frames: learning mode
for i := 0; i < 5; i++ {
value := generateFrame(i)
session.LearnKeys(value)
frame := glyph.EncodeDictFrame(value, session)
send(frame)
}
// After learning: dictionary frozen
fmt.Printf("Dictionary has %d keys\n", session.Dict().Len())
// Subsequent frames use learned dictionary
for i := 5; i < 100; i++ {
value := generateFrame(i)
frame := glyph.EncodeDictFrame(value, session)
send(frame)
}
// Save dictionary for session resumption
dictBytes := session.SaveDict()
saveToStorage(dictBytes)
}