## Overview

Hive tracks LLM token usage throughout execution, enabling cost monitoring, budget enforcement, and optimization.
## Token Tracking

### Per-Execution Tracking

Tokens are automatically tracked during execution:

```python
from framework.runtime.agent_runtime import AgentRuntime

# Execute and get result
result = await runtime.trigger_and_wait("api", {"query": "help"})

if result:
    # Access token metrics
    print(f"Input tokens: {result.metrics.get('input_tokens', 0)}")
    print(f"Output tokens: {result.metrics.get('output_tokens', 0)}")
    print(f"Total tokens: {result.metrics.get('total_tokens', 0)}")
```
### Session-Level Aggregation

Token usage is aggregated in session state:

```python
from framework.storage.session_store import SessionStore

store = SessionStore(base_path)
state = await store.read_state(session_id)

# Access token totals
print(f"Total input tokens: {state.metrics.total_input_tokens}")
print(f"Total output tokens: {state.metrics.total_output_tokens}")
print(f"Total tokens: {state.metrics.total_tokens}")
```
### Node-Level Breakdown

View per-node token consumption:

```python
from framework.runtime.runtime_log_store import RuntimeLogStore

log_store = RuntimeLogStore(base_path)
details = await log_store.load_details(session_id)

for node in details.nodes:
    print(f"Node {node.node_id}:")
    print(f"  LLM calls: {node.llm_call_count}")
    print(f"  Tokens: {node.total_tokens}")
    if node.llm_call_count:  # guard against nodes that made no LLM calls
        print(f"  Avg tokens/call: {node.total_tokens / node.llm_call_count:.0f}")
```
### Real-Time Monitoring

Track token usage as it happens:

```python
from framework.runtime.event_bus import EventType

total_input = 0
total_output = 0

async def track_tokens(event):
    global total_input, total_output
    total_input += event.data.get('input_tokens', 0)
    total_output += event.data.get('output_tokens', 0)
    print(f"LLM call: {event.data['model']}")
    print(f"  +{event.data['input_tokens']} input tokens")
    print(f"  +{event.data['output_tokens']} output tokens")
    print(f"  Total: {total_input + total_output} tokens")

bus.subscribe(
    event_types=[EventType.LLM_TURN_COMPLETE],
    handler=track_tokens,
)
```
## Cost Estimation

Calculate costs from token counts:

```python
# Pricing (USD per 1M tokens; verify against current provider price lists)
PRICING = {
    "claude-4-5-sonnet-20250514": {"input": 3.00, "output": 15.00},
    "claude-3-7-sonnet-20250219": {"input": 3.00, "output": 15.00},
    "claude-3-5-haiku-20241022": {"input": 1.00, "output": 5.00},
    "gpt-4o": {"input": 2.50, "output": 10.00},
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
}

def calculate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    """Calculate cost in USD."""
    prices = PRICING.get(model, {"input": 0, "output": 0})
    input_cost = (input_tokens / 1_000_000) * prices["input"]
    output_cost = (output_tokens / 1_000_000) * prices["output"]
    return input_cost + output_cost

# Example
model = "claude-4-5-sonnet-20250514"
input_tokens = 12_000
output_tokens = 3_500

cost = calculate_cost(model, input_tokens, output_tokens)
print(f"Estimated cost: ${cost:.4f}")
# Output: Estimated cost: $0.0885
```
### Execution Cost Report

```python
async def cost_report(session_id: str):
    """Generate cost report for a session."""
    log_store = RuntimeLogStore(base_path)
    details = await log_store.load_details(session_id)

    total_cost = 0.0
    node_costs = {}

    for node in details.nodes:
        # Aggregate tokens per model
        model_tokens = {}
        for step in node.steps:
            if "model" in step and "input_tokens" in step:
                model = step["model"]
                if model not in model_tokens:
                    model_tokens[model] = {"input": 0, "output": 0}
                model_tokens[model]["input"] += step.get("input_tokens", 0)
                model_tokens[model]["output"] += step.get("output_tokens", 0)

        # Calculate node cost
        node_cost = 0.0
        for model, tokens in model_tokens.items():
            node_cost += calculate_cost(model, tokens["input"], tokens["output"])

        node_costs[node.node_id] = node_cost
        total_cost += node_cost

    # Print report
    print(f"Cost Report: {session_id}")
    print(f"{'Node':<30} {'Cost':>10}")
    print("-" * 42)
    for node_id, cost in sorted(node_costs.items(), key=lambda x: -x[1]):
        print(f"{node_id:<30} ${cost:>9.4f}")
    print("-" * 42)
    print(f"{'TOTAL':<30} ${total_cost:>9.4f}")

await cost_report("session_20260206_143022_abc12345")
```
Output:

```
Cost Report: session_20260206_143022_abc12345
Node                                 Cost
------------------------------------------
plan                           $   0.1234
research                       $   0.0856
write                          $   0.0623
review                         $   0.0198
------------------------------------------
TOTAL                          $   0.2911
```
## Context Budget Management

Hive automatically manages conversation context to stay within token budgets.

### Token Budgets

Configure per-node context limits:

```python
from framework.graph import NodeSpec

node = NodeSpec(
    id="researcher",
    node_type="event_loop",
    loop_config={
        "max_history_tokens": 100_000,  # Total context budget
        "max_iterations": 50,
    },
)
```
Or configure at graph level:

```python
from framework.graph.edge import GraphSpec

graph = GraphSpec(
    id="research-graph",
    loop_config={
        "max_history_tokens": 128_000,  # Shared budget
        "max_iterations": 100,
        "max_tool_calls_per_turn": 10,
    },
)
```
### Automatic Context Compaction

When context approaches the budget, Hive automatically compacts:

```python
# From event_loop_node.py

# Pre-send guard: compact if at/over budget
if current_tokens >= max_history_tokens:
    await self._compact_context()

# Post-turn cleanup: prune old tool results
if len(tool_results) > tool_result_budget:
    oldest = tool_results[:-tool_result_budget]
    for result in oldest:
        self._conversation.remove_message(result)
```

Compaction strategies, tried in escalating order (see the sketch below):

- **Tier 0**: Prune old tool results (zero cost, no LLM)
- **Tier 1**: Summarize old phases with LLM (20% of budget)
- **Tier 2**: Destructive compaction (keep only system + recent messages)

Compaction is automatic and **transparent**. Nodes continue executing without interruption. The `CONTEXT_COMPACTED` event is emitted for monitoring.
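For intuition, here is a minimal sketch of how such a tiered fallback might be sequenced, escalating only when the cheaper strategy falls short. The helper method names, the `keep_recent=10` value, and the 20% summary allowance are illustrative assumptions, not Hive's actual internals:

```python
# Hypothetical sketch of a tiered compaction fallback; helper methods and
# thresholds are illustrative, not Hive's actual implementation.
async def _compact_context(self) -> None:
    budget = self.loop_config["max_history_tokens"]

    # Tier 0: drop stale tool results -- free, no LLM call needed.
    self._prune_old_tool_results()
    if self._current_tokens() < budget:
        return

    # Tier 1: summarize old phases with an LLM, spending ~20% of the budget.
    await self._summarize_old_phases(max_summary_tokens=budget // 5)
    if self._current_tokens() < budget:
        return

    # Tier 2: destructive fallback -- keep only system prompt + recent turns.
    self._destructive_compact(keep_recent=10)
```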
### Context Monitoring

```python
from framework.runtime.event_bus import EventType

async def monitor_context(event):
    data = event.data
    print(f"Context compaction in node {event.node_id}:")
    print(f"  Before: {data['tokens_before']} tokens")
    print(f"  After: {data['tokens_after']} tokens")
    print(f"  Saved: {data['tokens_saved']} tokens ({data['savings_pct']:.0f}%)")
    print(f"  Strategy: {data['strategy']}")

bus.subscribe(
    event_types=[EventType.CONTEXT_COMPACTED],
    handler=monitor_context,
)
```
## Budget Enforcement

### Hard Limits

Prevent runaway token usage:

```python
class BudgetExceededError(Exception):
    """Raised when the cumulative token budget is exhausted."""

total_tokens = 0
MAX_BUDGET = 1_000_000  # 1M token limit

async def enforce_budget(event):
    global total_tokens
    tokens = event.data.get('input_tokens', 0) + event.data.get('output_tokens', 0)
    total_tokens += tokens

    if total_tokens > MAX_BUDGET:
        print(f"🚨 Budget exceeded: {total_tokens:,} > {MAX_BUDGET:,}")
        # Cancel execution
        await runtime.cancel_execution(event.execution_id)
        raise BudgetExceededError(f"Token budget exhausted: {total_tokens:,} tokens")

bus.subscribe(
    event_types=[EventType.LLM_TURN_COMPLETE],
    handler=enforce_budget,
)
```
### Rate Limiting

Control API usage:

```python
import asyncio
import time

class RateLimiter:
    def __init__(self, max_calls: int, window_seconds: float):
        self.max_calls = max_calls
        self.window_seconds = window_seconds
        self.calls = []

    async def acquire(self):
        """Wait if rate limit exceeded."""
        now = time.time()
        # Remove old calls outside window
        self.calls = [t for t in self.calls if t > now - self.window_seconds]

        if len(self.calls) >= self.max_calls:
            # Wait until oldest call expires
            sleep_time = self.calls[0] + self.window_seconds - now
            if sleep_time > 0:
                print(f"Rate limit: sleeping {sleep_time:.1f}s")
                await asyncio.sleep(sleep_time)

        self.calls.append(now)

# 60 LLM calls per minute
limiter = RateLimiter(max_calls=60, window_seconds=60.0)

async def rate_limited_handler(event):
    if event.type == EventType.LLM_TURN_COMPLETE:
        await limiter.acquire()

bus.subscribe(
    event_types=[EventType.LLM_TURN_COMPLETE],
    handler=rate_limited_handler,
)
```
## Cost Optimization

### Model Selection

Use cheaper models for simple tasks:

```python
from framework.graph import NodeSpec

# Expensive model for complex reasoning
planner = NodeSpec(
    id="planner",
    node_type="event_loop",
    model="claude-4-5-sonnet-20250514",  # $3/$15 per 1M tokens
)

# Cheap model for simple classification
classifier = NodeSpec(
    id="classifier",
    node_type="event_loop",
    model="claude-3-5-haiku-20241022",  # $1/$5 per 1M tokens
)
```
### Context Pruning

Aggressive pruning for cost-sensitive workflows:

```python
from framework.graph import NodeSpec

node = NodeSpec(
    id="summarizer",
    node_type="event_loop",
    loop_config={
        "max_history_tokens": 32_000,  # Tight budget
        "context_overflow_strategy": "prune_old_tool_results",
    },
)
```
### Batch Processing

Amortize prompt overhead:

```python
# Instead of N separate calls (process_single/process_batch are placeholders)
for item in items:
    result = await process_single(item)  # N × prompt_tokens

# Batch into fewer calls
def chunks(seq, size):
    """Yield successive fixed-size slices of seq."""
    for i in range(0, len(seq), size):
        yield seq[i:i + size]

batch_size = 10
for batch in chunks(items, batch_size):
    results = await process_batch(batch)  # (N/10) × prompt_tokens
```
## Reporting & Analytics

### Daily Cost Dashboard

```python
import asyncio
from datetime import datetime, timedelta

async def daily_report():
    """Generate daily cost report."""
    today = datetime.now().date()
    yesterday = today - timedelta(days=1)

    # List sessions from yesterday
    store = SessionStore(base_path)
    sessions = await store.list_sessions(limit=1000)
    daily_sessions = [
        s for s in sessions
        if datetime.fromisoformat(s.timestamps.started_at).date() == yesterday
    ]

    total_tokens = 0
    total_cost = 0.0

    for session in daily_sessions:
        total_tokens += session.metrics.total_tokens
        # Estimate cost (assumes default model)
        total_cost += calculate_cost(
            "claude-4-5-sonnet-20250514",
            session.metrics.total_input_tokens,
            session.metrics.total_output_tokens,
        )

    print(f"Daily Report: {yesterday}")
    print(f"  Sessions: {len(daily_sessions)}")
    print(f"  Total tokens: {total_tokens:,}")
    print(f"  Estimated cost: ${total_cost:.2f}")
    if daily_sessions:  # avoid division by zero on idle days
        print(f"  Avg cost/session: ${total_cost / len(daily_sessions):.4f}")

await daily_report()
```
### Export to Prometheus

```python
from prometheus_client import Counter, Histogram, start_http_server

token_counter = Counter(
    'hive_tokens_total',
    'Total LLM tokens used',
    ['model', 'type'],  # input/output
)

cost_histogram = Histogram(
    'hive_execution_cost_usd',
    'Execution cost in USD',
    ['agent_id', 'entry_point'],
)

async def export_metrics(event):
    if event.type == EventType.LLM_TURN_COMPLETE:
        model = event.data['model']
        token_counter.labels(model=model, type='input').inc(
            event.data['input_tokens']
        )
        token_counter.labels(model=model, type='output').inc(
            event.data['output_tokens']
        )
    elif event.type == EventType.EXECUTION_COMPLETED:
        # Calculate cost (simplified)
        cost = event.data.get('cost', 0.0)
        cost_histogram.labels(
            agent_id=event.data.get('agent_id', 'unknown'),
            entry_point=event.data.get('entry_point', 'unknown'),
        ).observe(cost)

bus.subscribe(
    event_types=[EventType.LLM_TURN_COMPLETE, EventType.EXECUTION_COMPLETED],
    handler=export_metrics,
)

# Start Prometheus exporter
start_http_server(8000)
```
## Best Practices

- **Model Tiering**: Use expensive models only where necessary. Route simple tasks to cheaper models.
- **Context Hygiene**: Regularly prune old tool results. Don't hoard conversation history indefinitely.
- **Budget Alerts**: Monitor token usage in real time. Alert on anomalies before costs spiral.
- **Cost Attribution**: Tag executions with `customer_id` or `project_id` for accurate cost allocation (see the sketch below).
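As a sketch of the last point, per-customer cost can be accumulated by combining the event bus with `calculate_cost` from above. The `customer_id` key in `event.data` is an illustrative assumption; adapt the lookup to however your deployment propagates tags through trigger payloads:

```python
from collections import defaultdict

# Illustrative: accumulate estimated cost per customer in memory.
cost_by_customer: dict[str, float] = defaultdict(float)

async def attribute_cost(event):
    # Assumes the trigger payload's customer_id is echoed into event.data --
    # an assumption, not a documented Hive field.
    customer = event.data.get("customer_id", "unknown")
    cost_by_customer[customer] += calculate_cost(
        event.data.get("model", ""),
        event.data.get("input_tokens", 0),
        event.data.get("output_tokens", 0),
    )

bus.subscribe(
    event_types=[EventType.LLM_TURN_COMPLETE],
    handler=attribute_cost,
)
```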