async def run(
    self,
    user_message: str,
    *,
    session_key: str = "cli:default",
    session_messages: list[LLMMessage] | None = None,
    model: str | None = None,
) -> AgentRunResult:
    """Execute one agentic turn for *user_message*.

    Pipeline (numbered stages mirror the inline comments below):
      1. pick an effective model (explicit override > tier routing > default),
      2. short-circuit via the semantic cache on a hit,
      3. load session history and summary,
      4. build the system message,
      5. assemble the full message list,
      6. loop: call the LLM, execute any requested tools in parallel,
         feed results back, until the model answers without tool calls
         (or the iteration cap is hit).

    Args:
        user_message: The user's input for this turn.
        session_key: Identifier of the conversation session; defaults to
            the CLI session.
        session_messages: Pre-supplied history.
            NOTE(review): never read anywhere in the visible code —
            confirm whether this parameter is dead or used further down.
        model: Explicit model name; when set, skips tier-based routing.

    Returns:
        AgentRunResult carrying the final response text and the number of
        LLM iterations consumed (0 on a cache hit).

    NOTE(review): this snippet contains `...` placeholders and references
    several names not defined in the visible span (`tiers`, `defaults`,
    `max_iter`, `tools`, `tool_ctx`, `immediate_window`, `failed_tools`,
    `all_tool_calls`) — presumably resolved earlier in the method or file;
    verify against the full source. The `while` loop's `break` path and
    the post-loop return are not visible here.
    """
    # 1. Cost-aware model routing
    if model:
        # Caller pinned a model explicitly; honor it unconditionally.
        effective_model = model
    elif self._config.agents.model_tiers.enabled:
        # Route cheap/expensive models by query complexity.
        # NOTE(review): `...` placeholder argument, and `defaults`/`tiers`
        # are not defined in the visible code — confirm their origin.
        complexity = classify_complexity(user_message, tool_calls_in_session=...)
        effective_model = select_model(defaults.model, tiers, complexity)
    else:
        effective_model = defaults.model
    # 2. Check semantic cache for identical recent query
    if self._semantic_cache:
        # Cache is keyed on (query, model) so a model change misses.
        cached = self._semantic_cache.get(user_message, effective_model)
        if cached is not None:
            # Cache hit: zero LLM iterations were spent.
            return AgentRunResult(response=cached, iterations=0, ...)
    # 3. Load session history (limited to memory_window)
    if self._session_mgr:
        session = self._session_mgr.get_or_create(session_key)
        history = session.get_recent(immediate_window)
        session_summary = session.summary
    else:
        # NOTE(review): `session` and `session_summary` are NOT assigned on
        # this branch, yet both are read later (stage 5 and the
        # `_persist_session` call) — looks like a NameError when no
        # session manager is configured; confirm and initialize them here.
        history = []
    # 4. Build system message with context
    system_msg = self._context_builder.build_system_message(
        user_message=user_message,
        session_key=session_key,
    )
    # 5. Assemble message list
    # Order: system prompt, session summary, retrieved memory,
    # recent history, then the new user message.
    messages: list[LLMMessage] = [system_msg]
    if session_summary:
        messages.append(LLMMessage(role="system", content=session_summary))
    if self._memory_mgr:
        # Long-term memory retrieval, injected as an extra system message.
        relevant_context = self._retrieve_relevant_context(user_message)
        if relevant_context:
            messages.append(LLMMessage(role="system", content=relevant_context))
    messages.extend(history)
    messages.append(LLMMessage(role="user", content=user_message))
    # 6. Tool execution loop
    iteration = 0
    while True:
        iteration += 1
        # max_iter <= 0 means "unbounded" per this guard.
        # NOTE(review): what happens after `break` is outside the visible
        # span — confirm a sensible AgentRunResult is returned post-loop.
        if max_iter > 0 and iteration > max_iter:
            break
        # Mid-run compaction to prevent context overflow
        if iteration > 1:
            messages = await self._maybe_compact_mid_run(messages, effective_model)
        # Call LLM
        response = await self._call_llm(messages, tools=tools, model=effective_model, ...)
        # No tool calls → return final response
        if not response.tool_calls:
            self._persist_session(session, user_message, response.content)
            # Only cache pure-LLM answers: tool-using runs depend on
            # external state and are not safe to replay from cache.
            # NOTE(review): `all_tool_calls` is never populated in the
            # visible code — confirm it accumulates per-iteration calls.
            if self._semantic_cache and not all_tool_calls:
                self._semantic_cache.put(user_message, effective_model, response.content)
            return AgentRunResult(response=response.content, iterations=iteration, ...)
        # Execute tools in parallel
        # The assistant message with its tool_calls must precede the
        # role="tool" result messages in the transcript.
        messages.append(LLMMessage(role="assistant", content=response.content, tool_calls=response.tool_calls))
        exec_results = await asyncio.gather(
            *(self._execute_tool(tc, tool_ctx) for tc in response.tool_calls)
        )
        # Append tool results to messages
        for exec_result in exec_results:
            # Redact credentials before tool output re-enters the context.
            scrubbed_output = _scrub_secrets(exec_result.output)
            messages.append(LLMMessage(
                role="tool",
                content=scrubbed_output,
                tool_call_id=exec_result.tool_call_id,
                name=exec_result.tool_name,
            ))
        # Self-correction: inject reflection prompt if tools failed
        # NOTE(review): `failed_tools` is not computed in the visible span —
        # presumably derived from exec_results; verify.
        if failed_tools and defaults.enable_self_correction:
            messages.append(LLMMessage(
                role="system",
                content=f"[Self-correction] The following tool calls failed: ..."
            ))