Overview
The main entry point for the Flower Engine backend. Initializes the FastAPI app, loads assets from YAML files, fetches available AI models, and handles WebSocket connections for real-time chat.
FastAPI Application
app
app = FastAPI(title="The Flower Engine")
The main FastAPI application instance.
Startup Event
startup()
@app.on_event("startup")
async def startup()
Executed when the FastAPI application starts. Performs initialization tasks:
- Load YAML Assets: Syncs worlds and characters from
assets/worlds/*.yaml and assets/characters/*.yaml to the database
- Process World Lore: Splits large lore text into 800-character chunks for RAG vector storage
- Fetch AI Models: Retrieves available models from OpenRouter, Groq, and Gemini APIs
- Add Static Models: Includes DeepSeek Chat and DeepSeek Reasoner models
World Loading
for data in load_yaml_assets("assets/worlds/*.yaml"):
w = World(
id=data["id"],
name=data["name"],
lore=data.get("lore", ""),
start_message=data.get("start_message", ""),
scene=data.get("scene", ""),
system_prompt=data.get("system_prompt", ""),
)
world_manager.add_world(w)
id: Unique identifier for the world
name: Display name of the world
lore: Background lore and world-building text (automatically chunked for RAG)
start_message: Opening message shown when a new session begins
scene: Current scene description (injected on first message only)
system_prompt: World-specific system instructions for the AI
Character Loading
for data in load_yaml_assets("assets/characters/*.yaml"):
c = Character(
id=data["id"],
name=data["name"],
persona=data.get("persona", "")
)
char_manager.add_character(c)
id: Unique identifier for the character
name: Display name of the character
persona: Character personality and behavior description
Model Fetching
The startup event fetches models from multiple providers:
OpenRouter Models
resp = await hc.get("https://openrouter.ai/api/v1/models", headers=headers)
state.AVAILABLE_MODELS.append({
"id": m["id"],
"name": f"[OpenRouter] {m.get('name', m['id'])}",
"prompt_price": round(float(p.get("prompt", 0)) * 1e6, 4),
"completion_price": round(float(p.get("completion", 0)) * 1e6, 4),
})
Groq Models
resp = await hc.get(f"{GROQ_BASE_URL}/models", headers=headers)
Gemini Models
gemini_list = [
{"id": "gemini/gemini-3.1-pro-preview", "name": "[Gemini] Gemini 3.1 Pro"},
{"id": "gemini/gemini-3-flash-preview", "name": "[Gemini] Gemini 3 Flash"},
{"id": "gemini/gemini-3.1-flash-lite-preview", "name": "[Gemini] Gemini 3.1 Flash-Lite"},
]
DeepSeek Models
state.AVAILABLE_MODELS.append({
"id": "deepseek-chat",
"name": "[DeepSeek] DeepSeek Chat",
"prompt_price": 0.14,
"completion_price": 0.28,
})
WebSocket Endpoint
websocket_endpoint(websocket: WebSocket)
@app.websocket("/ws/rpc")
async def websocket_endpoint(websocket: WebSocket)
Handles WebSocket connections for real-time chat and commands.
FastAPI WebSocket connection object
Connection Flow
- Accept Connection: Accepts the WebSocket and sends a ready message
- Broadcast State: Sends current engine state (worlds, characters, sessions)
- Message Loop: Continuously receives and processes messages
await websocket.accept()
await websocket.send_text(
build_ws_payload("system_update", "✓ Engine ready.", {"status": "ok"})
)
await broadcast_sync_state(websocket)
Message Handling
Command Messages (start with /)
if prompt.startswith("/"):
await handle_command(prompt, websocket)
continue
Chat Messages
Requires active world, character, and session:
if (
not state.ACTIVE_WORLD_ID
or not state.ACTIVE_CHARACTER_ID
or not state.ACTIVE_SESSION_ID
):
await websocket.send_text(
build_ws_payload(
"system_update",
"✗ Prepare the stage first (World, Char, Session).",
)
)
continue
RAG Context Retrieval
# Retrieve lore chunks for current world
lore_list, _ = rag_manager.query_lore(
state.ACTIVE_WORLD_ID, prompt, n_results=2
)
# Retrieve session memory
mem_key = f"{state.ACTIVE_CHARACTER_ID}_{state.ACTIVE_SESSION_ID}"
mem_list, _ = rag_manager.query_memory(mem_key, prompt, n_results=3)
Up to 2 most relevant lore chunks from the current world
Up to 3 most relevant conversation exchanges from session history
Message Saving
# Save user message BEFORE generating response
msg_manager.add_message(
Message(
role="user",
content=prompt,
character_id=state.ACTIVE_CHARACTER_ID,
session_id=state.ACTIVE_SESSION_ID,
)
)
Streaming Response
task = asyncio.create_task(
stream_chat_response(
websocket,
prompt,
full_context,
state.ACTIVE_WORLD_ID,
state.ACTIVE_CHARACTER_ID,
state.ACTIVE_SESSION_ID,
)
)
The streaming task runs in the background while the main loop checks for cancellation:
while not task.done():
try:
raw = await asyncio.wait_for(websocket.receive_text(), timeout=0.05)
cmd_msg = json.loads(raw)
if cmd_msg.get("prompt") == "/cancel":
task.cancel()
await websocket.send_text(
build_ws_payload("system_update", "✗ Stream cancelled by user.")
)
except asyncio.TimeoutError:
continue
Disconnection Handling
except WebSocketDisconnect:
log.info("Disconnected.")
Running the Server
Development Mode
if __name__ == "__main__":
import uvicorn
uvicorn.run("engine.main:app", host="0.0.0.0", port=8000, reload=True)
Starts the server on http://0.0.0.0:8000 with auto-reload enabled.
Production Mode
uvicorn engine.main:app --host 0.0.0.0 --port 8000
Incoming Messages
{
"prompt": "/world select fantasy"
}
Or as plain text (any message not starting with / is treated as a chat message), e.g.:
Hello! Tell me about this world.
Outgoing Messages
System Update
{
"event": "system_update",
"payload": {
"content": "✓ Engine ready.",
"metadata": {"status": "ok"}
}
}
Chat Chunk
{
"event": "chat_chunk",
"payload": {
"content": "Hello! ",
"metadata": {
"model": "deepseek-chat",
"tokens_per_second": 45.23,
"world_id": "fantasy"
}
}
}
Chat End
{
"event": "chat_end",
"payload": {
"content": "",
"metadata": {"total_tokens": 342}
}
}
Example Usage
Python WebSocket Client
import asyncio
import websockets
import json


async def chat():
    """Connect to the engine, stage a session, and stream one chat reply."""
    uri = "ws://localhost:8000/ws/rpc"
    async with websockets.connect(uri) as ws:
        # The server greets us with a ready message as soon as we connect.
        print(await ws.recv())

        # Stage setup (world, character, session), then the first chat message.
        # Each payload is a JSON object with a single "prompt" key.
        for prompt in (
            "/world select fantasy",
            "/character select elara",
            "/session new",
            "Hello! Tell me about this world.",
        ):
            await ws.send(json.dumps({"prompt": prompt}))

        # Consume streamed chunks until the chat_end event arrives.
        while True:
            data = json.loads(await ws.recv())
            if data["event"] == "chat_chunk":
                print(data["payload"]["content"], end="")
            elif data["event"] == "chat_end":
                print(f"\n\nTotal tokens: {data['payload']['metadata']['total_tokens']}")
                break


asyncio.run(chat())
JavaScript WebSocket Client
// Minimal client for the Flower Engine WebSocket RPC.
// NOTE(review): mixes the global WebSocket constructor with process.stdout —
// assumes a Node runtime with native WebSocket support (v21+); confirm.
const ws = new WebSocket('ws://localhost:8000/ws/rpc');

ws.onopen = () => {
  console.log('Connected');
  // Stage setup commands followed by the first chat message, in order.
  const prompts = [
    '/world select fantasy',
    '/character select elara',
    '/session new',
    'Hello!',
  ];
  for (const prompt of prompts) {
    ws.send(JSON.stringify({prompt}));
  }
};

ws.onmessage = (event) => {
  const data = JSON.parse(event.data);
  if (data.event === 'chat_chunk') {
    // Stream chunks without a trailing newline so the reply reads naturally.
    process.stdout.write(data.payload.content);
  } else if (data.event === 'chat_end') {
    console.log(`\n\nTokens: ${data.payload.metadata.total_tokens}`);
  }
};
State Management
The WebSocket endpoint uses global state variables from engine.state:
ACTIVE_WORLD_ID: Currently selected world
ACTIVE_CHARACTER_ID: Currently selected character
ACTIVE_SESSION_ID: Current conversation session
CURRENT_MODEL: AI model being used
ACTIVE_RULES: List of active rule IDs (e.g., ["nsfw"])
AVAILABLE_MODELS: List of all fetched AI models
Error Handling
Missing Configuration
if (
not state.ACTIVE_WORLD_ID
or not state.ACTIVE_CHARACTER_ID
or not state.ACTIVE_SESSION_ID
):
await websocket.send_text(
build_ws_payload(
"system_update",
"✗ Prepare the stage first (World, Char, Session).",
)
)
Task Cancellation
try:
await task
except asyncio.CancelledError:
log.info("Task was successfully cancelled.")
except Exception as e:
log.error(f"Task failed with error: {e}")
WebSocket Disconnection
except WebSocketDisconnect:
log.info("Disconnected.")