Skip to main content
All examples are copy-paste ready and production-tested.

Tool Calling with Streaming Validation

Problem: You’re streaming LLM output and want to validate tool calls before generation completes. If the model hallucinates a tool, you want to cancel immediately. Solution: Use StreamingValidator to check tokens incrementally.
import glyph
import asyncio
from anthropic import AsyncAnthropic

# Tool catalogue: every entry below is handed to the registry as keyword
# arguments (name, description, args), in the order listed.
registry = glyph.ToolRegistry()

for _tool_spec in (
    {
        "name": "search",
        "description": "Search the web",
        "args": {
            "query": {"type": "str", "required": True, "min_len": 1, "max_len": 500},
            "max_results": {"type": "int", "min": 1, "max": 20, "default": 10},
        },
    },
    {
        "name": "calculate",
        "description": "Evaluate a math expression",
        "args": {
            "expression": {"type": "str", "required": True},
            "precision": {"type": "int", "min": 0, "max": 15, "default": 2},
        },
    },
    {
        "name": "get_weather",
        "description": "Get weather for a location",
        "args": {
            "location": {"type": "str", "required": True},
            "units": {"type": "str", "enum": ["celsius", "fahrenheit"], "default": "celsius"},
        },
    },
):
    registry.register(**_tool_spec)

# Keep the module namespace limited to `registry`.
del _tool_spec


async def stream_with_validation(prompt: str):
    """Stream a completion and validate its tool call as tokens arrive.

    Every text chunk from the stream is pushed into a
    ``glyph.StreamingValidator`` built from the module-level ``registry``.
    The stream is closed early when the validator reports a tool that is not
    allowed, or when ``result.should_stop()`` is true. If the stream runs to
    completion, the finalized call is executed only when it is valid.

    Args:
        prompt: User message sent to the model.

    Returns:
        Whatever ``execute_tool`` returns for a valid tool call, otherwise
        ``None`` (cancelled stream or invalid final result).
    """
    client = AsyncAnthropic()
    validator = glyph.StreamingValidator(registry)

    async with client.messages.stream(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}],
        system="Respond with tool calls in GLYPH format: ToolName{arg=value ...}"
    ) as stream:
        # Tokens are fed straight to the validator; nothing here needs the
        # raw text afterwards, so no per-token buffer is kept.
        async for token in stream.text_stream:
            result = validator.push(token)

            # Tool name detected
            if result.tool_name and result.tool_detected_at_token:
                print(f"[Token {result.tool_detected_at_token}] Tool detected: {result.tool_name}")

                # Unknown tool - cancel immediately
                if not result.tool_allowed:
                    print(f"[CANCEL] Unknown tool: {result.tool_name}")
                    await stream.close()
                    return None

            # Validation error - cancel
            if result.should_stop():
                print(f"[CANCEL] Validation error: {result.errors}")
                await stream.close()
                return None

    # Stream complete - execute if valid
    final = validator.finalize()
    if final.valid:
        print(f"[OK] Executing: {final.tool_name}")
        print(f"     Args: {final.fields}")
        return execute_tool(final.tool_name, final.fields)

    print(f"[INVALID] {final.errors}")
    return None


def execute_tool(name: str, args: dict):
    """Placeholder dispatcher; replace the body with real tool execution.

    Prints the call to stdout and returns a success record naming the tool.
    """
    print(f"Executing {name} with {args}")
    outcome = dict(status="ok", tool=name)
    return outcome


# Usage: run the coroutine to completion on a fresh event loop
asyncio.run(
    stream_with_validation(prompt="What's the weather in Tokyo?")
)
Why this matters:
  • Unknown tool detected at token 3-5, not token 50+
  • Constraint violations caught mid-stream
  • Save 80%+ latency on bad requests

Drop-in JSON Replacement

Problem: You have existing JSON-based code but want token savings without a rewrite. Solution: Use json_to_glyph and glyph_to_json for seamless conversion.
import glyph
import json

# Your existing data structures work unchanged
user_data = {
    "id": "user_123",
    "name": "Alice Chen",
    # Restored from an email-obfuscation artifact left by the page scraper.
    "email": "alice@example.com",
    "preferences": {
        "theme": "dark",
        "notifications": True,
        "language": "en"
    },
    "tags": ["premium", "beta-tester", "early-adopter"],
    "metadata": {
        "created_at": "2024-01-15T10:30:00Z",
        "last_login": "2025-01-10T14:22:00Z",
        "login_count": 847
    }
}

# Convert to GLYPH (one line change)
glyph_text = glyph.json_to_glyph(user_data)
print("GLYPH output:")
print(glyph_text)
# Output resembles:
# {email=alice@example.com id=user_123 metadata={created_at=2024-01-15T10:30:00Z ...} ...}

# Compare sizes (character counts, as a rough proxy for token counts)
json_text = json.dumps(user_data)
print(f"\nJSON:  {len(json_text)} chars")
print(f"GLYPH: {len(glyph_text)} chars")
print(f"Reduction: {100 * (1 - len(glyph_text)/len(json_text)):.1f}%")

# Convert back to Python dict (identical to original)
restored = glyph.glyph_to_json(glyph_text)
assert restored == user_data

Gradual Migration Pattern

def serialize(data: dict, use_glyph: bool = False) -> str:
    """Encode ``data`` as GLYPH when ``use_glyph`` is true, else as JSON.

    Lets call sites opt in to GLYPH one at a time during a migration.
    """
    # Only the selected encoder is looked up, so the JSON path never touches
    # the glyph module.
    encode = glyph.json_to_glyph if use_glyph else json.dumps
    return encode(data)


def deserialize(text: str) -> dict:
    """Parse ``text`` as JSON or GLYPH, detecting the format automatically.

    JSON is attempted first. Text that is not valid JSON but starts with
    ``{`` is handed to ``glyph.glyph_to_json``. Checking for ``=`` on the
    first line is not reliable: JSON string values may contain ``=`` (URLs,
    base64), and a multi-line GLYPH map may have no ``=`` on its first line.

    Args:
        text: Serialized payload; surrounding whitespace is ignored.

    Returns:
        The decoded value.

    Raises:
        json.JSONDecodeError: If ``text`` is not valid JSON and does not
            start with ``{``.
    """
    text = text.strip()
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        if not text.startswith("{"):
            # Not brace-delimited: keep the JSON error, as before.
            raise
        return glyph.glyph_to_json(text)


# The same record, once as JSON and once as GLYPH, decodes to equal dicts
data1, data2 = (deserialize(raw) for raw in ('{"name": "Bob"}', '{name=Bob}'))
assert data1 == data2

Token Savings by Data Type

| Data Shape | JSON | GLYPH | Savings |
|---|---|---|---|
| Flat object (5 fields) | ~45 tokens | ~30 tokens | 33% |
| Nested object (3 levels) | ~120 tokens | ~75 tokens | 38% |
| Array of objects (10 items) | ~300 tokens | ~160 tokens | 47% |
| API response (typical) | ~200 tokens | ~120 tokens | 40% |

Agent State Management

Problem: Building a multi-step agent and need to track conversation history, tool results, and working memory efficiently. Solution: Use GLYPH structs with state hashing for verified updates.
import glyph
from glyph import g, field
from dataclasses import dataclass, field as dataclass_field
from datetime import datetime
from typing import Optional
import hashlib


@dataclass
class AgentMemory:
    """Structured agent memory with GLYPH serialization.

    All fields hold plain JSON-compatible Python values; they are converted
    to GLYPH values only when serializing or hashing.
    """

    # Chronological list of message dicts.
    conversation: list[dict] = dataclass_field(default_factory=list)
    # Tool outputs keyed by call id (see add_tool_result).
    tool_results: dict[str, object] = dataclass_field(default_factory=dict)
    # Free-form scratch values.
    working_memory: dict[str, object] = dataclass_field(default_factory=dict)
    # Ordered step names.
    plan: list[str] = dataclass_field(default_factory=list)
    current_step: int = 0

    def add_message(self, role: str, content: str) -> None:
        """Append a ``{"role", "content"}`` message to the conversation."""
        self.conversation.append({"role": role, "content": content})

    def add_tool_result(self, tool: str, call_id: str, result: object) -> None:
        """Record ``result`` for tool ``tool`` under the key ``call_id``.

        A later result with the same ``call_id`` replaces the earlier one.
        """
        self.tool_results[call_id] = {"tool": tool, "result": result}

    def to_glyph(self) -> str:
        """Serialize to GLYPH format as an ``AgentMemory`` struct."""
        return glyph.emit(g.struct("AgentMemory",
            field("conversation", glyph.from_json(self.conversation)),
            field("tool_results", glyph.from_json(self.tool_results)),
            field("working_memory", glyph.from_json(self.working_memory)),
            field("plan", glyph.from_json(self.plan)),
            field("current_step", g.int(self.current_step)),
        ))

    @classmethod
    def from_glyph(cls, text: str) -> "AgentMemory":
        """Deserialize from GLYPH format.

        Fields that are missing or decode to a falsy value fall back to
        their empty defaults.
        """
        v = glyph.parse(text)
        step = v.get("current_step")
        return cls(
            conversation=glyph.to_json(v.get("conversation")) or [],
            tool_results=glyph.to_json(v.get("tool_results")) or {},
            working_memory=glyph.to_json(v.get("working_memory")) or {},
            plan=glyph.to_json(v.get("plan")) or [],
            current_step=step.as_int() if step else 0,
        )

    def state_hash(self) -> str:
        """Return the SHA-256 hex digest of the GLYPH form of all fields."""
        canonical = glyph.emit(glyph.from_json({
            "conversation": self.conversation,
            "tool_results": self.tool_results,
            "working_memory": self.working_memory,
            "plan": self.plan,
            "current_step": self.current_step,
        }))
        return hashlib.sha256(canonical.encode()).hexdigest()

    def context_window(self, max_messages: int = 10) -> str:
        """Get recent context for the LLM as a GLYPH ``Memory`` struct.

        Args:
            max_messages: Maximum number of trailing conversation messages
                to include; values <= 0 include none.

        Returns:
            GLYPH text with the current step, the plan, the most recent
            messages and the working memory.
        """
        # A slice of [-0:] would return the whole list, so guard <= 0.
        recent = self.conversation[-max_messages:] if max_messages > 0 else []
        # Emit through the same struct API as to_glyph so lists and maps are
        # rendered as GLYPH rather than as Python reprs.
        return glyph.emit(g.struct("Memory",
            field("step", g.int(self.current_step)),
            field("plan", glyph.from_json(self.plan)),
            field("messages", glyph.from_json(recent)),
            field("working", glyph.from_json(self.working_memory)),
        ))


class StatefulAgent:
    """Agent with verified state updates.

    Tracks the hash of the last known memory state so callers can detect
    that the state they based an update on is no longer current.
    """

    def __init__(self):
        self.memory = AgentMemory()
        # Hash recorded by the latest checkpoint/apply_update/restore.
        self._last_hash: Optional[str] = None

    def checkpoint(self) -> tuple[str, str]:
        """Create a checkpoint with state hash.

        Returns:
            ``(state, digest)``: the serialized memory and its state hash.
        """
        state = self.memory.to_glyph()
        digest = self.memory.state_hash()
        self._last_hash = digest
        return state, digest

    def apply_update(self, update_fn, expected_base: Optional[str] = None):
        """Apply an update with optional base verification.

        Args:
            update_fn: Callable invoked with the memory object; expected to
                mutate it in place.
            expected_base: Hash the caller believes is current. Checked only
                when both it and the recorded hash are set.

        Raises:
            ValueError: If ``expected_base`` differs from the recorded hash.
        """
        if expected_base and self._last_hash and expected_base != self._last_hash:
            raise ValueError("State mismatch - concurrent modification detected")

        update_fn(self.memory)
        self._last_hash = self.memory.state_hash()

    def restore(self, state: str, expected_hash: Optional[str] = None):
        """Restore from checkpoint with optional verification.

        The checkpoint is parsed and verified before it replaces the current
        memory, so a failed verification leaves the agent unchanged.

        Args:
            state: Serialized memory as produced by ``checkpoint``.
            expected_hash: Hash the restored state must have, if given.

        Raises:
            ValueError: If the restored state's hash differs from
                ``expected_hash``.
        """
        candidate = AgentMemory.from_glyph(state)
        actual_hash = candidate.state_hash()

        if expected_hash and actual_hash != expected_hash:
            raise ValueError("Checkpoint corrupted - hash mismatch")

        # Commit only after verification succeeded.
        self.memory = candidate
        self._last_hash = actual_hash
Usage Example:
agent = StatefulAgent()

# Add conversation
agent.memory.add_message("user", "Find restaurants in SF")
agent.memory.add_message("assistant", "I'll search for restaurants.")
agent.memory.plan = ["search_restaurants", "filter_by_rating", "format_results"]

# Checkpoint before tool execution.
# Bound to checkpoint_hash (not `hash`) so the builtin hash() stays usable.
state, checkpoint_hash = agent.checkpoint()
print(f"Checkpoint hash: {checkpoint_hash[:16]}...")

# Execute step with verified update
def step_update(mem: AgentMemory):
    """Record the search output and advance to the next plan step."""
    mem.add_tool_result("search", "call_1", {
        "restaurants": [
            {"name": "Flour + Water", "rating": 4.5},
            {"name": "State Bird", "rating": 4.7},
        ]
    })
    mem.current_step = 1

# Raises ValueError if the recorded hash no longer matches the checkpoint.
agent.apply_update(step_update, expected_base=checkpoint_hash)

# Get context for next LLM call
context = agent.memory.context_window()
print(context)

Real-time Progress Streaming

Problem: Long-running agent tasks need progress updates, but you don’t want to mix progress with data in your protocol. Solution: Use structured progress messages with GLYPH.
import glyph
from glyph import g, field
import asyncio
from typing import Callable
from datetime import datetime


def emit_progress(pct: float, message: str) -> str:
    """Emit a progress update as a GLYPH ``Progress`` struct.

    Args:
        pct: Progress value stored in the ``pct`` field.
        message: Human-readable status stored in the ``msg`` field.

    Returns:
        GLYPH text with ``pct``, ``msg`` and a UTC ``ts`` timestamp ending
        in ``Z``.
    """
    # Function-scope import: the snippet header imports only `datetime`.
    from datetime import timezone

    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # "now" and drop tzinfo so isoformat() keeps the "<naive ISO>Z" layout.
    now = datetime.now(timezone.utc).replace(tzinfo=None)
    return glyph.emit(g.struct("Progress",
        field("pct", g.float(pct)),
        field("msg", g.str(message)),
        field("ts", g.str(now.isoformat() + "Z")),
    ))


def emit_log(level: str, message: str) -> str:
    """Emit a log message as a GLYPH ``Log`` struct.

    Args:
        level: Severity label stored in the ``level`` field.
        message: Log text stored in the ``msg`` field.

    Returns:
        GLYPH text with ``level``, ``msg`` and a UTC ``ts`` timestamp ending
        in ``Z``.
    """
    # Function-scope import: the snippet header imports only `datetime`.
    from datetime import timezone

    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # "now" and drop tzinfo so isoformat() keeps the "<naive ISO>Z" layout.
    now = datetime.now(timezone.utc).replace(tzinfo=None)
    return glyph.emit(g.struct("Log",
        field("level", g.str(level)),
        field("msg", g.str(message)),
        field("ts", g.str(now.isoformat() + "Z")),
    ))


class ProgressReporter:
    """Stream progress events alongside data.

    Events are collected in ``self.events`` as ``(kind, glyph_text)`` tuples,
    where ``kind`` is ``"progress"``, ``"log"`` or ``"data"``.
    """

    def __init__(self):
        self.events: list[tuple[str, str]] = []

    async def send_progress(self, pct: float, message: str):
        """Send progress update."""
        self.events.append(("progress", emit_progress(pct, message)))

    async def send_log(self, level: str, message: str):
        """Send log message."""
        self.events.append(("log", emit_log(level, message)))

    async def send_data(self, data: object, final: bool = False):
        """Send data.

        Args:
            data: JSON-compatible value, encoded with ``glyph.json_to_glyph``.
            final: Accepted for callers that mark the last payload. It is
                not recorded in the event.
                NOTE(review): confirm whether consumers need this flag.
        """
        self.events.append(("data", glyph.json_to_glyph(data)))


async def process_documents_with_progress(
    docs: list[str],
    reporter: "ProgressReporter",
    processor: Callable[[str], dict]
):
    """Process documents with real-time progress updates.

    Args:
        docs: Documents to process, in order.
        reporter: Receives log, progress and final data events.
        processor: Synchronous callable applied to each document.

    Returns:
        Results of the documents that processed successfully, in input
        order. Failed documents are logged and skipped.
    """
    total = len(docs)
    await reporter.send_log("info", f"Starting processing of {total} documents")

    results = []
    # Inside a coroutine the running loop is the right clock source;
    # get_event_loop() is discouraged here.
    clock = asyncio.get_running_loop().time
    start_time = clock()

    # 1-based position so progress and error messages name the same document.
    for position, doc in enumerate(docs, start=1):
        # Progress update
        await reporter.send_progress(
            position / total, f"Processing document {position}/{total}"
        )

        # Process document. Best effort: one bad document must not abort the
        # batch, so any processor failure is logged and skipped.
        try:
            results.append(processor(doc))
        except Exception as e:
            await reporter.send_log("error", f"Failed to process doc {position}: {e}")

    # Final summary
    elapsed = clock() - start_time
    await reporter.send_log("info", f"Completed {total} documents in {elapsed:.1f}s")
    await reporter.send_data({"total": total, "successful": len(results)}, final=True)

    return results

Batch Data with Tabular Mode

Problem: Sending large datasets to/from LLMs (embeddings, search results, structured outputs) and JSON arrays are token-expensive. Solution: Use tabular mode for 50-70% token savings on homogeneous lists.
import glyph
from glyph import GValue, MapEntry
import json

# Sample search hits; every row carries the same three keys
search_results = [
    {"id": "doc_1", "title": "Introduction to GLYPH", "score": 0.95},
    {"id": "doc_2", "title": "Streaming Validation", "score": 0.89},
    {"id": "doc_3", "title": "Agent State Management", "score": 0.84},
    {"id": "doc_4", "title": "Tabular Mode Guide", "score": 0.82},
    {"id": "doc_5", "title": "JSON Migration Path", "score": 0.78},
]

# One GValue map per hit, wrapped in a GValue list
# (auto-tabular kicks in for 3+ homogeneous items)
rows = GValue.list_(
    *(
        GValue.map_(
            MapEntry("id", GValue.str_(hit["id"])),
            MapEntry("title", GValue.str_(hit["title"])),
            MapEntry("score", GValue.float_(hit["score"])),
        )
        for hit in search_results
    )
)

# Emit - will automatically use tabular format
table_text = glyph.emit(rows)
print(table_text)
# Output:
# @tab _ [id score title]
# |doc_1|0.95|Introduction to GLYPH|
# |doc_2|0.89|Streaming Validation|
# |doc_3|0.84|Agent State Management|
# |doc_4|0.82|Tabular Mode Guide|
# |doc_5|0.78|JSON Migration Path|
# @end

# Size comparison against the equivalent JSON array
json_text = json.dumps(search_results)
print(f"\nJSON:    {len(json_text)} chars")
print(f"Tabular: {len(table_text)} chars")
print(f"Savings: {100 * (1 - len(table_text)/len(json_text)):.0f}%")

# Round-trip: the table parses back into five rows
parsed = glyph.parse(table_text)
assert len(parsed) == 5
assert parsed.index(0).get("id").as_str() == "doc_1"

Building RAG Context with Tables

def build_rag_context(
    query: str,
    results: list[dict],
    max_results: int = 5,
    max_content_chars: int = 200,
) -> str:
    """Build RAG context with tabular search results.

    Args:
        query: The user's question, placed at the top of the prompt.
        results: Search hits; each must have ``"id"`` and ``"title"`` and
            may have ``"content"``.
        max_results: Number of leading hits to include; values <= 0 include
            none.
        max_content_chars: Maximum characters of each hit's content to keep.

    Returns:
        Prompt text containing the query, the emitted documents and a
        closing instruction.

    Raises:
        KeyError: If a hit lacks ``"id"`` or ``"title"``.
    """
    # Clamp so a negative limit cannot turn into "all but the last N".
    top_results = results[:max(max_results, 0)]

    # Build as GValue list. `or ""` also covers hits whose content is None.
    rows = GValue.list_(*[
        GValue.map_(
            MapEntry("id", GValue.str_(r["id"])),
            MapEntry("title", GValue.str_(r["title"])),
            MapEntry("content", GValue.str_((r.get("content") or "")[:max_content_chars])),
        )
        for r in top_results
    ])

    docs_table = glyph.emit(rows)

    return f"""Query: {query}

Relevant documents:
{docs_table}

Based on these documents, provide a comprehensive answer."""

Checkpoint & Resume

Problem: Long-running agent tasks need to be resumable after failures or restarts. Solution: GLYPH checkpoints with integrity verification.
import glyph
from pathlib import Path
from dataclasses import dataclass, field, asdict
from typing import Optional, List, Any
from datetime import datetime
import hashlib


@dataclass
class TaskCheckpoint:
    """Checkpoint for a resumable task.

    Serializes to and from GLYPH via ``dataclasses.asdict`` and carries an
    optional SHA-256 hash of its own contents for integrity checks.
    """

    task_id: str
    task_type: str
    created_at: str
    updated_at: str

    # Progress tracking
    total_steps: int
    completed_steps: int
    current_step: int

    # State
    input_data: Any
    intermediate_results: List[Any] = field(default_factory=list)
    final_result: Optional[Any] = None

    # Error handling
    last_error: Optional[str] = None
    retry_count: int = 0

    # Integrity: hex digest from compute_hash(), or None if never hashed
    state_hash: Optional[str] = None

    def to_glyph(self) -> str:
        """Serialize checkpoint to GLYPH."""
        return glyph.json_to_glyph(asdict(self))

    @classmethod
    def from_glyph(cls, text: str) -> "TaskCheckpoint":
        """Deserialize checkpoint from GLYPH.

        The decoded mapping is passed to the constructor as keyword
        arguments, so its keys must match the dataclass fields.
        """
        data = glyph.glyph_to_json(text)
        return cls(**data)

    def compute_hash(self) -> str:
        """Compute state hash for integrity verification.

        The hash covers every field with ``state_hash`` treated as ``None``,
        so it does not depend on any previously stored hash.

        Returns:
            SHA-256 hex digest of the GLYPH serialization.
        """
        # Blank the hash in a copy instead of mutating self temporarily: a
        # failure during serialization must not leave state_hash cleared.
        payload = asdict(self)
        payload["state_hash"] = None
        canonical = glyph.json_to_glyph(payload)
        return hashlib.sha256(canonical.encode()).hexdigest()

    def verify_integrity(self) -> bool:
        """Verify checkpoint integrity.

        Returns:
            ``True`` when no hash is stored, or when the stored hash matches
            a freshly computed one.
        """
        if not self.state_hash:
            return True
        return self.compute_hash() == self.state_hash


class CheckpointManager:
    """Manage task checkpoints stored as ``<task_id>.glyph`` files."""

    def __init__(self, checkpoint_dir: str = "./checkpoints"):
        """Create the checkpoint directory (and parents) if it is missing."""
        self.checkpoint_dir = Path(checkpoint_dir)
        self.checkpoint_dir.mkdir(parents=True, exist_ok=True)

    def _path_for(self, task_id: str) -> Path:
        """Return the checkpoint file path for ``task_id``."""
        return self.checkpoint_dir / f"{task_id}.glyph"

    def save(self, checkpoint: "TaskCheckpoint"):
        """Save checkpoint to disk.

        Updates ``updated_at`` and ``state_hash`` on ``checkpoint``, then
        writes it to a temporary file and renames it over the target, so an
        interrupted write cannot leave a truncated checkpoint.
        """
        # Function-scope import: the snippet header imports only `datetime`.
        from datetime import timezone

        # datetime.utcnow() is deprecated since Python 3.12. Dropping tzinfo
        # keeps the original "<naive ISO>Z" timestamp layout.
        now = datetime.now(timezone.utc).replace(tzinfo=None)
        checkpoint.updated_at = now.isoformat() + "Z"
        checkpoint.state_hash = checkpoint.compute_hash()

        path = self._path_for(checkpoint.task_id)
        tmp_path = path.with_name(path.name + ".tmp")
        tmp_path.write_text(checkpoint.to_glyph(), encoding="utf-8")
        # Path.replace overwrites an existing target.
        tmp_path.replace(path)

    def load(self, task_id: str) -> Optional["TaskCheckpoint"]:
        """Load checkpoint from disk.

        Returns:
            The checkpoint, or ``None`` when no file exists for ``task_id``.

        Raises:
            ValueError: If the stored hash does not match the contents.
        """
        try:
            text = self._path_for(task_id).read_text(encoding="utf-8")
        except FileNotFoundError:
            return None

        checkpoint = TaskCheckpoint.from_glyph(text)

        if not checkpoint.verify_integrity():
            raise ValueError(f"Checkpoint integrity check failed for {task_id}")

        return checkpoint
All recipes include production-ready error handling and can be adapted for your specific use case.

Build docs developers (and LLMs) love