Overview

The Flower Engine backend is built with FastAPI and handles:
  • WebSocket connections and message routing
  • LLM integration and streaming responses
  • SQLite database management
  • RAG (Retrieval-Augmented Generation) via ChromaDB
  • Session and state persistence
  • Asset loading from YAML files

Technology Stack

Core dependencies from requirements.txt:
fastapi>=0.100.0
uvicorn>=0.23.0
websockets>=11.0.3
chromadb>=0.4.10
openai>=1.3.0
pydantic>=2.3.0
sentence-transformers>=2.2.2
pyyaml>=6.0.1
watchdog>=3.0.0
google-genai>=0.1.0

Key Libraries

  • FastAPI: Async web framework
  • Uvicorn: ASGI server
  • Pydantic: Data validation and models
  • ChromaDB: Vector database for RAG
  • OpenAI SDK: LLM API client (supports OpenRouter, DeepSeek, Groq)
  • SentenceTransformers: Local embeddings (all-MiniLM-L6-v2)
  • Google GenAI: Official Gemini SDK

Running the Backend

Development Mode (with auto-reload)

python -m uvicorn engine.main:app --host 0.0.0.0 --port 8000 --reload
The --reload flag automatically restarts the server when code changes are detected.

Production Mode

python -m uvicorn engine.main:app --host 0.0.0.0 --port 8000

Direct Execution

venv/bin/python engine/main.py
Running the module directly also starts Uvicorn with auto-reload enabled, as configured in engine/main.py:266:
if __name__ == "__main__":
    import uvicorn
    uvicorn.run("engine.main:app", host="0.0.0.0", port=8000, reload=True)

Code Style Guidelines

Imports

  • Standard library first, then third-party, then local
  • Use explicit absolute imports rooted at the engine package: from engine.logger import log
  • Group imports logically within files
Example:
import json
import asyncio
from typing import Optional

from fastapi import WebSocket
from pydantic import BaseModel

from engine.logger import log
from engine.database import world_manager
from engine.utils import build_ws_payload

Formatting

  • 4 spaces indentation (not tabs)
  • Maximum line length: 120 characters
  • Use f-strings for string formatting
  • Use trailing commas in multi-line structures
Example:
data = {
    "event": "system_update",
    "payload": {
        "content": f"Model set to {model_name}",
        "metadata": {"model": model_name},
    },  # Trailing comma
}

Type Hints

  • Use Pydantic BaseModel for all data models
  • Use type hints on all function signatures
  • Use Optional[T] for nullable types
  • Prefer explicit types over Any
Example:
from typing import List, Optional

from fastapi import WebSocket
from pydantic import BaseModel

class Character(BaseModel):
    id: str
    name: str
    persona: str

def get_character(char_id: str) -> Optional[Character]:
    """Retrieve character by ID."""
    # Implementation
    pass

async def process_messages(
    websocket: WebSocket,
    messages: List[str],
    timeout: Optional[float] = None
) -> None:
    """Process a list of messages."""
    pass

Naming Conventions

  • Modules: snake_case.py
  • Classes: PascalCase
  • Functions/variables: snake_case
  • Constants: UPPER_SNAKE_CASE
  • Private members: prefix with underscore
Example:
# Constants
MAX_RETRIES = 3
DEFAULT_TIMEOUT = 30.0

# Classes
class WorldManager:
    def __init__(self):
        self._worlds = {}  # Private
    
    def add_world(self, world: World) -> None:
        """Add a world to the manager."""
        self._worlds[world.id] = world

# Functions
def build_ws_payload(event: str, content: str, metadata: Optional[dict] = None) -> str:
    """Build WebSocket message payload."""
    pass

Error Handling

  • Use try/except with specific exception types
  • Log errors via from engine.logger import log
  • Avoid bare except: clauses; they remain only where existing code needs them and must never appear in new code
  • Propagate errors appropriately or return sensible defaults
Example:
from engine.logger import log

try:
    world = world_manager.get_world(world_id)
    if not world:
        raise ValueError(f"World not found: {world_id}")
except ValueError as e:
    log.error(f"Invalid world ID: {e}")
    await websocket.send_text(
        build_ws_payload("error", str(e))
    )
    return
except Exception as e:
    log.error(f"Unexpected error: {e}")
    raise

Async Patterns

  • Use async def for all WebSocket handlers
  • Use asyncio for concurrent operations
  • Use asyncio.create_task() for fire-and-forget background tasks
  • Handle asyncio.CancelledError explicitly
Example from engine/main.py:222:
# Stream response in background task
task = asyncio.create_task(
    stream_chat_response(
        websocket,
        prompt,
        full_context,
        state.ACTIVE_WORLD_ID,
        state.ACTIVE_CHARACTER_ID,
        state.ACTIVE_SESSION_ID,
    )
)

# Monitor for cancellation
while not task.done():
    try:
        raw = await asyncio.wait_for(websocket.receive_text(), timeout=0.05)
        try:
            cmd_msg = json.loads(raw)
            if cmd_msg.get("prompt") == "/cancel":
                task.cancel()
                await websocket.send_text(
                    build_ws_payload("system_update", "✗ Stream cancelled by user.")
                )
        except (json.JSONDecodeError, AttributeError):
            pass  # Ignore frames that are not valid JSON objects
    except asyncio.TimeoutError:
        continue

try:
    await task
except asyncio.CancelledError:
    log.info("Task was successfully cancelled.")
except Exception as e:
    log.error(f"Task failed with error: {e}")
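The monitor-and-cancel pattern above can be tried without a WebSocket. The following is a minimal sketch, assuming an asyncio.Queue stands in for incoming client frames and a sleep loop stands in for the streaming call; slow_stream, run_with_cancel, and demo are hypothetical names, not functions from the engine.

```python
import asyncio


async def slow_stream(chunks: list[str], out: list[str]) -> None:
    """Stand-in for a streaming LLM call: emits one chunk every 10 ms."""
    for chunk in chunks:
        await asyncio.sleep(0.01)
        out.append(chunk)


async def run_with_cancel(inbox: asyncio.Queue, chunks: list[str]) -> tuple[str, list[str]]:
    """Run slow_stream in the background and cancel it if "/cancel" arrives."""
    out: list[str] = []
    task = asyncio.create_task(slow_stream(chunks, out))

    while not task.done():
        try:
            # Poll the inbox briefly so the loop can re-check task.done().
            msg = await asyncio.wait_for(inbox.get(), timeout=0.005)
        except asyncio.TimeoutError:
            continue
        if msg == "/cancel":
            task.cancel()
            break

    try:
        await task
        return "completed", out
    except asyncio.CancelledError:
        # Raised here because the background task itself was cancelled.
        return "cancelled", out


async def demo() -> tuple[str, str]:
    # No cancel message: the stream runs to completion.
    quiet: asyncio.Queue = asyncio.Queue()
    first, _ = await run_with_cancel(quiet, ["a", "b", "c"])

    # A "/cancel" queued up front stops the stream before it finishes.
    noisy: asyncio.Queue = asyncio.Queue()
    noisy.put_nowait("/cancel")
    second, _ = await run_with_cancel(noisy, ["a"] * 50)
    return first, second


print(asyncio.run(demo()))
```

Awaiting the task after calling cancel() is what surfaces asyncio.CancelledError, so the handler can tell a cancelled stream apart from a failed one.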

Key Components

Main Application (engine/main.py)

The entry point defines the FastAPI app and WebSocket endpoint:
app = FastAPI(title="The Flower Engine")

@app.on_event("startup")
async def startup():
    # Load assets
    for data in load_yaml_assets("assets/worlds/*.yaml"):
        w = World(**data)
        world_manager.add_world(w)
    
    # Fetch available models from APIs
    # ...

@app.websocket("/ws/rpc")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    await websocket.send_text(
        build_ws_payload("system_update", "✓ Engine ready.", {"status": "ok"})
    )
    await broadcast_sync_state(websocket)
    
    try:
        while True:
            data = await websocket.receive_text()
            # Process message
    except WebSocketDisconnect:
        log.info("Disconnected.")

Database (engine/database.py)

Uses SQLite with Pydantic models:
import sqlite3
from pydantic import BaseModel
from typing import Optional

class World(BaseModel):
    id: str
    name: str
    lore: str
    start_message: str = ""
    scene: str = ""
    system_prompt: str = ""

DB_NAME = "engine.db"

def init_db():
    with sqlite3.connect(DB_NAME) as conn:
        cursor = conn.cursor()
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS worlds (
                id TEXT PRIMARY KEY,
                name TEXT NOT NULL,
                lore TEXT NOT NULL,
                start_message TEXT NOT NULL DEFAULT '',
                scene TEXT NOT NULL DEFAULT '',
                system_prompt TEXT NOT NULL DEFAULT ''
            )
        """)
        
        # Migration: Add new columns
        try:
            cursor.execute(
                "ALTER TABLE worlds ADD COLUMN scene TEXT NOT NULL DEFAULT ''"
            )
        except sqlite3.OperationalError:
            pass  # Column already exists
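Key Pattern: Use the connection as a context manager so the transaction commits on success and rolls back on error (this does not close the connection):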
with sqlite3.connect(DB_NAME) as conn:
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM worlds WHERE id = ?", (world_id,))
    row = cursor.fetchone()
Include migration logic for schema changes using try/except with sqlite3.OperationalError.
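An alternative to catching sqlite3.OperationalError is to inspect the schema first. This is a minimal sketch, assuming an in-memory database and two hypothetical helpers (column_exists, add_column_if_missing); it is not how engine/database.py is written.

```python
import sqlite3


def column_exists(conn: sqlite3.Connection, table: str, column: str) -> bool:
    """Return True if `table` already has a column named `column`."""
    # PRAGMA table_info yields one row per column; index 1 is the column name.
    rows = conn.execute(f"PRAGMA table_info({table})").fetchall()
    return any(row[1] == column for row in rows)


def add_column_if_missing(conn: sqlite3.Connection, table: str, column: str, ddl: str) -> bool:
    """Add a column only when absent. Returns True if the schema changed."""
    if column_exists(conn, table, column):
        return False
    # Table and column names must come from trusted code, never from user input.
    conn.execute(f"ALTER TABLE {table} ADD COLUMN {column} {ddl}")
    return True


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE worlds (id TEXT PRIMARY KEY, name TEXT NOT NULL)")
first = add_column_if_missing(conn, "worlds", "scene", "TEXT NOT NULL DEFAULT ''")
second = add_column_if_missing(conn, "worlds", "scene", "TEXT NOT NULL DEFAULT ''")
has_scene = column_exists(conn, "worlds", "scene")
conn.close()
print(first, second, has_scene)
```

Checking first means an unrelated OperationalError, such as a locked database, is not mistaken for "column already exists".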

Commands (engine/commands.py)

Command handlers parse slash commands:
async def handle_command(cmd_str: str, websocket: WebSocket):
    parts = cmd_str.split(" ", 2)
    cmd = parts[0]
    
    if cmd == "/model" and len(parts) >= 2:
        new_model = parts[1]
        if any(m["id"] == new_model for m in state.AVAILABLE_MODELS):
            state.CURRENT_MODEL = new_model
            state.MODEL_CONFIRMED = True
            state.save_state()
            await websocket.send_text(
                build_ws_payload(
                    "system_update",
                    f"Hot-swapped model to {state.CURRENT_MODEL}",
                    {"model": state.CURRENT_MODEL, "model_confirmed": True},
                )
            )
        else:
            await websocket.send_text(
                build_ws_payload(
                    "system_update", f"Error: {new_model} is not recognized."
                )
            )
Pattern: Split command string with parts = cmd_str.split(" ", 2) to get command, subcommand, and arguments.
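To make the effect of the maxsplit argument concrete, here is a small sketch. parse_command is a hypothetical helper, and "/world select" is an invented command used only to show a three-part split.

```python
def parse_command(cmd_str: str) -> tuple[str, str, str]:
    """Split a slash command into (command, subcommand, arguments)."""
    parts = cmd_str.strip().split(" ", 2)
    # Pad so callers can always unpack three values.
    parts += [""] * (3 - len(parts))
    return parts[0], parts[1], parts[2]


print(parse_command("/model gpt-4"))
print(parse_command("/world select darkwood forest"))
print(parse_command("/cancel"))
```

Because the split stops after two separators, everything after the subcommand stays together as a single argument string, spaces included. Splitting on a literal space also means repeated spaces produce empty parts, so input may need normalizing first.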

LLM Streaming (engine/llm.py)

Streams responses from various LLM providers:
from openai import AsyncOpenAI
from engine.config import OPENAI_BASE_URL, OPENAI_API_KEY

client = AsyncOpenAI(
    base_url=OPENAI_BASE_URL,
    api_key=OPENAI_API_KEY,
    default_headers={
        "HTTP-Referer": "https://github.com/ritz541/flower-engine",
        "X-Title": "The Flower Roleplay Engine",
    },
)

async def stream_chat_response(
    ws: WebSocket,
    prompt: str,
    context: str,
    world_id: str,
    char_id: str,
    session_id: str = "",
):
    # Build messages
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": prompt},
    ]
    
    # Stream response
    stream = await client.chat.completions.create(
        model=state.CURRENT_MODEL,
        messages=messages,
        stream=True,
    )
    
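    async for chunk in stream:
        # Some providers send chunks with an empty choices list (e.g. a final usage chunk)
        if chunk.choices and chunk.choices[0].delta.content: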
            content = chunk.choices[0].delta.content
            await ws.send_text(
                build_ws_payload(
                    "chat_chunk",
                    content,
                    {"tokens_per_second": tps},
                )
            )
    
    await ws.send_text(
        build_ws_payload(
            "chat_end",
            "",
            {"total_tokens": total_tokens},
        )
    )
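The excerpt above does not show how tps or total_tokens are derived. One plausible approach is sketched below, assuming each non-empty chunk is counted as one token (an approximation) and using a fake async generator in place of the provider stream; fake_stream and consume are hypothetical names.

```python
import asyncio
import time
from typing import AsyncIterator


async def fake_stream(chunks: list[str]) -> AsyncIterator[str]:
    """Stand-in for the provider stream; yields text chunks with a small delay."""
    for chunk in chunks:
        await asyncio.sleep(0.001)
        yield chunk


async def consume(chunks: list[str]) -> tuple[str, int, float]:
    """Collect streamed text and report chunk count and chunks per second."""
    start = time.perf_counter()
    total = 0
    parts: list[str] = []
    async for chunk in fake_stream(chunks):
        if not chunk:  # skip empty deltas, as the streaming loop does
            continue
        total += 1
        parts.append(chunk)
    elapsed = time.perf_counter() - start
    rate = total / elapsed if elapsed > 0 else 0.0
    return "".join(parts), total, rate


text, total, rate = asyncio.run(consume(["Hel", "lo", "", " world"]))
print(text, total)
```

Chunk counts only approximate token counts; an exact figure would need the provider's usage data or a tokenizer.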

RAG (engine/rag.py)

Retrieval-Augmented Generation using ChromaDB:
import chromadb
from sentence_transformers import SentenceTransformer

class RAGManager:
    def __init__(self):
        self.client = chromadb.Client()
        self.embedder = SentenceTransformer('all-MiniLM-L6-v2')
    
    def add_lore(self, world_id: str, doc_id: str, text: str):
        """Add lore chunk to vector database."""
        collection = self.client.get_or_create_collection(f"lore_{world_id}")
        embedding = self.embedder.encode(text).tolist()
        collection.add(
            documents=[text],
            embeddings=[embedding],
            ids=[doc_id],
        )
    
    def query_lore(self, world_id: str, query: str, n_results: int = 2):
        """Query relevant lore chunks."""
        collection = self.client.get_collection(f"lore_{world_id}")
        query_embedding = self.embedder.encode(query).tolist()
        results = collection.query(
            query_embeddings=[query_embedding],
            n_results=n_results,
        )
        return results["documents"][0], results["distances"][0]
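query_lore returns documents alongside their distances, where a smaller distance means a closer match. A caller might drop weak matches before adding lore to the prompt. The sketch below assumes a hypothetical select_lore helper and an arbitrary threshold of 1.0; a sensible cutoff depends on the distance metric the collection uses.

```python
def select_lore(
    documents: list[str],
    distances: list[float],
    max_distance: float = 1.0,
) -> str:
    """Keep chunks within max_distance and join them, nearest first."""
    pairs = sorted(zip(distances, documents))
    kept = [doc for dist, doc in pairs if dist <= max_distance]
    return "\n---\n".join(kept)


docs = ["The forest is cursed.", "Elves guard the gate.", "Taxes are due in spring."]
dists = [0.9, 0.2, 1.5]
print(select_lore(docs, dists))
```

An empty string comes back when nothing passes the threshold, which lets the caller skip the lore section of the prompt entirely.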

Utilities (engine/utils.py)

Helper functions for common tasks:
import glob
import json
from typing import Optional

import yaml

def load_yaml_assets(pattern: str) -> list:
    """Load all YAML files matching pattern."""
    assets = []
    for path in glob.glob(pattern):
        # Read as UTF-8 explicitly so results do not depend on the platform default
        with open(path, "r", encoding="utf-8") as f:
            data = yaml.safe_load(f)
            assets.append(data)
    return assets

def build_ws_payload(event: str, content: str, metadata: Optional[dict] = None) -> str:
    """Build WebSocket message payload."""
    return json.dumps({
        "event": event,
        "payload": {
            "content": content,
            "metadata": metadata or {},
        },
    })
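On the receiving side, a client has to unpack the same envelope. The sketch below repeats build_ws_payload so it runs on its own and adds parse_ws_payload, a hypothetical inverse that is not part of engine/utils.py.

```python
import json
from typing import Optional


def build_ws_payload(event: str, content: str, metadata: Optional[dict] = None) -> str:
    """Same envelope shape as the helper above, repeated for a standalone demo."""
    return json.dumps(
        {"event": event, "payload": {"content": content, "metadata": metadata or {}}}
    )


def parse_ws_payload(raw: str) -> tuple[str, str, dict]:
    """Hypothetical client-side inverse: returns (event, content, metadata)."""
    msg = json.loads(raw)
    payload = msg.get("payload", {})
    return msg["event"], payload.get("content", ""), payload.get("metadata", {})


raw = build_ws_payload("system_update", "Model changed", {"model": "gpt-4"})
print(parse_ws_payload(raw))
print(parse_ws_payload(build_ws_payload("chat_end", "")))
```

Since metadata defaults to an empty dict on the wire, clients can read keys from it without first checking for None.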

Development Tips

Hot Reloading

Always use --reload during development:
python -m uvicorn engine.main:app --reload
Uvicorn watches for file changes and automatically restarts the server.

Logging

Use the centralized logger from engine/logger.py:
from engine.logger import log

log.info("Server started")
log.error(f"Failed to load world: {world_id}")
log.debug(f"Received message: {data}")
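The contents of engine/logger.py are not shown on this page. As an illustration of what a centralized logger can look like, here is a minimal sketch built on the standard logging module; make_logger, the "engine" logger name, and the format string are all assumptions.

```python
import logging


def make_logger(name: str = "engine", level: int = logging.INFO) -> logging.Logger:
    """Return a configured logger, attaching a handler only once."""
    logger = logging.getLogger(name)
    if not logger.handlers:
        # Guard against duplicate handlers when the module is re-imported.
        handler = logging.StreamHandler()
        handler.setFormatter(
            logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s")
        )
        logger.addHandler(handler)
    logger.setLevel(level)
    return logger


log = make_logger()
log.info("Server started")
log.debug("Hidden at INFO level")
```

The handler guard matters with --reload and repeated imports, where an unguarded setup would print every message more than once.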

Testing WebSocket Connections

See the Testing Guide for using engine/test_client.py to test WebSocket functionality.

Debugging

Add print statements or use the logger:
log.debug(f"Current state: {state.ACTIVE_WORLD_ID}")
log.debug(f"Message payload: {json.dumps(msg, indent=2)}")

State Management

Global state is managed in engine/state.py:
import engine.state as state

# Access state
world_id = state.ACTIVE_WORLD_ID
model = state.CURRENT_MODEL

# Modify state
state.ACTIVE_WORLD_ID = "darkwood"
state.save_state()  # Persist to persist.json
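How save_state() writes persist.json is not shown here. The following is a rough sketch of one way to persist and restore such state safely, assuming a plain dict of settings; the save_state and load_state signatures below are hypothetical and differ from the engine's zero-argument call.

```python
import json
import os
import tempfile


def save_state(path: str, state: dict) -> None:
    """Write state atomically: dump to a temp file, then replace the target."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    with os.fdopen(fd, "w", encoding="utf-8") as f:
        json.dump(state, f, indent=2)
    os.replace(tmp_path, path)


def load_state(path: str, defaults: dict) -> dict:
    """Read saved state, falling back to defaults if the file is missing or corrupt."""
    try:
        with open(path, "r", encoding="utf-8") as f:
            saved = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        return dict(defaults)
    if not isinstance(saved, dict):
        return dict(defaults)
    # Saved values override defaults; keys missing from the file keep their default.
    return {**defaults, **saved}


defaults = {"ACTIVE_WORLD_ID": "", "CURRENT_MODEL": ""}
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "persist.json")
    before = load_state(path, defaults)
    save_state(path, {"ACTIVE_WORLD_ID": "darkwood"})
    after = load_state(path, defaults)
print(before, after)
```

Writing to a temporary file and swapping it in with os.replace avoids leaving a half-written persist.json if the process stops mid-write.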

Common Patterns

Sending WebSocket Messages

from engine.utils import build_ws_payload

# System update
await websocket.send_text(
    build_ws_payload("system_update", "World loaded successfully")
)

# With metadata
await websocket.send_text(
    build_ws_payload(
        "system_update",
        "Model changed",
        {"model": "gpt-4", "model_confirmed": True},
    )
)

Database Queries

from engine.database import world_manager, char_manager, msg_manager

# Get world
world = world_manager.get_world("darkwood")

# Add character
char = Character(id="ranger", name="Elara", persona="A skilled tracker")
char_manager.add_character(char)

# Query messages
recent_msgs = msg_manager.get_messages(
    char_id="ranger",
    session_id="session_123",
    limit=10
)

Asset Loading

from engine.utils import load_yaml_assets

# Load all worlds
for data in load_yaml_assets("assets/worlds/*.yaml"):
    world = World(**data)
    world_manager.add_world(world)

# Load all characters
for data in load_yaml_assets("assets/characters/*.yaml"):
    character = Character(**data)
    char_manager.add_character(character)
