Tool Calling with Streaming Validation
Problem: You’re streaming LLM output and want to validate tool calls before generation completes. If the model hallucinates a tool, you want to cancel immediately. Solution: Use `StreamingValidator` to check tokens incrementally.
import glyph
import asyncio
from anthropic import AsyncAnthropic

# Define your tools with constraints.
# Each arg spec declares a type plus optional constraints (required,
# min/max, min_len/max_len, enum, default) that the streaming validator
# checks incrementally as tokens arrive.
registry = glyph.ToolRegistry()

# Web search: non-empty query up to 500 chars, bounded result count.
registry.register(
    name="search",
    description="Search the web",
    args={
        "query": {"type": "str", "required": True, "min_len": 1, "max_len": 500},
        "max_results": {"type": "int", "min": 1, "max": 20, "default": 10},
    }
)

# Math evaluation with bounded output precision.
registry.register(
    name="calculate",
    description="Evaluate a math expression",
    args={
        "expression": {"type": "str", "required": True},
        "precision": {"type": "int", "min": 0, "max": 15, "default": 2},
    }
)

# Weather lookup; units restricted to an enum.
registry.register(
    name="get_weather",
    description="Get weather for a location",
    args={
        "location": {"type": "str", "required": True},
        "units": {"type": "str", "enum": ["celsius", "fahrenheit"], "default": "celsius"},
    }
)
async def stream_with_validation(prompt: str):
    """Stream a completion and validate GLYPH tool calls token-by-token.

    Pushes each streamed token into a StreamingValidator so a hallucinated
    tool name or a constraint violation cancels the stream immediately
    instead of after full generation. (The original also accumulated every
    token in a `collected_tokens` list that was never read; that dead state
    is removed.)

    Args:
        prompt: User message to send to the model.

    Returns:
        The executed tool's result dict, or None when the stream was
        cancelled or the final tool call failed validation.
    """
    client = AsyncAnthropic()
    validator = glyph.StreamingValidator(registry)
    async with client.messages.stream(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}],
        system="Respond with tool calls in GLYPH format: ToolName{arg=value ...}"
    ) as stream:
        async for token in stream.text_stream:
            result = validator.push(token)
            # Tool name detected
            if result.tool_name and result.tool_detected_at_token:
                print(f"[Token {result.tool_detected_at_token}] Tool detected: {result.tool_name}")
                # Unknown tool - cancel immediately.
                # NOTE(review): checked only at the detection step, which is
                # when `tool_allowed` first becomes meaningful — confirm the
                # validator does not flip it later in the stream.
                if not result.tool_allowed:
                    print(f"[CANCEL] Unknown tool: {result.tool_name}")
                    await stream.close()
                    return None
            # Validation error - cancel
            if result.should_stop():
                print(f"[CANCEL] Validation error: {result.errors}")
                await stream.close()
                return None
    # Stream complete - execute if valid
    final = validator.finalize()
    if final.valid:
        print(f"[OK] Executing: {final.tool_name}")
        print(f" Args: {final.fields}")
        return execute_tool(final.tool_name, final.fields)
    else:
        print(f"[INVALID] {final.errors}")
        return None
def execute_tool(name: str, args: dict):
    """Dispatch point for a validated tool call - replace with real logic.

    Logs the invocation and returns a minimal status payload.
    """
    print(f"Executing {name} with {args}")
    return dict(status="ok", tool=name)
# Usage
# Entry point: runs the streamed, validated tool-call flow once.
# NOTE(review): presumably requires Anthropic credentials in the environment.
asyncio.run(stream_with_validation("What's the weather in Tokyo?"))
- Unknown tool detected at token 3-5, not token 50+
- Constraint violations caught mid-stream
- Save 80%+ latency on bad requests
Drop-in JSON Replacement
Problem: You have existing JSON-based code but want token savings without a rewrite. Solution: Use `json_to_glyph` and `glyph_to_json` for seamless conversion.
import glyph
import json

# Your existing data structures work unchanged
user_data = {
    "id": "user_123",
    "name": "Alice Chen",
    "email": "[email protected]",
    "preferences": {
        "theme": "dark",
        "notifications": True,
        "language": "en"
    },
    "tags": ["premium", "beta-tester", "early-adopter"],
    "metadata": {
        "created_at": "2024-01-15T10:30:00Z",
        "last_login": "2025-01-10T14:22:00Z",
        "login_count": 847
    }
}

# Convert to GLYPH (one line change)
glyph_text = glyph.json_to_glyph(user_data)
print("GLYPH output:")
print(glyph_text)
# Example output (keys appear sorted):
# {[email protected] id=user_123 metadata={created_at=2024-01-15T10:30:00Z ...} ...}

# Compare sizes
json_text = json.dumps(user_data)
print(f"\nJSON: {len(json_text)} chars")
print(f"GLYPH: {len(glyph_text)} chars")
print(f"Reduction: {100 * (1 - len(glyph_text)/len(json_text)):.1f}%")

# Convert back to Python dict (identical to original)
restored = glyph.glyph_to_json(glyph_text)
assert restored == user_data
Gradual Migration Pattern
def serialize(data: dict, use_glyph: bool = False) -> str:
    """Migration shim: emit GLYPH when opted in, otherwise plain JSON."""
    return glyph.json_to_glyph(data) if use_glyph else json.dumps(data)
def deserialize(text: str) -> dict:
    """Parse either JSON or GLYPH text into Python data.

    Tries strict JSON first (EAFP) and falls back to GLYPH. This is more
    robust than the original first-line sniff (leading '{' plus an '='
    anywhere on line one), which misrouted valid JSON whose first line
    happened to contain an '=' character.

    Args:
        text: JSON or GLYPH source text.

    Returns:
        The parsed Python object (typically a dict).
    """
    text = text.strip()
    try:
        return json.loads(text)
    except ValueError:
        # Not valid JSON (JSONDecodeError subclasses ValueError) - assume GLYPH.
        return glyph.glyph_to_json(text)
# Works with both formats
data1 = deserialize('{"name": "Bob"}')  # JSON input
data2 = deserialize('{name=Bob}')       # GLYPH input
assert data1 == data2
Token Savings by Data Type
| Data Shape | JSON | GLYPH | Savings |
|---|---|---|---|
| Flat object (5 fields) | ~45 tokens | ~30 tokens | 33% |
| Nested object (3 levels) | ~120 tokens | ~75 tokens | 38% |
| Array of objects (10 items) | ~300 tokens | ~160 tokens | 47% |
| API response (typical) | ~200 tokens | ~120 tokens | 40% |
Agent State Management
Problem: Building a multi-step agent and need to track conversation history, tool results, and working memory efficiently. Solution: Use GLYPH structs with state hashing for verified updates.

import glyph
from glyph import g, field
from dataclasses import dataclass, field as dataclass_field
from datetime import datetime
from typing import Optional
import hashlib
@dataclass
class AgentMemory:
    """Structured agent memory with GLYPH serialization.

    Tracks the conversation transcript, tool results keyed by call id,
    scratch working memory, and the current plan/step.
    """
    # Chat transcript: list of {"role": ..., "content": ...} dicts.
    conversation: list[dict] = dataclass_field(default_factory=list)
    # Tool results keyed by call id (was annotated dict[str, any] - the
    # builtin function `any`, not a type).
    tool_results: dict[str, object] = dataclass_field(default_factory=dict)
    working_memory: dict[str, object] = dataclass_field(default_factory=dict)
    plan: list[str] = dataclass_field(default_factory=list)
    current_step: int = 0

    def add_message(self, role: str, content: str) -> None:
        """Append a chat message to the transcript.

        The usage example calls this, but the original class never defined
        it (AttributeError at runtime).
        """
        self.conversation.append({"role": role, "content": content})

    def add_tool_result(self, tool: str, call_id: str, result: object) -> None:
        """Record a tool result under its call id (missing in the original)."""
        self.tool_results[call_id] = {"tool": tool, "result": result}

    def to_glyph(self) -> str:
        """Serialize to GLYPH format."""
        return glyph.emit(g.struct("AgentMemory",
            field("conversation", glyph.from_json(self.conversation)),
            field("tool_results", glyph.from_json(self.tool_results)),
            field("working_memory", glyph.from_json(self.working_memory)),
            field("plan", glyph.from_json(self.plan)),
            field("current_step", g.int(self.current_step)),
        ))

    @classmethod
    def from_glyph(cls, text: str) -> "AgentMemory":
        """Deserialize from GLYPH format; missing fields fall back to defaults."""
        v = glyph.parse(text)
        return cls(
            conversation=glyph.to_json(v.get("conversation")) or [],
            tool_results=glyph.to_json(v.get("tool_results")) or {},
            working_memory=glyph.to_json(v.get("working_memory")) or {},
            plan=glyph.to_json(v.get("plan")) or [],
            current_step=v.get("current_step").as_int() if v.get("current_step") else 0,
        )

    def state_hash(self) -> str:
        """Compute SHA-256 over a canonical GLYPH emission of the full state."""
        canonical = glyph.emit(glyph.from_json({
            "conversation": self.conversation,
            "tool_results": self.tool_results,
            "working_memory": self.working_memory,
            "plan": self.plan,
            "current_step": self.current_step,
        }))
        return hashlib.sha256(canonical.encode()).hexdigest()

    def context_window(self, max_messages: int = 10) -> str:
        """Get recent context for LLM, GLYPH-formatted.

        NOTE(review): the original sliced conversation[-max_messages:] into a
        local it never used; the dead slice is dropped here. Confirm whether
        recent messages were meant to be embedded in the output.
        """
        working = glyph.json_to_glyph(self.working_memory) if self.working_memory else "{}"
        return f"""Memory{{
step={self.current_step}
plan={self.plan}
working={working}
}}"""
class StatefulAgent:
    """Agent with verified (hash-checked) state updates."""

    def __init__(self):
        self.memory = AgentMemory()
        # Hash of the last checkpointed/updated state, for optimistic locking.
        self._last_hash: Optional[str] = None

    def checkpoint(self) -> tuple[str, str]:
        """Create a checkpoint.

        Returns:
            (glyph_state, state_hash) pair; also records the hash as the
            current base for later apply_update() verification.
        """
        state = self.memory.to_glyph()
        digest = self.memory.state_hash()  # renamed: `hash` shadowed the builtin
        self._last_hash = digest
        return state, digest

    def apply_update(self, update_fn, expected_base: Optional[str] = None):
        """Apply `update_fn(memory)` with optional base-hash verification.

        Raises:
            ValueError: when expected_base does not match the last known
                hash (concurrent modification detected).
        """
        if expected_base and self._last_hash:
            if expected_base != self._last_hash:
                raise ValueError("State mismatch - concurrent modification detected")
        update_fn(self.memory)
        self._last_hash = self.memory.state_hash()

    def restore(self, state: str, expected_hash: Optional[str] = None):
        """Restore memory from a GLYPH checkpoint.

        Raises:
            ValueError: when expected_hash is given and the restored state
                hashes differently (corrupted checkpoint).
        """
        self.memory = AgentMemory.from_glyph(state)
        actual_hash = self.memory.state_hash()
        if expected_hash and actual_hash != expected_hash:
            raise ValueError("Checkpoint corrupted - hash mismatch")
        self._last_hash = actual_hash
# Usage
agent = StatefulAgent()

# Add conversation
agent.memory.add_message("user", "Find restaurants in SF")
agent.memory.add_message("assistant", "I'll search for restaurants.")
agent.memory.plan = ["search_restaurants", "filter_by_rating", "format_results"]

# Checkpoint before tool execution.
# Renamed the unpacked local: `hash` shadowed the builtin of the same name.
state, checkpoint_hash = agent.checkpoint()
print(f"Checkpoint hash: {checkpoint_hash[:16]}...")

# Execute step with verified update
def step_update(mem: AgentMemory):
    """Record the search result and advance to the next plan step."""
    mem.add_tool_result("search", "call_1", {
        "restaurants": [
            {"name": "Flour + Water", "rating": 4.5},
            {"name": "State Bird", "rating": 4.7},
        ]
    })
    mem.current_step = 1

agent.apply_update(step_update, expected_base=checkpoint_hash)

# Get context for next LLM call
context = agent.memory.context_window()
print(context)
Real-time Progress Streaming
Problem: Long-running agent tasks need progress updates, but you don’t want to mix progress with data in your protocol. Solution: Use structured progress messages with GLYPH.

import glyph
from glyph import g, field
import asyncio
from typing import Callable
from datetime import datetime
def emit_progress(pct: float, message: str) -> str:
    """Serialize a progress update as a GLYPH `Progress` struct.

    Args:
        pct: Completion fraction (callers pass values in 0.0-1.0).
        message: Human-readable status line.

    Returns:
        GLYPH-encoded text for the progress event.
    """
    # datetime.utcnow() is deprecated (Python 3.12+); build the identical
    # naive-UTC "...Z" timestamp from an aware datetime instead.
    from datetime import timezone
    ts = datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"
    return glyph.emit(g.struct("Progress",
        field("pct", g.float(pct)),
        field("msg", g.str(message)),
        field("ts", g.str(ts)),
    ))
def emit_log(level: str, message: str) -> str:
    """Serialize a log message as a GLYPH `Log` struct.

    Args:
        level: Severity label (e.g. "info", "error").
        message: Log text.

    Returns:
        GLYPH-encoded text for the log event.
    """
    # Same deprecation fix as emit_progress: avoid datetime.utcnow() while
    # keeping the naive-UTC "...Z" timestamp format.
    from datetime import timezone
    ts = datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"
    return glyph.emit(g.struct("Log",
        field("level", g.str(level)),
        field("msg", g.str(message)),
        field("ts", g.str(ts)),
    ))
class ProgressReporter:
    """Buffer progress, log, and data events alongside streamed data.

    Events accumulate as (kind, glyph_text) tuples; a real transport would
    flush them to a websocket/SSE channel instead of a list.
    """

    def __init__(self):
        # Ordered event buffer: (kind, serialized_payload) tuples.
        self.events = []

    async def send_progress(self, pct: float, message: str):
        """Queue a progress update event."""
        self.events.append(("progress", emit_progress(pct, message)))

    async def send_log(self, level: str, message: str):
        """Queue a log event."""
        self.events.append(("log", emit_log(level, message)))

    async def send_data(self, data: object, final: bool = False):
        """Queue a data event.

        `data` was annotated with the builtin function `any` (not a type);
        `object` expresses the intent. NOTE(review): `final` is accepted but
        never recorded - presumably a transport would use it to mark the
        terminal frame; confirm before relying on it.
        """
        self.events.append(("data", glyph.json_to_glyph(data)))
async def process_documents_with_progress(
    docs: list[str],
    reporter: "ProgressReporter",
    processor: Callable[[str], dict]
):
    """Process documents, emitting real-time progress/log events.

    Args:
        docs: Raw document texts.
        reporter: Event sink exposing send_progress/send_log/send_data
            coroutines (annotation quoted as a forward reference).
        processor: Synchronous per-document transform; a failure is logged
            and that document is skipped.

    Returns:
        List of successful results (failed documents are omitted).
    """
    total = len(docs)
    await reporter.send_log("info", f"Starting processing of {total} documents")
    results = []
    # get_running_loop() replaces the deprecated get_event_loop() pattern;
    # inside a coroutine a running loop is guaranteed.
    loop = asyncio.get_running_loop()
    start_time = loop.time()
    for i, doc in enumerate(docs):
        # Report before processing so slow documents still show progress.
        pct = (i + 1) / total
        await reporter.send_progress(pct, f"Processing document {i+1}/{total}")
        # Process document
        try:
            results.append(processor(doc))
        except Exception as e:
            await reporter.send_log("error", f"Failed to process doc {i}: {e}")
    # Final summary
    elapsed = loop.time() - start_time
    await reporter.send_log("info", f"Completed {total} documents in {elapsed:.1f}s")
    await reporter.send_data({"total": total, "successful": len(results)}, final=True)
    return results
Batch Data with Tabular Mode
Problem: Sending large datasets to/from LLMs (embeddings, search results, structured outputs) and JSON arrays are token-expensive. Solution: Use tabular mode for 50-70% token savings on homogeneous lists.

import glyph
from glyph import GValue, MapEntry
import json

# Define your data
search_results = [
    {"id": "doc_1", "title": "Introduction to GLYPH", "score": 0.95},
    {"id": "doc_2", "title": "Streaming Validation", "score": 0.89},
    {"id": "doc_3", "title": "Agent State Management", "score": 0.84},
    {"id": "doc_4", "title": "Tabular Mode Guide", "score": 0.82},
    {"id": "doc_5", "title": "JSON Migration Path", "score": 0.78},
]

# Convert to GValue list of maps (auto-tabular kicks in for 3+ homogeneous items)
rows = GValue.list_(*[
    GValue.map_(
        MapEntry("id", GValue.str_(r["id"])),
        MapEntry("title", GValue.str_(r["title"])),
        MapEntry("score", GValue.float_(r["score"])),
    )
    for r in search_results
])

# Emit - will automatically use tabular format
table_text = glyph.emit(rows)
print(table_text)
# Output:
# @tab _ [id score title]
# |doc_1|0.95|Introduction to GLYPH|
# |doc_2|0.89|Streaming Validation|
# |doc_3|0.84|Agent State Management|
# |doc_4|0.82|Tabular Mode Guide|
# |doc_5|0.78|JSON Migration Path|
# @end

# Compare to JSON
json_text = json.dumps(search_results)
print(f"\nJSON: {len(json_text)} chars")
print(f"Tabular: {len(table_text)} chars")
print(f"Savings: {100 * (1 - len(table_text)/len(json_text)):.0f}%")

# Parse back - round-trips through the tabular encoding.
parsed = glyph.parse(table_text)
assert len(parsed) == 5
assert parsed.index(0).get("id").as_str() == "doc_1"
Building RAG Context with Tables
def build_rag_context(query: str, results: list[dict], max_results: int = 5) -> str:
    """Assemble an LLM prompt embedding the top search hits as a GLYPH table.

    Keeps at most `max_results` hits and truncates each hit's content to
    200 characters before tabular serialization.
    """
    def to_row(hit: dict):
        # One table row: id, title, and a 200-char content preview.
        return GValue.map_(
            MapEntry("id", GValue.str_(hit["id"])),
            MapEntry("title", GValue.str_(hit["title"])),
            MapEntry("content", GValue.str_(hit.get("content", "")[:200])),
        )

    table = GValue.list_(*(to_row(hit) for hit in results[:max_results]))
    docs_table = glyph.emit(table)
    return f"""Query: {query}
Relevant documents:
{docs_table}
Based on these documents, provide a comprehensive answer."""
Checkpoint & Resume
Problem: Long-running agent tasks need to be resumable after failures or restarts. Solution: GLYPH checkpoints with integrity verification.

import glyph
from pathlib import Path
from dataclasses import dataclass, field, asdict
from typing import Optional, List, Any
from datetime import datetime
import hashlib
@dataclass
class TaskCheckpoint:
    """Checkpoint for a resumable task, with a self-integrity hash."""
    task_id: str
    task_type: str
    created_at: str
    updated_at: str
    # Progress tracking
    total_steps: int
    completed_steps: int
    current_step: int
    # State
    input_data: Any
    intermediate_results: List[Any] = field(default_factory=list)
    final_result: Optional[Any] = None
    # Error handling
    last_error: Optional[str] = None
    retry_count: int = 0
    # Integrity: SHA-256 of the canonical state, excluded from its own hash.
    state_hash: Optional[str] = None

    def to_glyph(self) -> str:
        """Serialize checkpoint to GLYPH."""
        return glyph.json_to_glyph(asdict(self))

    @classmethod
    def from_glyph(cls, text: str) -> "TaskCheckpoint":
        """Deserialize checkpoint from GLYPH.

        Assumes the GLYPH map keys exactly match the dataclass fields;
        unexpected keys would raise TypeError from the constructor.
        """
        data = glyph.glyph_to_json(text)
        return cls(**data)

    def compute_hash(self) -> str:
        """Compute the state hash with `state_hash` itself excluded.

        The hash field is nulled for canonicalization and restored in a
        try/finally, so a serialization failure can no longer leave the
        instance with state_hash wiped (the original restored it only on
        the success path).
        """
        saved = self.state_hash
        self.state_hash = None
        try:
            canonical = glyph.json_to_glyph(asdict(self))
        finally:
            self.state_hash = saved
        return hashlib.sha256(canonical.encode()).hexdigest()

    def verify_integrity(self) -> bool:
        """Return True when no hash is stored or the stored hash matches."""
        if not self.state_hash:
            return True
        return self.compute_hash() == self.state_hash
class CheckpointManager:
    """Persist task checkpoints as `<task_id>.glyph` files on disk."""

    def __init__(self, checkpoint_dir: str = "./checkpoints"):
        self.checkpoint_dir = Path(checkpoint_dir)
        # Create the directory up front so save() cannot fail on it.
        self.checkpoint_dir.mkdir(parents=True, exist_ok=True)

    def _path_for(self, task_id: str) -> Path:
        """Filesystem location of a task's checkpoint file."""
        return self.checkpoint_dir / f"{task_id}.glyph"

    def save(self, checkpoint: "TaskCheckpoint"):
        """Stamp, hash, and write a checkpoint to disk.

        The updated_at timestamp is set before hashing, so the stored hash
        covers it (same order as the original).
        """
        # datetime.utcnow() is deprecated (3.12+); produce the identical
        # naive-UTC "...Z" timestamp from an aware datetime.
        from datetime import timezone
        checkpoint.updated_at = datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"
        checkpoint.state_hash = checkpoint.compute_hash()
        self._path_for(checkpoint.task_id).write_text(checkpoint.to_glyph())

    def load(self, task_id: str) -> "Optional[TaskCheckpoint]":
        """Load a checkpoint, or None when no file exists.

        Raises:
            ValueError: when the stored hash does not match the content.
        """
        path = self._path_for(task_id)
        if not path.exists():
            return None
        checkpoint = TaskCheckpoint.from_glyph(path.read_text())
        if not checkpoint.verify_integrity():
            raise ValueError(f"Checkpoint integrity check failed for {task_id}")
        return checkpoint
All recipes include production-ready error handling and can be adapted for your specific use case.