Skip to main content

Overview

Hive tracks LLM token usage throughout execution, enabling cost monitoring, budget enforcement, and optimization.

Token Tracking

Per-Execution Tracking

Tokens are automatically tracked during execution:
from framework.runtime.agent_runtime import AgentRuntime

# Execute and wait for the result (assumes `runtime` is an AgentRuntime
# instance created by the surrounding application — TODO confirm).
result = await runtime.trigger_and_wait("api", {"query": "help"})

if result:
    # Access token metrics; .get() with a 0 default tolerates executions
    # that recorded no token counts.
    print(f"Input tokens: {result.metrics.get('input_tokens', 0)}")
    print(f"Output tokens: {result.metrics.get('output_tokens', 0)}")
    print(f"Total tokens: {result.metrics.get('total_tokens', 0)}")

Session-Level Aggregation

Token usage is aggregated in session state:
from framework.storage.session_store import SessionStore

# Open the session store (assumes `base_path` and `session_id` are defined
# by the surrounding application — TODO confirm).
store = SessionStore(base_path)
state = await store.read_state(session_id)

# Access token totals aggregated across the whole session
print(f"Total input tokens: {state.metrics.total_input_tokens}")
print(f"Total output tokens: {state.metrics.total_output_tokens}")
print(f"Total tokens: {state.metrics.total_tokens}")

Node-Level Breakdown

View per-node token consumption:
from framework.runtime.runtime_log_store import RuntimeLogStore

log_store = RuntimeLogStore(base_path)
details = await log_store.load_details(session_id)

for node in details.nodes:
    print(f"Node {node.node_id}:")
    print(f"  LLM calls: {node.llm_call_count}")
    print(f"  Tokens: {node.total_tokens}")
    print(f"  Avg tokens/call: {node.total_tokens / node.llm_call_count:.0f}")

Real-Time Monitoring

Track token usage as it happens:
from framework.runtime.event_bus import EventType

# Running totals across all LLM turns observed by this handler.
total_input = 0
total_output = 0

async def track_tokens(event):
    """Event handler: accumulate and print token counts per LLM turn.

    Reads `input_tokens` / `output_tokens` from event.data, defaulting to 0
    when a field is absent, and maintains process-wide running totals.
    """
    global total_input, total_output

    # Read each count exactly once. The original indexed event.data
    # directly when printing, which raised KeyError on events that
    # omitted token fields even though the totals used .get() defaults.
    in_tokens = event.data.get('input_tokens', 0)
    out_tokens = event.data.get('output_tokens', 0)
    total_input += in_tokens
    total_output += out_tokens

    print(f"LLM call: {event.data.get('model', 'unknown')}")
    print(f"  +{in_tokens} input tokens")
    print(f"  +{out_tokens} output tokens")
    print(f"  Total: {total_input + total_output} tokens")

# Subscribe the tracker to every completed LLM turn (assumes `bus` is the
# runtime's event bus instance — obtain it before subscribing).
bus.subscribe(
    event_types=[EventType.LLM_TURN_COMPLETE],
    handler=track_tokens,
)

Cost Estimation

Calculate costs from token counts:
# Approximate list prices per 1M tokens — verify against the provider's
# current pricing page before relying on these numbers.
PRICING = {
    "claude-4-5-sonnet-20250514": {"input": 3.00, "output": 15.00},
    "claude-3-7-sonnet-20250219": {"input": 3.00, "output": 15.00},
    "claude-3-5-haiku-20241022": {"input": 1.00, "output": 5.00},
    "gpt-4o": {"input": 2.50, "output": 10.00},
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
}

def calculate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    """Return the estimated cost in USD for one model invocation.

    Unknown models are priced at $0 so cost reports degrade gracefully
    instead of raising on an unrecognized model name.
    """
    rates = PRICING.get(model, {"input": 0, "output": 0})

    # Same arithmetic order as the per-million rates: (tokens / 1M) * rate.
    cost_in = (input_tokens / 1_000_000) * rates["input"]
    cost_out = (output_tokens / 1_000_000) * rates["output"]

    return cost_in + cost_out

# Example: 12k input @ $3/1M + 3.5k output @ $15/1M = $0.036 + $0.0525
model = "claude-4-5-sonnet-20250514"
input_tokens = 12_000
output_tokens = 3_500

cost = calculate_cost(model, input_tokens, output_tokens)
print(f"Estimated cost: ${cost:.4f}")
# Output: Estimated cost: $0.0885

Execution Cost Report

async def cost_report(session_id: str):
    """Print a per-node cost breakdown for one session.

    Assumes `base_path`, RuntimeLogStore, and calculate_cost are in scope.
    Costs are estimated from each step's recorded model and token counts.
    """
    log_store = RuntimeLogStore(base_path)
    details = await log_store.load_details(session_id)

    node_costs = {}
    total_cost = 0.0

    for node in details.nodes:
        # Sum input/output tokens per model across this node's steps.
        per_model = {}
        for step in node.steps:
            if "model" not in step or "input_tokens" not in step:
                continue
            bucket = per_model.setdefault(step["model"], {"input": 0, "output": 0})
            bucket["input"] += step.get("input_tokens", 0)
            bucket["output"] += step.get("output_tokens", 0)

        # Price each model's tokens and attribute the sum to the node.
        node_cost = 0.0
        for model_name, tokens in per_model.items():
            node_cost += calculate_cost(model_name, tokens["input"], tokens["output"])

        node_costs[node.node_id] = node_cost
        total_cost += node_cost

    # Render the report, most expensive node first.
    print(f"Cost Report: {session_id}")
    print(f"{'Node':<30} {'Cost':>10}")
    print("-" * 42)

    for node_id, cost in sorted(node_costs.items(), key=lambda kv: -kv[1]):
        print(f"{node_id:<30} ${cost:>9.4f}")

    print("-" * 42)
    print(f"{'TOTAL':<30} ${total_cost:>9.4f}")

await cost_report("session_20260206_143022_abc12345")
Output:
Cost Report: session_20260206_143022_abc12345
Node                           Cost
------------------------------------------
plan                           $0.1234
research                       $0.0856
write                          $0.0623
review                         $0.0198
------------------------------------------
TOTAL                          $0.2911

Context Budget Management

Hive automatically manages conversation context to stay within token budgets:

Token Budgets

Configure per-node context limits:
from framework.graph import NodeSpec

# Per-node context budget: this node's conversation history is compacted
# once it approaches 100k tokens.
node = NodeSpec(
    id="researcher",
    node_type="event_loop",
    loop_config={
        "max_history_tokens": 100_000,  # Total context budget
        "max_iterations": 50,
    },
)
Or configure at graph level:
from framework.graph.edge import GraphSpec

# Graph-level loop_config applies to every node in the graph unless a
# node overrides it with its own loop_config.
graph = GraphSpec(
    id="research-graph",
    loop_config={
        "max_history_tokens": 128_000,  # Shared budget
        "max_iterations": 100,
        "max_tool_calls_per_turn": 10,
    },
)

Automatic Context Compaction

When context approaches the budget, Hive automatically compacts:
# From event_loop_node.py
# NOTE: illustrative excerpt — `self` here is the event-loop node instance;
# `current_tokens`, `max_history_tokens`, `tool_results`, and
# `tool_result_budget` are locals of the surrounding method.

# Pre-send guard: compact if at/over budget
if current_tokens >= max_history_tokens:
    await self._compact_context()

# Post-turn cleanup: prune old tool results, keeping only the newest
# `tool_result_budget` entries in the conversation.
if len(tool_results) > tool_result_budget:
    oldest = tool_results[:-tool_result_budget]
    for result in oldest:
        self._conversation.remove_message(result)
Compaction strategies:
  1. Tier 0: Prune old tool results (zero cost, no LLM)
  2. Tier 1: Summarize old phases with LLM (20% of budget)
  3. Tier 2: Destructive compaction (keep only system + recent messages)
Compaction is automatic and transparent. Nodes continue executing without interruption. The CONTEXT_COMPACTED event is emitted for monitoring.

Context Monitoring

from framework.runtime.event_bus import EventType

async def monitor_context(event):
    """Print the before/after token accounting of a CONTEXT_COMPACTED event."""
    d = event.data
    print(f"Context compaction in node {event.node_id}:")
    print(f"  Before: {d['tokens_before']} tokens")
    print(f"  After: {d['tokens_after']} tokens")
    print(f"  Saved: {d['tokens_saved']} tokens ({d['savings_pct']:.0f}%)")
    print(f"  Strategy: {d['strategy']}")

# Subscribe to compaction events (assumes `bus` is the runtime's event bus).
bus.subscribe(
    event_types=[EventType.CONTEXT_COMPACTED],
    handler=monitor_context,
)

Budget Enforcement

Hard Limits

Prevent runaway token usage:
# Cumulative token count across all observed LLM turns.
total_tokens = 0
MAX_BUDGET = 1_000_000  # 1M token limit


class BudgetExceededError(RuntimeError):
    """Raised when cumulative token usage exceeds MAX_BUDGET."""
    # Was raised but never defined in the original example — callers
    # would have hit NameError at the exact moment the budget tripped.


async def enforce_budget(event):
    """Event handler: abort execution once the token budget is exhausted.

    Adds the turn's input+output tokens to the global counter; when the
    budget is exceeded, cancels the execution and raises
    BudgetExceededError.
    """
    global total_tokens

    tokens = event.data.get('input_tokens', 0) + event.data.get('output_tokens', 0)
    total_tokens += tokens

    if total_tokens > MAX_BUDGET:
        print(f"🚨 Budget exceeded: {total_tokens:,} > {MAX_BUDGET:,}")
        # Cancel execution (assumes `runtime` is in scope — TODO confirm).
        await runtime.cancel_execution(event.execution_id)
        raise BudgetExceededError(f"Token budget exhausted: {total_tokens:,} tokens")

# Enforce the budget on every completed LLM turn (assumes `bus` is the
# runtime's event bus).
bus.subscribe(
    event_types=[EventType.LLM_TURN_COMPLETE],
    handler=enforce_budget,
)

Rate Limiting

Control API usage:
import asyncio
import time

class RateLimiter:
    """Sliding-window rate limiter for async callers.

    Allows at most `max_calls` calls within any `window_seconds` span;
    `acquire()` sleeps until a slot frees up.
    """

    def __init__(self, max_calls: int, window_seconds: float):
        self.max_calls = max_calls
        self.window_seconds = window_seconds
        self.calls: list[float] = []  # timestamps of calls inside the window

    async def acquire(self):
        """Wait if the rate limit is exceeded, then record this call."""
        now = time.time()

        # Drop timestamps that have aged out of the window.
        self.calls = [t for t in self.calls if t > now - self.window_seconds]

        if len(self.calls) >= self.max_calls:
            # Wait until the oldest recorded call expires.
            sleep_time = self.calls[0] + self.window_seconds - now
            if sleep_time > 0:
                print(f"Rate limit: sleeping {sleep_time:.1f}s")
                await asyncio.sleep(sleep_time)
            # Bug fix: re-read the clock after sleeping — the original
            # recorded the pre-sleep timestamp, so the logged call time was
            # stale and subsequent windows undercounted. Re-prune too, since
            # the oldest entry has now aged out.
            now = time.time()
            self.calls = [t for t in self.calls if t > now - self.window_seconds]

        self.calls.append(now)

# 60 LLM calls per minute
limiter = RateLimiter(max_calls=60, window_seconds=60.0)

async def rate_limited_handler(event):
    # Throttle only on completed LLM turns; other event types pass through.
    if event.type == EventType.LLM_TURN_COMPLETE:
        await limiter.acquire()

# Wire the throttle into the event bus (assumes `bus` is in scope).
bus.subscribe(
    event_types=[EventType.LLM_TURN_COMPLETE],
    handler=rate_limited_handler,
)

Cost Optimization

Model Selection

Use cheaper models for simple tasks:
from framework.graph import NodeSpec

# Expensive model for complex reasoning
planner = NodeSpec(
    id="planner",
    node_type="event_loop",
    model="claude-4-5-sonnet-20250514",  # $3/$15 per 1M tokens
)

# Cheap model for simple classification
# NOTE(review): prices are list prices — confirm against current rates.
classifier = NodeSpec(
    id="classifier",
    node_type="event_loop",
    model="claude-3-5-haiku-20241022",  # $1/$5 per 1M tokens
)

Context Pruning

Aggressive pruning for cost-sensitive workflows:
from framework.graph import NodeSpec

# A deliberately small budget triggers compaction early and often,
# trading context richness for lower cost.
node = NodeSpec(
    id="summarizer",
    node_type="event_loop",
    loop_config={
        "max_history_tokens": 32_000,  # Tight budget
        "context_overflow_strategy": "prune_old_tool_results",
    },
)

Batch Processing

Amortize prompt overhead:
# Instead of N separate calls
# (`process_single`, `process_batch`, and `chunks` are illustrative
# helpers, not defined in this guide.)
for item in items:
    result = await process_single(item)  # N × prompt_tokens

# Batch into fewer calls
batch_size = 10
for batch in chunks(items, batch_size):
    results = await process_batch(batch)  # (N/10) × prompt_tokens

Reporting & Analytics

Daily Cost Dashboard

import asyncio
from datetime import datetime, timedelta

async def daily_report():
    """Generate and print yesterday's token/cost report.

    Assumes `base_path`, SessionStore, and calculate_cost are in scope.
    Cost is a rough estimate that prices every session at the default
    model's rates.
    """
    today = datetime.now().date()
    yesterday = today - timedelta(days=1)

    # List recent sessions and keep only those started yesterday.
    store = SessionStore(base_path)
    sessions = await store.list_sessions(limit=1000)

    daily_sessions = [
        s for s in sessions
        if datetime.fromisoformat(s.timestamps.started_at).date() == yesterday
    ]

    total_tokens = 0
    total_cost = 0.0

    for session in daily_sessions:
        total_tokens += session.metrics.total_tokens
        # Estimate cost (assumes default model)
        total_cost += calculate_cost(
            "claude-4-5-sonnet-20250514",
            session.metrics.total_input_tokens,
            session.metrics.total_output_tokens,
        )

    print(f"Daily Report: {yesterday}")
    print(f"  Sessions: {len(daily_sessions)}")
    print(f"  Total tokens: {total_tokens:,}")
    print(f"  Estimated cost: ${total_cost:.2f}")
    # Guard: a day with zero sessions would otherwise raise
    # ZeroDivisionError in the average calculation.
    if daily_sessions:
        print(f"  Avg cost/session: ${total_cost / len(daily_sessions):.4f}")

await daily_report()

Export to Prometheus

from prometheus_client import Counter, Histogram, start_http_server

# Monotonic token counter, labeled by model and direction (input/output).
token_counter = Counter(
    'hive_tokens_total',
    'Total LLM tokens used',
    ['model', 'type'],  # input/output
)

# Per-execution cost distribution, labeled by agent and entry point.
cost_histogram = Histogram(
    'hive_execution_cost_usd',
    'Execution cost in USD',
    ['agent_id', 'entry_point'],
)

async def export_metrics(event):
    """Forward token and cost metrics from bus events to Prometheus."""
    data = event.data

    if event.type == EventType.LLM_TURN_COMPLETE:
        # Record input and output token counts under the model label.
        model = data['model']
        for direction in ('input', 'output'):
            token_counter.labels(model=model, type=direction).inc(
                data[f'{direction}_tokens']
            )

    elif event.type == EventType.EXECUTION_COMPLETED:
        # Observe the precomputed execution cost (simplified).
        cost_histogram.labels(
            agent_id=data.get('agent_id', 'unknown'),
            entry_point=data.get('entry_point', 'unknown'),
        ).observe(data.get('cost', 0.0))

# Export on both turn completion and execution completion (assumes `bus`
# is the runtime's event bus).
bus.subscribe(
    event_types=[EventType.LLM_TURN_COMPLETE, EventType.EXECUTION_COMPLETED],
    handler=export_metrics,
)

# Start Prometheus exporter (scrape endpoint on port 8000)
start_http_server(8000)

Best Practices

Model Tiering

Use expensive models only where necessary. Route simple tasks to cheaper models.

Context Hygiene

Regularly prune old tool results. Don’t hoard conversation history indefinitely.

Budget Alerts

Monitor token usage in real-time. Alert on anomalies before costs spiral.

Cost Attribution

Tag executions with customer_id or project_id for accurate cost allocation.

Build docs developers (and LLMs) love