Skip to main content
GLYPH’s streaming validator lets you check tool calls as they arrive, token by token. Detect unknown tools at token 3-5, constraint violations mid-stream, and save 80%+ latency on bad requests.

Why Stream Validation?

Traditional validation happens after the LLM finishes generating:
# Traditional approach
response = await llm.generate(prompt)  # Wait for 50+ tokens
result = parse(response)               # Finally parse
if result.tool not in allowed:
    raise Error()                      # Too late - already paid for tokens
With GLYPH streaming validation:
validator = glyph.StreamingValidator(registry)

async for token in llm.stream(prompt):
    result = validator.push(token)
    
    # Tool detected at token 3-5
    if result.tool_name and not result.tool_allowed:
        await cancel()  # Stop immediately
        break

Early Detection

Tool name detected at token 3-5, not 50+

Save Latency

Cancel bad requests 80% faster

Save Tokens

Stop generation before waste

Setup: Define Your Tools

First, create a tool registry with constraints:
import glyph

registry = glyph.ToolRegistry()

# Search tool
registry.register(
    name="search",
    description="Search the web",
    args={
        "query": {
            "type": "str",
            "required": True,
            "min_len": 1,
            "max_len": 500,
        },
        "max_results": {
            "type": "int",
            "min": 1,
            "max": 20,
            "default": 10,
        },
    }
)

# Calculate tool
registry.register(
    name="calculate",
    description="Evaluate a math expression",
    args={
        "expression": {
            "type": "str",
            "required": True,
        },
        "precision": {
            "type": "int",
            "min": 0,
            "max": 15,
            "default": 2,
        },
    }
)

# Get weather tool
registry.register(
    name="get_weather",
    description="Get weather for a location",
    args={
        "location": {
            "type": "str",
            "required": True,
        },
        "units": {
            "type": "str",
            "enum": ["celsius", "fahrenheit"],
            "default": "celsius",
        },
    }
)

Basic Streaming Validation

Validate as tokens arrive:
import glyph
import asyncio
from anthropic import AsyncAnthropic

async def stream_with_validation(prompt: str):
    client = AsyncAnthropic()
    validator = glyph.StreamingValidator(registry)
    
    collected_tokens = []
    
    async with client.messages.stream(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}],
        system="Respond with tool calls in GLYPH format: ToolName{arg=value}"
    ) as stream:
        async for token in stream.text_stream:
            collected_tokens.append(token)
            result = validator.push(token)
            
            # Tool name detected
            if result.tool_name and result.tool_detected_at_token:
                print(f"[Token {result.tool_detected_at_token}] Tool: {result.tool_name}")
                
                # Unknown tool - cancel immediately
                if not result.tool_allowed:
                    print(f"[CANCEL] Unknown tool: {result.tool_name}")
                    await stream.close()
                    return None
            
            # Validation error - cancel
            if result.should_stop():
                print(f"[CANCEL] Validation error: {result.errors}")
                await stream.close()
                return None
    
    # Stream complete - execute if valid
    final = validator.finalize()
    if final.valid:
        print(f"[OK] Executing: {final.tool_name}")
        print(f"     Args: {final.fields}")
        return execute_tool(final.tool_name, final.fields)
    else:
        print(f"[INVALID] {final.errors}")
        return None

def execute_tool(name: str, args: dict):
    """Your tool execution logic."""
    print(f"Executing {name} with {args}")
    # ... actual implementation
    return {"status": "ok", "tool": name}

# Usage
result = await stream_with_validation("What's the weather in Tokyo?")
1

Create Validator

Initialize StreamingValidator with your tool registry.
2

Push Tokens

As each token arrives from the LLM, call validator.push(token).
3

Check Tool Name

Once the tool name is detected (typically token 3-5), check if it’s allowed.
4

Cancel or Continue

If tool is unknown or constraints violated, cancel the stream immediately.
5

Execute

When stream completes and validation passes, execute the tool.

When Validation Cancels

The validator cancels at different stages:
ConditionWhen DetectedAction
Unknown tool nameToken ~3-5Cancel immediately
Wrong argument typeWhen type detectedCancel when type mismatch found
Constraint violationWhen value completeCancel after parsing full value
Missing required argAt closing }Error after full parse

Example Timeline

For input: unknown_tool{query="test"}
Token  Text           Validator State
-----  ------------   ----------------------------------
1      "unknown"      Accumulating tool name...
2      "_tool"        Tool name detected: "unknown_tool"
                       ❌ Not in registry → CANCEL
For input: search{query="test" max_results=1000}
Token  Text           Validator State
-----  ------------   ----------------------------------
1-3    "search{"      Tool: search ✓
4-8    "query="...    Parsing args...
9-12   "max_results"  Parsing args...
13     "=1000"        Value: 1000
                       ❌ max_results > 20 → CANCEL

Advanced: Timeline Analysis

The validator tracks detailed timing:
validator = glyph.StreamingValidator(registry)
validator.start()  # Start timing

for token in tokens:
    result = validator.push(token)

# After completion
print("Timeline:")
for event in result.timeline:
    print(f"  {event.event:15s} @ token {event.token:3d} (+{event.elapsed}ms)")
    print(f"    {event.detail}")

# Output:
# Timeline:
#   TOOL_DETECTED   @ token   3 (+12ms)
#     tool=search allowed=True
#   COMPLETE        @ token  15 (+45ms)
#     valid=True

Error Handling

Handle different error scenarios:
validator = glyph.StreamingValidator(registry)

try:
    async for token in llm_stream:
        result = validator.push(token)
        
        # Unknown tool
        if result.tool_name and not result.tool_allowed:
            await llm_stream.close()
            raise ToolNotFoundError(
                tool=result.tool_name,
                detected_at=result.tool_detected_at_token
            )
        
        # Constraint violation
        if result.should_stop():
            await llm_stream.close()
            raise ToolConstraintError(
                tool=result.tool_name,
                errors=result.errors
            )
    
    # Check final state
    final = validator.finalize()
    if not final.valid:
        raise ToolValidationError(
            tool=final.tool_name,
            errors=final.errors
        )
    
    # Execute
    return await execute_tool(final.tool_name, final.fields)
    
except ToolNotFoundError as e:
    logger.warning(f"Unknown tool detected early: {e.tool}")
    return {"error": f"Unknown tool: {e.tool}"}

except ToolConstraintError as e:
    logger.warning(f"Constraint violation: {e.errors}")
    return {"error": f"Invalid arguments: {e.errors}"}

except ToolValidationError as e:
    logger.error(f"Validation failed: {e.errors}")
    return {"error": f"Tool validation failed: {e.errors}"}

Constraint Types

String Constraints

registry.register("create_user", {
    "username": {
        "type": "str",
        "required": True,
        "min_len": 3,
        "max_len": 20,
        "pattern": r"^[a-zA-Z0-9_]+$",
    },
    "email": {
        "type": "str",
        "required": True,
        "pattern": r"^[^@]+@[^@]+\.[^@]+$",
    },
    "role": {
        "type": "str",
        "enum": ["admin", "user", "guest"],
        "default": "user",
    },
})

Numeric Constraints

registry.register("set_volume", {
    "level": {
        "type": "int",
        "required": True,
        "min": 0,
        "max": 100,
    },
    "fade_duration": {
        "type": "float",
        "min": 0.0,
        "max": 10.0,
        "default": 0.5,
    },
})

Performance Tuning

Set limits to prevent DoS:
validator = glyph.StreamingValidator(
    registry,
    limits={
        "max_buffer_size": 1024 * 1024,  # 1MB max
        "max_field_count": 1000,         # Max 1000 fields
        "max_error_count": 100,          # Stop after 100 errors
    }
)

Integration Examples

With Anthropic

import glyph
from anthropic import AsyncAnthropic

client = AsyncAnthropic()
validator = glyph.StreamingValidator(registry)

async with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": prompt}],
) as stream:
    async for token in stream.text_stream:
        result = validator.push(token)
        
        if result.should_stop():
            await stream.close()
            break

With OpenAI

import glyph
from openai import AsyncOpenAI

client = AsyncOpenAI()
validator = glyph.StreamingValidator(registry)

stream = await client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": prompt}],
    stream=True,
)

async for chunk in stream:
    if chunk.choices[0].delta.content:
        token = chunk.choices[0].delta.content
        result = validator.push(token)
        
        if result.should_stop():
            await stream.aclose()
            break

With LangChain

import glyph
from langchain.callbacks.base import AsyncCallbackHandler
from langchain_anthropic import ChatAnthropic

class GLYPHValidationCallback(AsyncCallbackHandler):
    def __init__(self, registry):
        self.validator = glyph.StreamingValidator(registry)
    
    async def on_llm_new_token(self, token: str, **kwargs):
        result = self.validator.push(token)
        
        if result.should_stop():
            raise ValueError(f"Validation error: {result.errors}")

llm = ChatAnthropic(
    model="claude-sonnet-4-20250514",
    callbacks=[GLYPHValidationCallback(registry)],
    streaming=True,
)

Testing Validation

Test your tool validators:
import pytest
import glyph

def test_unknown_tool():
    registry = glyph.ToolRegistry()
    registry.register("search", {"query": {"type": "str"}})
    
    validator = glyph.StreamingValidator(registry)
    
    # Simulate streaming "unknown{query=test}"
    for token in ["unknown", "{", "query", "=", "test", "}"]:
        result = validator.push(token)
    
    assert result.tool_name == "unknown"
    assert not result.tool_allowed
    assert result.should_stop()

def test_constraint_violation():
    registry = glyph.ToolRegistry()
    registry.register("search", {
        "query": {"type": "str", "min_len": 1},
        "max_results": {"type": "int", "max": 20},
    })
    
    validator = glyph.StreamingValidator(registry)
    
    # max_results=1000 violates constraint
    for token in ["search", "{", "max_results", "=", "1000", "}"]:
        result = validator.push(token)
    
    assert result.errors
    assert "max_results" in str(result.errors[0])

Next Steps

State Management

Manage agent state with patches and fingerprinting

AI Agents

Complete agent patterns with ReAct and multi-agent

Build docs developers (and LLMs) love