Overview
Handles streaming chat responses from multiple LLM providers: OpenRouter, DeepSeek, Groq, and Google Gemini. Manages context building, message history, and real-time token streaming over WebSocket.
Client Initialization
OpenRouter/OpenAI-Compatible Client
client = AsyncOpenAI(
base_url=OPENAI_BASE_URL,
api_key=OPENAI_API_KEY,
default_headers={
"HTTP-Referer": "https://github.com/ritz541/flower-engine",
"X-Title": "The Flower Roleplay Engine",
},
)
Used for OpenRouter models (default provider).
DeepSeek Client
ds_client = AsyncOpenAI(
base_url="https://api.deepseek.com",
api_key=DEEPSEEK_API_KEY
)
Used for models with IDs starting with deepseek-.
Groq Client
groq_client = AsyncOpenAI(
base_url=GROQ_BASE_URL,
api_key=GROQ_API_KEY
)
Used for Groq-hosted models (e.g., llama-, mixtral-, gemma-).
Gemini Client
gemini_client = None
if GEMINI_API_KEY:
try:
from google import genai
gemini_client = genai.Client(api_key=GEMINI_API_KEY)
except Exception as e:
log.error(f"Failed to initialize Gemini client: {e}")
Used for models with IDs starting with gemini/.
Main Function
stream_chat_response()
async def stream_chat_response(
ws: WebSocket,
prompt: str,
context: str,
world_id: str,
char_id: str,
session_id: str = "",
)
Streams an AI chat response in real-time over WebSocket.
- ws — FastAPI WebSocket connection for sending response chunks
- prompt — the current user message
- context — additional context (scene, retrieved lore, memory)
- world_id — ID of the active world (used to load scene and system prompt)
- char_id — ID of the active character (used to load name and persona)
- session_id — active session ID (empty string for legacy mode)
Context Building
character = char_manager.get_character(char_id)
char_name = character.name if character else "a wanderer"
char_persona = character.persona if character else "A mysterious figure."
world = world_manager.get_world(world_id)
world_scene = world.scene if world and world.scene else ""
world_system_prompt = world.system_prompt if world and world.system_prompt else ""
Scene Injection
The scene is injected into the context only at the start of a session — i.e. when the history contains at most one message (which may be the just-saved current user prompt):
history_count = 0
if session_id:
recent_msgs = msg_manager.get_messages(char_id, session_id, limit=2)
history_count = len(recent_msgs)
if world_scene and history_count <= 1:
context = f"--- SCENE ---\n{world_scene}\n\n{context}"
Rules Loading
rules_block = ""
if state.ACTIVE_RULES:
loaded_texts = []
for rule_id in state.ACTIVE_RULES:
try:
with open(f"assets/rules/{rule_id}.yaml", "r", encoding="utf-8") as f:
data = yaml.safe_load(f)
if data and "prompt" in data:
loaded_texts.append(data["prompt"].strip())
except Exception:
pass
if loaded_texts:
rules_block = "### UNIVERSAL LAWS ###\n" + "\n\n".join(loaded_texts)
System Prompt Assembly
system_prompt = build_system_prompt(
char_name, char_persona, rules_block, world_system_prompt, context
)
See engine/prompt.py for system prompt template.
Message History
History Retrieval
history_messages = []
if session_id:
# Get last 11 messages (about 5 exchanges)
all_recent = msg_manager.get_messages(char_id, session_id, limit=11)
# Filter out the current prompt to avoid double-entry
if all_recent and all_recent[-1].content == prompt and all_recent[-1].role == "user":
recent_msgs = all_recent[:-1]
else:
recent_msgs = all_recent
# Take only the last 10 messages (5 full exchanges)
recent_msgs = recent_msgs[-10:]
for msg in recent_msgs:
history_messages.append({"role": msg.role, "content": msg.content})
Message Array Construction
messages = [{"role": "system", "content": system_prompt}]
messages.extend(history_messages)
messages.append({"role": "user", "content": prompt})
Structure
- System prompt (always first)
- Last 10 messages from history
- Current user prompt
Provider Selection
Gemini Models
if state.CURRENT_MODEL.startswith("gemini/"):
if not gemini_client:
await ws.send_text(build_ws_payload(
"system_update",
"✗ Gemini API Key missing! Add gemini_api_key to config.yaml."
))
return
log.info(f"Using official Gemini client for {state.CURRENT_MODEL}")
model_to_use = state.CURRENT_MODEL.replace("gemini/", "")
DeepSeek Models
if state.CURRENT_MODEL.startswith("deepseek-"):
active_client = ds_client
log.info(f"Using DeepSeek official client for {state.CURRENT_MODEL}")
Groq Models
elif (
state.CURRENT_MODEL.startswith("groq/")
or any(
x in state.CURRENT_MODEL.lower()
for x in ["llama-", "mixtral-", "gemma-"]
)
and not "/" in state.CURRENT_MODEL
):
active_client = groq_client
log.info(f"Using Groq client for {state.CURRENT_MODEL}")
OpenRouter (Default)
else:
active_client = client
log.info(f"Using OpenRouter client for {state.CURRENT_MODEL}")
Streaming
Gemini Streaming
# Convert messages to Gemini format
gemini_msgs = []
for m in messages:
role = "user" if m["role"] in ["user", "system"] else "model"
gemini_msgs.append({"role": role, "parts": [{"text": m["content"]}]})
# Extract system instruction
system_instruction = None
if messages[0]["role"] == "system":
system_instruction = messages[0]["content"]
gemini_msgs = gemini_msgs[1:]
start_time = time.time()
for chunk in gemini_client.models.generate_content_stream(
model=model_to_use,
contents=gemini_msgs,
config={"system_instruction": system_instruction} if system_instruction else None
):
if chunk.text:
full_content += chunk.text
total_tokens += 1
elapsed = time.time() - start_time
tps = total_tokens / elapsed if elapsed > 0 else 0
await ws.send_text(build_ws_payload("chat_chunk", chunk.text, {
"tps": round(tps, 2),
"model": state.CURRENT_MODEL
}))
OpenAI-Compatible Streaming
response = await active_client.chat.completions.create(
model=model_to_use, messages=messages, stream=True
)
start_time = None
async for chunk in response:
if not start_time:
start_time = time.time()
delta = (
chunk.choices[0].delta.content
if chunk.choices and chunk.choices[0].delta
else None
)
if delta:
full_content += delta
total_tokens += 1
elapsed = time.time() - start_time
tps = total_tokens / elapsed if elapsed > 0 else 0.0
metadata = {
"model": state.CURRENT_MODEL,
"tokens_per_second": round(tps, 2),
"world_id": world_id,
}
await ws.send_text(build_ws_payload("chat_chunk", delta, metadata))
Chat Chunk (note: the example below reflects the OpenAI-compatible path; the Gemini path sends the rate under the `tps` key and omits `world_id`)
{
"event": "chat_chunk",
"payload": {
"content": "Hello! ",
"metadata": {
"model": "deepseek-chat",
"tokens_per_second": 45.23,
"world_id": "fantasy"
}
}
}
Chat End
{
"event": "chat_end",
"payload": {
"content": "",
"metadata": {"total_tokens": 342}
}
}
Message Persistence
Save Assistant Response
if full_content:
msg_manager.add_message(
Message(
role="assistant",
content=full_content,
character_id=char_id,
session_id=session_id,
)
)
Store in RAG Memory
memory_key = f"{char_id}_{session_id}" if session_id else char_id
rag_manager.add_memory(
memory_key, str(uuid.uuid4()), f"User: {prompt}\nAI: {full_content}"
)
Stores the exchange as a vector embedding for semantic search.
Update Session
if session_id:
session_manager.touch(session_id) # Update last_used_at
sess = session_manager.get_session(session_id)
if sess and sess.title in ("", "New session"):
session_manager.update_title(session_id, prompt)
Sets session title to first user message.
Error Handling
Stream Cancellation
except asyncio.CancelledError:
log.info(f"Stream cancelled after {total_tokens} tokens.")
Provider Errors
except Exception as e:
log.error(f"Error during streaming: {e}")
await ws.send_text(build_ws_payload("error", str(e)))
return
Final Cleanup
finally:
await ws.send_text(
build_ws_payload("chat_end", "", {"total_tokens": total_tokens})
)
Tokens Per Second (TPS)
elapsed = time.time() - start_time
tps = total_tokens / elapsed if elapsed > 0 else 0.0
Calculated in real-time and sent with each chunk.
Total Tokens
total_tokens = 0
# ... streaming loop
total_tokens += 1
Sent in the final chat_end message.
Example Usage
Direct Function Call
import asyncio
from fastapi import WebSocket
from engine.llm import stream_chat_response
async def test_stream(websocket: WebSocket):
await stream_chat_response(
ws=websocket,
prompt="Tell me about the ancient ruins.",
context="--- RECENT MEMORY ---\nUser asked about the forest earlier.",
world_id="fantasy",
char_id="elara",
session_id="a1b2c3d4e5f6"
)
With Context Building
from engine.rag import rag_manager
import engine.state as state
# Retrieve RAG context
lore_list, _ = rag_manager.query_lore(
state.ACTIVE_WORLD_ID, prompt, n_results=2
)
mem_key = f"{state.ACTIVE_CHARACTER_ID}_{state.ACTIVE_SESSION_ID}"
mem_list, _ = rag_manager.query_memory(mem_key, prompt, n_results=3)
full_context = (
f"--- RECENT MEMORY ---\n{chr(10).join(mem_list)}" if mem_list else ""
)
# Stream response
await stream_chat_response(
websocket,
prompt,
full_context,
state.ACTIVE_WORLD_ID,
state.ACTIVE_CHARACTER_ID,
state.ACTIVE_SESSION_ID,
)
Provider-Specific Notes
OpenRouter
- Supports 100+ models
- Requires OPENAI_API_KEY in config
- Uses OpenAI-compatible API format
DeepSeek
- Official API endpoint: https://api.deepseek.com
- Supports deepseek-chat and deepseek-reasoner
- Uses OpenAI-compatible API format
Groq
- Fast inference for Llama, Mixtral, Gemma models
- Requires GROQ_API_KEY in config
- Uses OpenAI-compatible API format
Gemini
- Google's official SDK: google-genai
- Requires GEMINI_API_KEY in config
- Uses native Gemini API format (converted from OpenAI format)
- System instructions passed via the config parameter
Configuration Requirements
In config.yaml:
# OpenRouter (default)
openai_api_key: "sk-or-v1-..."
openai_base_url: "https://openrouter.ai/api/v1"
# DeepSeek
deepseek_api_key: "sk-..."
# Groq
groq_api_key: "gsk_..."
groq_base_url: "https://api.groq.com/openai/v1"
# Gemini
gemini_api_key: "AIza..."