QueryEnginePort is the stateful conversation layer of Claw Code. It accumulates messages across turns, enforces token-budget and turn-count limits, handles structured output serialisation, and persists sessions to disk.

QueryEngineConfig

Configuration dataclass for QueryEnginePort. All fields have defaults; pass an instance to override behaviour.
from src.query_engine import QueryEngineConfig

config = QueryEngineConfig(
    max_turns=12,
    max_budget_tokens=4000,
    structured_output=True,
)
max_turns
number
default: 8
Maximum number of stored messages. Once this limit is reached, further submit_message calls return stop_reason="max_turns_reached" without processing the prompt.
max_budget_tokens
number
default: 2000
Token budget across the entire session (input + output, word-count approximation). When a turn would exceed this value, stop_reason="max_budget_reached" is returned.
compact_after_turns
number
default: 12
After each turn, if the stored message count exceeds this value, the oldest messages are dropped so that only the most recent compact_after_turns entries remain.
structured_output
boolean
default: False
When True, turn output is serialised as a JSON object containing summary lines and session_id.
structured_retry_limit
number
default: 2
Number of attempts to serialise structured output before raising RuntimeError.
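The interaction between structured_output and structured_retry_limit can be pictured with a minimal sketch. This is not the engine's actual code: render_structured_output is a hypothetical stand-in, the "summary" key name is an assumption, and whatever the real engine changes between attempts is not documented here.

```python
import json


def render_structured_output(summary_lines, session_id, retry_limit=2):
    """Hypothetical stand-in: serialise a turn as JSON with a bounded number of attempts."""
    # Assumed payload shape: summary lines plus the session ID.
    payload = {"summary": list(summary_lines), "session_id": session_id}
    last_error = None
    for _ in range(retry_limit):
        try:
            return json.dumps(payload, indent=2)
        except (TypeError, ValueError) as exc:
            # Remember the failure and use up one attempt.
            last_error = exc
    # All attempts failed: surface a RuntimeError, as the field description states.
    raise RuntimeError("structured output rendering failed") from last_error
```

With the default limit of 2, a payload that cannot be serialised fails twice and then raises RuntimeError.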

Factory methods

QueryEnginePort.from_workspace

@classmethod
def from_workspace(cls) -> QueryEnginePort
Creates a new engine instance with a fresh session ID and a PortManifest built from the current working directory.
from src.query_engine import QueryEnginePort

engine = QueryEnginePort.from_workspace()

QueryEnginePort.from_saved_session

@classmethod
def from_saved_session(cls, session_id: str) -> QueryEnginePort
Loads a previously persisted session from .port_sessions/<session_id>.json and restores the message history and token usage.
session_id
string
required
The hex session ID, i.e. the <session_id> part of the file path returned by persist_session() or exposed as RuntimeSession.persisted_session_path.
from src.query_engine import QueryEnginePort

engine = QueryEnginePort.from_saved_session("a3f9b2c1...")
result = engine.submit_message("continue where we left off")
The transcript is marked as already flushed when restoring from a saved session, so persist_session() will not double-write the transcript.
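Because sessions live at .port_sessions/<session_id>.json, the ID can be recovered from a saved path by taking the file stem. The helper below is a hypothetical convenience, not part of the documented API.

```python
from pathlib import Path


def session_id_from_path(session_path: str) -> str:
    """Hypothetical helper: extract <session_id> from .port_sessions/<session_id>.json."""
    # Path.stem is the final path component without its last suffix.
    return Path(session_path).stem
```

The returned string is what from_saved_session expects as its session_id argument.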

submit_message

def submit_message(
    self,
    prompt: str,
    matched_commands: tuple[str, ...] = (),
    matched_tools: tuple[str, ...] = (),
    denied_tools: tuple[PermissionDenial, ...] = (),
) -> TurnResult
Processes one conversational turn. Checks limits, formats output, updates internal state, and returns a TurnResult.
prompt
string
required
The user message for this turn.
matched_commands
string[]
Names of commands that were routed for this prompt (from PortRuntime.route_prompt).
matched_tools
string[]
Names of tools that were routed for this prompt.
denied_tools
PermissionDenial[]
Tools that were blocked before the turn was processed. See ToolPermissionContext.
Returns TurnResult
prompt
string
The prompt submitted for this turn.
output
string
Formatted turn output. Plain text by default; JSON when structured_output=True.
matched_commands
tuple[str, ...]
Echo of the matched_commands argument.
matched_tools
tuple[str, ...]
Echo of the matched_tools argument.
permission_denials
tuple[PermissionDenial, ...]
Echo of the denied_tools argument.
usage
UsageSummary
Cumulative token counts after this turn.
stop_reason
string
One of "completed", "max_turns_reached", or "max_budget_reached".

Stop reasons

| Value | Meaning |
| --- | --- |
| "completed" | Turn processed normally. |
| "max_turns_reached" | mutable_messages length has reached max_turns; prompt was not processed. |
| "max_budget_reached" | The turn would push cumulative tokens over max_budget_tokens; output is still returned but no further turns should be submitted. |
from src.query_engine import QueryEnginePort

engine = QueryEnginePort.from_workspace()

result = engine.submit_message(
    "summarise the auth module",
    matched_commands=("compact",),
    matched_tools=("FileReadTool",),
)

print(result.output)
print(result.stop_reason)   # "completed"
print(result.usage.input_tokens, result.usage.output_tokens)
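The stop-reason rules can be sketched as a small pure function. This is an illustration under stated assumptions, not the engine's source: count_tokens and decide_stop_reason are hypothetical names, and the token count is the word-count approximation mentioned for max_budget_tokens.

```python
def count_tokens(text: str) -> int:
    """Word-count approximation: one token per whitespace-separated word."""
    return len(text.split())


def decide_stop_reason(
    stored_messages: int,
    used_tokens: int,
    prompt: str,
    output: str,
    max_turns: int = 8,
    max_budget_tokens: int = 2000,
) -> str:
    """Hypothetical sketch of the limit checks behind stop_reason."""
    # Turn limit first: at the limit, the prompt is not processed at all.
    if stored_messages >= max_turns:
        return "max_turns_reached"
    # Budget covers the whole session, input plus output.
    projected = used_tokens + count_tokens(prompt) + count_tokens(output)
    if projected > max_budget_tokens:
        return "max_budget_reached"
    return "completed"
```

A caller that loops over prompts would typically stop submitting as soon as stop_reason is anything other than "completed".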

stream_submit_message

def stream_submit_message(
    self,
    prompt: str,
    matched_commands: tuple[str, ...] = (),
    matched_tools: tuple[str, ...] = (),
    denied_tools: tuple[PermissionDenial, ...] = (),
) -> Generator[dict, None, None]
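Yields a sequence of event dicts that mirror the streaming protocol of Claude Code. submit_message is called internally after the initial events; message_delta and message_stop are emitted once it returns and carry that turn's result.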
| Event type | When emitted | Key fields |
| --- | --- | --- |
| message_start | Always, first | session_id, prompt |
| command_match | When matched_commands is non-empty | commands: tuple[str, ...] |
| tool_match | When matched_tools is non-empty | tools: tuple[str, ...] |
| permission_denial | When denied_tools is non-empty | denials: list[str] (tool names) |
| message_delta | Always, after submit | text: str |
| message_stop | Always, last | usage, stop_reason, transcript_size |
from src.query_engine import QueryEnginePort

engine = QueryEnginePort.from_workspace()

for event in engine.stream_submit_message("explain query routing"):
    match event["type"]:
        case "message_start":
            print("session:", event["session_id"])
        case "message_delta":
            print(event["text"])
        case "message_stop":
            print("stop:", event["stop_reason"])
stream_submit_message calls submit_message internally, so it updates session state (message history, usage totals, compaction) just like a direct submit_message call.
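A consumer that only needs the final text and outcome can fold the event stream into a single dict. The sketch below works on any iterable of event dicts shaped like the ones in the table; collect_stream is a hypothetical helper, not part of the engine.

```python
from typing import Iterable


def collect_stream(events: Iterable[dict]) -> dict:
    """Hypothetical helper: reduce stream events to text, denied tools and stop reason."""
    text_parts = []
    denied = []
    stop_reason = None
    for event in events:
        kind = event.get("type")
        if kind == "message_delta":
            text_parts.append(event["text"])
        elif kind == "permission_denial":
            denied.extend(event["denials"])
        elif kind == "message_stop":
            stop_reason = event["stop_reason"]
        # Other event types (message_start, command_match, tool_match) are ignored here.
    return {"text": "".join(text_parts), "denied": denied, "stop_reason": stop_reason}
```

Usage would look like collect_stream(engine.stream_submit_message("explain query routing")).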

persist_session

def persist_session(self) -> str
Flushes the transcript and writes the current session (ID, messages, token totals) to .port_sessions/<session_id>.json. Returns the file path as a string.
from src.query_engine import QueryEnginePort

engine = QueryEnginePort.from_workspace()
path = engine.persist_session()
print(f"Session saved to {path}")
# Session saved to .port_sessions/a3f9b2c1...json
Sessions written by persist_session can be restored with QueryEnginePort.from_saved_session.
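The save and restore round trip can be sketched with plain JSON files. Everything below is an assumption made for illustration: save_session and load_session are hypothetical names, and the JSON field names are guesses based on the description (ID, messages, token totals), not the engine's actual schema.

```python
import json
from pathlib import Path


def save_session(session_id, messages, input_tokens, output_tokens,
                 directory=".port_sessions"):
    """Hypothetical sketch: write one session to <directory>/<session_id>.json."""
    target_dir = Path(directory)
    target_dir.mkdir(parents=True, exist_ok=True)
    path = target_dir / f"{session_id}.json"
    data = {
        "session_id": session_id,        # assumed field name
        "messages": list(messages),      # assumed field name
        "input_tokens": input_tokens,    # assumed field name
        "output_tokens": output_tokens,  # assumed field name
    }
    path.write_text(json.dumps(data, indent=2))
    return str(path)


def load_session(session_id, directory=".port_sessions"):
    """Hypothetical sketch: read a session back by its ID."""
    path = Path(directory) / f"{session_id}.json"
    return json.loads(path.read_text())
```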

compact_messages_if_needed

def compact_messages_if_needed(self) -> None
Called automatically at the end of every submit_message turn. If the number of stored messages exceeds compact_after_turns, the oldest messages are dropped so that only the most recent compact_after_turns entries are kept. The TranscriptStore is compacted by the same amount.
from src.query_engine import QueryEnginePort

engine = QueryEnginePort.from_workspace()
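# Normally called for you at the end of submit_message. A manual call
# only drops messages if the count exceeds compact_after_turns.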
engine.compact_messages_if_needed()
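The trimming rule itself amounts to keeping a tail slice. A minimal sketch, assuming messages are held in a plain list and compact_after_turns is positive (compact is a hypothetical name):

```python
def compact(messages: list[str], compact_after_turns: int = 12) -> None:
    """Hypothetical sketch: drop the oldest entries in place once the limit is exceeded."""
    if len(messages) > compact_after_turns:
        # Keep only the most recent compact_after_turns entries.
        messages[:] = messages[-compact_after_turns:]
```

A list at or below the limit is left untouched, so calling this after every turn is cheap.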

replay_user_messages

def replay_user_messages(self) -> tuple[str, ...]
Returns all prompt strings currently held in the TranscriptStore, in submission order. Useful for auditing or reconstructing the conversation.
from src.query_engine import QueryEnginePort

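engine = QueryEnginePort.from_workspace()
engine.submit_message("first prompt")
engine.submit_message("second prompt")

# Prompts come back in submission order
messages = engine.replay_user_messages()
for i, msg in enumerate(messages, start=1):
    print(f"{i}: {msg}")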

flush_transcript

def flush_transcript(self) -> None
Marks the internal TranscriptStore as flushed without writing anything to disk. Called automatically by persist_session. Call it directly when you want to set the flushed flag without persisting the session.
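The flag semantics can be illustrated with a small in-memory stand-in. TranscriptSketch is hypothetical and only mirrors the behaviour described here; in particular, clearing the flag when a new entry is appended is an assumption.

```python
from dataclasses import dataclass, field


@dataclass
class TranscriptSketch:
    """Hypothetical stand-in for a transcript store with a flushed flag."""
    entries: list[str] = field(default_factory=list)
    flushed: bool = False

    def append(self, entry: str) -> None:
        self.entries.append(entry)
        # Assumption: a new entry makes the store unflushed again.
        self.flushed = False

    def replay(self) -> tuple[str, ...]:
        # Entries are returned in submission order.
        return tuple(self.entries)

    def flush(self) -> None:
        # In-memory flag only; nothing is written to disk.
        self.flushed = True
```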

render_summary

def render_summary(self) -> str
Returns a Markdown-formatted summary of the engine state: manifest, command surface, tool surface, session ID, turn count, permission denial count, usage totals, and configuration.
from src.query_engine import QueryEnginePort

engine = QueryEnginePort.from_workspace()
print(engine.render_summary())
