Skip to main content

Overview

The main entry point for the Flower Engine backend. Initializes the FastAPI app, loads assets from YAML files, fetches available AI models, and handles WebSocket connections for real-time chat.

FastAPI Application

app

app = FastAPI(title="The Flower Engine")
The main FastAPI application instance.

Startup Event

startup()

@app.on_event("startup")
async def startup()
Executed when the FastAPI application starts. Performs initialization tasks:
  1. Load YAML Assets: Syncs worlds and characters from assets/worlds/*.yaml and assets/characters/*.yaml to the database
  2. Process World Lore: Splits large lore text into 800-character chunks for RAG vector storage
  3. Fetch AI Models: Retrieves available models from OpenRouter, Groq, and Gemini APIs
  4. Add Static Models: Includes DeepSeek Chat and DeepSeek Reasoner models

World Loading

for data in load_yaml_assets("assets/worlds/*.yaml"):
    w = World(
        id=data["id"],
        name=data["name"],
        lore=data.get("lore", ""),
        start_message=data.get("start_message", ""),
        scene=data.get("scene", ""),
        system_prompt=data.get("system_prompt", ""),
    )
    world_manager.add_world(w)
data.id
string
required
Unique identifier for the world
data.name
string
required
Display name of the world
data.lore
string
Background lore and world-building text (automatically chunked for RAG)
data.start_message
string
Opening message shown when a new session begins
data.scene
string
Current scene description (injected on first message only)
data.system_prompt
string
World-specific system instructions for the AI

Character Loading

for data in load_yaml_assets("assets/characters/*.yaml"):
    c = Character(
        id=data["id"],
        name=data["name"],
        persona=data.get("persona", "")
    )
    char_manager.add_character(c)
data.id
string
required
Unique identifier for the character
data.name
string
required
Character’s display name
data.persona
string
Character personality and behavior description

Model Fetching

The startup event fetches models from multiple providers.

OpenRouter Models
resp = await hc.get("https://openrouter.ai/api/v1/models", headers=headers)
state.AVAILABLE_MODELS.append({
    "id": m["id"],
    "name": f"[OpenRouter] {m.get('name', m['id'])}",
    "prompt_price": round(float(p.get("prompt", 0)) * 1e6, 4),
    "completion_price": round(float(p.get("completion", 0)) * 1e6, 4),
})
Groq Models
resp = await hc.get(f"{GROQ_BASE_URL}/models", headers=headers)
Gemini Models
gemini_list = [
    {"id": "gemini/gemini-3.1-pro-preview", "name": "[Gemini] Gemini 3.1 Pro"},
    {"id": "gemini/gemini-3-flash-preview", "name": "[Gemini] Gemini 3 Flash"},
    {"id": "gemini/gemini-3.1-flash-lite-preview", "name": "[Gemini] Gemini 3.1 Flash-Lite"},
]
DeepSeek Models
state.AVAILABLE_MODELS.append({
    "id": "deepseek-chat",
    "name": "[DeepSeek] DeepSeek Chat",
    "prompt_price": 0.14,
    "completion_price": 0.28,
})

WebSocket Endpoint

websocket_endpoint(websocket: WebSocket)

@app.websocket("/ws/rpc")
async def websocket_endpoint(websocket: WebSocket)
Handles WebSocket connections for real-time chat and commands.
websocket
WebSocket
required
FastAPI WebSocket connection object

Connection Flow

  1. Accept Connection: Accepts the WebSocket and sends a ready message
  2. Broadcast State: Sends current engine state (worlds, characters, sessions)
  3. Message Loop: Continuously receives and processes messages
await websocket.accept()
await websocket.send_text(
    build_ws_payload("system_update", "✓ Engine ready.", {"status": "ok"})
)
await broadcast_sync_state(websocket)

Message Handling

Command Messages (start with /)
if prompt.startswith("/"):
    await handle_command(prompt, websocket)
    continue
Chat Messages

Chat messages require an active world, character, and session:
if (
    not state.ACTIVE_WORLD_ID
    or not state.ACTIVE_CHARACTER_ID
    or not state.ACTIVE_SESSION_ID
):
    await websocket.send_text(
        build_ws_payload(
            "system_update",
            "✗ Prepare the stage first (World, Char, Session).",
        )
    )
    continue

RAG Context Retrieval

# Retrieve lore chunks for current world
lore_list, _ = rag_manager.query_lore(
    state.ACTIVE_WORLD_ID, prompt, n_results=2
)

# Retrieve session memory
mem_key = f"{state.ACTIVE_CHARACTER_ID}_{state.ACTIVE_SESSION_ID}"
mem_list, _ = rag_manager.query_memory(mem_key, prompt, n_results=3)
lore_list
List[str]
Up to 2 most relevant lore chunks from the current world
mem_list
List[str]
Up to 3 most relevant conversation exchanges from session history

Message Saving

# Save user message BEFORE generating response
msg_manager.add_message(
    Message(
        role="user",
        content=prompt,
        character_id=state.ACTIVE_CHARACTER_ID,
        session_id=state.ACTIVE_SESSION_ID,
    )
)

Streaming Response

task = asyncio.create_task(
    stream_chat_response(
        websocket,
        prompt,
        full_context,
        state.ACTIVE_WORLD_ID,
        state.ACTIVE_CHARACTER_ID,
        state.ACTIVE_SESSION_ID,
    )
)
The streaming task runs in the background while the main loop checks for cancellation:
while not task.done():
    try:
        raw = await asyncio.wait_for(websocket.receive_text(), timeout=0.05)
        cmd_msg = json.loads(raw)
        if cmd_msg.get("prompt") == "/cancel":
            task.cancel()
            await websocket.send_text(
                build_ws_payload("system_update", "✗ Stream cancelled by user.")
            )
    except asyncio.TimeoutError:
        continue

Disconnection Handling

except WebSocketDisconnect:
    log.info("Disconnected.")

Running the Server

Development Mode

if __name__ == "__main__":
    import uvicorn
    uvicorn.run("engine.main:app", host="0.0.0.0", port=8000, reload=True)
Starts the server bound to all interfaces on port 8000 (reachable locally at http://localhost:8000) with auto-reload enabled.

Production Mode

uvicorn engine.main:app --host 0.0.0.0 --port 8000

WebSocket Message Format

Incoming Messages

{
  "prompt": "/world select fantasy"
}
Or plain text:
Hello, how are you?

Outgoing Messages

System Update
{
  "event": "system_update",
  "payload": {
    "content": "✓ Engine ready.",
    "metadata": {"status": "ok"}
  }
}
Chat Chunk
{
  "event": "chat_chunk",
  "payload": {
    "content": "Hello! ",
    "metadata": {
      "model": "deepseek-chat",
      "tokens_per_second": 45.23,
      "world_id": "fantasy"
    }
  }
}
Chat End
{
  "event": "chat_end",
  "payload": {
    "content": "",
    "metadata": {"total_tokens": 342}
  }
}

Example Usage

Python WebSocket Client

import asyncio
import websockets
import json

async def chat():
    uri = "ws://localhost:8000/ws/rpc"
    async with websockets.connect(uri) as ws:
        # Receive ready message
        msg = await ws.recv()
        print(msg)
        
        # Select world
        await ws.send(json.dumps({"prompt": "/world select fantasy"}))
        
        # Select character
        await ws.send(json.dumps({"prompt": "/character select elara"}))
        
        # Start session
        await ws.send(json.dumps({"prompt": "/session new"}))
        
        # Send chat message
        await ws.send(json.dumps({"prompt": "Hello! Tell me about this world."}))
        
        # Receive streaming response
        while True:
            response = await ws.recv()
            data = json.loads(response)
            if data["event"] == "chat_chunk":
                print(data["payload"]["content"], end="")
            elif data["event"] == "chat_end":
                print(f"\n\nTotal tokens: {data['payload']['metadata']['total_tokens']}")
                break

asyncio.run(chat())

JavaScript WebSocket Client

const ws = new WebSocket('ws://localhost:8000/ws/rpc');

ws.onopen = () => {
  console.log('Connected');
  
  // Setup world and character
  ws.send(JSON.stringify({prompt: '/world select fantasy'}));
  ws.send(JSON.stringify({prompt: '/character select elara'}));
  ws.send(JSON.stringify({prompt: '/session new'}));
  
  // Send message
  ws.send(JSON.stringify({prompt: 'Hello!'}));
};

ws.onmessage = (event) => {
  const data = JSON.parse(event.data);
  
  if (data.event === 'chat_chunk') {
    process.stdout.write(data.payload.content);
  } else if (data.event === 'chat_end') {
    console.log(`\n\nTokens: ${data.payload.metadata.total_tokens}`);
  }
};

State Management

The WebSocket endpoint uses global state variables from engine.state:
  • ACTIVE_WORLD_ID: Currently selected world
  • ACTIVE_CHARACTER_ID: Currently selected character
  • ACTIVE_SESSION_ID: Current conversation session
  • CURRENT_MODEL: AI model being used
  • ACTIVE_RULES: List of active rule IDs (e.g., ["nsfw"])
  • AVAILABLE_MODELS: List of all fetched AI models

Error Handling

Missing Configuration
if (
    not state.ACTIVE_WORLD_ID
    or not state.ACTIVE_CHARACTER_ID
    or not state.ACTIVE_SESSION_ID
):
    await websocket.send_text(
        build_ws_payload(
            "system_update",
            "✗ Prepare the stage first (World, Char, Session).",
        )
    )
Task Cancellation
try:
    await task
except asyncio.CancelledError:
    log.info("Task was successfully cancelled.")
except Exception as e:
    log.error(f"Task failed with error: {e}")
WebSocket Disconnection
except WebSocketDisconnect:
    log.info("Disconnected.")

Build docs developers (and LLMs) love