engine/llm.py — Streaming Chat Responses

Overview

Handles streaming chat responses from multiple LLM providers: OpenRouter, DeepSeek, Groq, and Google Gemini. Manages context building, message history, and real-time token streaming over WebSocket.

Client Initialization

OpenRouter/OpenAI-Compatible Client

client = AsyncOpenAI(
    base_url=OPENAI_BASE_URL,
    api_key=OPENAI_API_KEY,
    default_headers={
        "HTTP-Referer": "https://github.com/ritz541/flower-engine",
        "X-Title": "The Flower Roleplay Engine",
    },
)
Used for OpenRouter models (default provider).

DeepSeek Client

ds_client = AsyncOpenAI(
    base_url="https://api.deepseek.com",
    api_key=DEEPSEEK_API_KEY
)
Used for models with IDs starting with deepseek-.

Groq Client

groq_client = AsyncOpenAI(
    base_url=GROQ_BASE_URL,
    api_key=GROQ_API_KEY
)
Used for Groq-hosted models (e.g., llama-, mixtral-, gemma-).

Gemini Client

gemini_client = None
if GEMINI_API_KEY:
    try:
        from google import genai
        gemini_client = genai.Client(api_key=GEMINI_API_KEY)
    except Exception as e:
        log.error(f"Failed to initialize Gemini client: {e}")
Used for models with IDs starting with gemini/.

Main Function

stream_chat_response()

async def stream_chat_response(
    ws: WebSocket,
    prompt: str,
    context: str,
    world_id: str,
    char_id: str,
    session_id: str = "",
)
Streams an AI chat response in real-time over WebSocket.
ws
WebSocket
required
FastAPI WebSocket connection for sending response chunks
prompt
string
required
User’s current message
context
string
required
Additional context (scene, retrieved lore, memory)
world_id
string
required
Active world ID
char_id
string
required
Active character ID
session_id
string
optional, default: ""
Active session ID (empty string for legacy mode)

Context Building

Character Information

character = char_manager.get_character(char_id)
char_name = character.name if character else "a wanderer"
char_persona = character.persona if character else "A mysterious figure."

World Information

world = world_manager.get_world(world_id)
world_scene = world.scene if world and world.scene else ""
world_system_prompt = world.system_prompt if world and world.system_prompt else ""

Scene Injection

The world scene is prepended to the context only at the start of a conversation — that is, when the session history contains at most one message (the just-saved user prompt):
history_count = 0
if session_id:
    recent_msgs = msg_manager.get_messages(char_id, session_id, limit=2)
    history_count = len(recent_msgs)

if world_scene and history_count <= 1:
    context = f"--- SCENE ---\n{world_scene}\n\n{context}"

Rules Loading

rules_block = ""
if state.ACTIVE_RULES:
    loaded_texts = []
    for rule_id in state.ACTIVE_RULES:
        try:
            with open(f"assets/rules/{rule_id}.yaml", "r", encoding="utf-8") as f:
                data = yaml.safe_load(f)
                if data and "prompt" in data:
                    loaded_texts.append(data["prompt"].strip())
        except Exception:
            pass
    if loaded_texts:
        rules_block = "### UNIVERSAL LAWS ###\n" + "\n\n".join(loaded_texts)

System Prompt Assembly

system_prompt = build_system_prompt(
    char_name, char_persona, rules_block, world_system_prompt, context
)
See engine/prompt.py for system prompt template.

Message History

History Retrieval

history_messages = []
if session_id:
    # Get last 11 messages (about 5 exchanges)
    all_recent = msg_manager.get_messages(char_id, session_id, limit=11)
    
    # Filter out the current prompt to avoid double-entry
    if all_recent and all_recent[-1].content == prompt and all_recent[-1].role == "user":
        recent_msgs = all_recent[:-1]
    else:
        recent_msgs = all_recent
        
    # Take only the last 10 messages (5 full exchanges)
    recent_msgs = recent_msgs[-10:]
    
    for msg in recent_msgs:
        history_messages.append({"role": msg.role, "content": msg.content})

Message Array Construction

messages = [{"role": "system", "content": system_prompt}]
messages.extend(history_messages)
messages.append({"role": "user", "content": prompt})
Structure
  1. System prompt (always first)
  2. Up to the last 10 messages from history
  3. Current user prompt

Provider Selection

Gemini Models

if state.CURRENT_MODEL.startswith("gemini/"):
    if not gemini_client:
        await ws.send_text(build_ws_payload(
            "system_update",
            "✗ Gemini API Key missing! Add gemini_api_key to config.yaml."
        ))
        return
        
    log.info(f"Using official Gemini client for {state.CURRENT_MODEL}")
    model_to_use = state.CURRENT_MODEL.replace("gemini/", "")

DeepSeek Models

if state.CURRENT_MODEL.startswith("deepseek-"):
    active_client = ds_client
    log.info(f"Using DeepSeek official client for {state.CURRENT_MODEL}")

Groq Models

elif (
    state.CURRENT_MODEL.startswith("groq/")
    or any(
        x in state.CURRENT_MODEL.lower()
        for x in ["llama-", "mixtral-", "gemma-"]
    )
    and not "/" in state.CURRENT_MODEL
):
    active_client = groq_client
    log.info(f"Using Groq client for {state.CURRENT_MODEL}")

OpenRouter (Default)

else:
    active_client = client
    log.info(f"Using OpenRouter client for {state.CURRENT_MODEL}")

Streaming

Gemini Streaming

# Convert messages to Gemini format
gemini_msgs = []
for m in messages:
    role = "user" if m["role"] in ["user", "system"] else "model"
    gemini_msgs.append({"role": role, "parts": [{"text": m["content"]}]})

# Extract system instruction
system_instruction = None
if messages[0]["role"] == "system":
    system_instruction = messages[0]["content"]
    gemini_msgs = gemini_msgs[1:]

start_time = time.time()
for chunk in gemini_client.models.generate_content_stream(
    model=model_to_use,
    contents=gemini_msgs,
    config={"system_instruction": system_instruction} if system_instruction else None
):
    if chunk.text:
        full_content += chunk.text
        total_tokens += 1
        
        elapsed = time.time() - start_time
        tps = total_tokens / elapsed if elapsed > 0 else 0
        
        await ws.send_text(build_ws_payload("chat_chunk", chunk.text, {
            "tps": round(tps, 2),
            "model": state.CURRENT_MODEL
        }))

OpenAI-Compatible Streaming

response = await active_client.chat.completions.create(
    model=model_to_use, messages=messages, stream=True
)

start_time = None
async for chunk in response:
    if not start_time:
        start_time = time.time()
    delta = (
        chunk.choices[0].delta.content
        if chunk.choices and chunk.choices[0].delta
        else None
    )
    if delta:
        full_content += delta
        total_tokens += 1
        elapsed = time.time() - start_time
        tps = total_tokens / elapsed if elapsed > 0 else 0.0
        metadata = {
            "model": state.CURRENT_MODEL,
            "tokens_per_second": round(tps, 2),
            "world_id": world_id,
        }
        await ws.send_text(build_ws_payload("chat_chunk", delta, metadata))

WebSocket Message Format

Chat Chunk
{
  "event": "chat_chunk",
  "payload": {
    "content": "Hello! ",
    "metadata": {
      "model": "deepseek-chat",
      "tokens_per_second": 45.23,
      "world_id": "fantasy"
    }
  }
}
Chat End
{
  "event": "chat_end",
  "payload": {
    "content": "",
    "metadata": {"total_tokens": 342}
  }
}

Message Persistence

Save Assistant Response

if full_content:
    msg_manager.add_message(
        Message(
            role="assistant",
            content=full_content,
            character_id=char_id,
            session_id=session_id,
        )
    )

Store in RAG Memory

memory_key = f"{char_id}_{session_id}" if session_id else char_id
rag_manager.add_memory(
    memory_key, str(uuid.uuid4()), f"User: {prompt}\nAI: {full_content}"
)
Stores the exchange as a vector embedding for semantic search.

Update Session

if session_id:
    session_manager.touch(session_id)  # Update last_used_at
    sess = session_manager.get_session(session_id)
    if sess and sess.title in ("", "New session"):
        session_manager.update_title(session_id, prompt)
Sets the session title to the first user message, but only while the session is still untitled (title is empty or "New session").

Error Handling

Stream Cancellation

except asyncio.CancelledError:
    log.info(f"Stream cancelled after {total_tokens} tokens.")

Provider Errors

except Exception as e:
    log.error(f"Error during streaming: {e}")
    await ws.send_text(build_ws_payload("error", str(e)))
    return

Final Cleanup

finally:
    await ws.send_text(
        build_ws_payload("chat_end", "", {"total_tokens": total_tokens})
    )

Performance Metrics

Tokens Per Second (TPS)

elapsed = time.time() - start_time
tps = total_tokens / elapsed if elapsed > 0 else 0.0
Calculated in real-time and sent with each chunk.

Total Tokens

total_tokens = 0
# ... streaming loop
total_tokens += 1
Sent in the final chat_end message.

Example Usage

Direct Function Call

import asyncio
from fastapi import WebSocket
from engine.llm import stream_chat_response

async def test_stream(websocket: WebSocket):
    await stream_chat_response(
        ws=websocket,
        prompt="Tell me about the ancient ruins.",
        context="--- RECENT MEMORY ---\nUser asked about the forest earlier.",
        world_id="fantasy",
        char_id="elara",
        session_id="a1b2c3d4e5f6"
    )

With Context Building

from engine.rag import rag_manager
import engine.state as state

# Retrieve RAG context
lore_list, _ = rag_manager.query_lore(
    state.ACTIVE_WORLD_ID, prompt, n_results=2
)
mem_key = f"{state.ACTIVE_CHARACTER_ID}_{state.ACTIVE_SESSION_ID}"
mem_list, _ = rag_manager.query_memory(mem_key, prompt, n_results=3)

full_context = (
    f"--- RECENT MEMORY ---\n{chr(10).join(mem_list)}" if mem_list else ""
)

# Stream response
await stream_chat_response(
    websocket,
    prompt,
    full_context,
    state.ACTIVE_WORLD_ID,
    state.ACTIVE_CHARACTER_ID,
    state.ACTIVE_SESSION_ID,
)

Provider-Specific Notes

OpenRouter

  • Supports 100+ models
  • Requires openai_api_key in config.yaml
  • Uses OpenAI-compatible API format

DeepSeek

  • Official API endpoint: https://api.deepseek.com
  • Supports deepseek-chat and deepseek-reasoner
  • Uses OpenAI-compatible API format

Groq

  • Fast inference for Llama, Mixtral, Gemma models
  • Requires groq_api_key in config.yaml
  • Uses OpenAI-compatible API format

Gemini

  • Google’s official SDK: google-genai
  • Requires gemini_api_key in config.yaml
  • Uses native Gemini API format (converted from OpenAI format)
  • System instructions passed via config parameter

Configuration Requirements

In config.yaml:
# OpenRouter (default)
openai_api_key: "sk-or-v1-..."
openai_base_url: "https://openrouter.ai/api/v1"

# DeepSeek
deepseek_api_key: "sk-..."

# Groq
groq_api_key: "gsk_..."
groq_base_url: "https://api.groq.com/openai/v1"

# Gemini
gemini_api_key: "AIza..."
