Overview

The LangShazam backend is built with FastAPI, leveraging Python’s async capabilities for high-performance real-time audio processing. The architecture is modular, with clear separation of concerns across different components.

Application Structure

Main Application Entry Point

The application is initialized in backend/src/main.py:24-39 with FastAPI and CORS middleware:
app = FastAPI()

# Add CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=CORS_ORIGINS,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Initialize components
metrics = ServerMetrics()
audio_processor = AudioProcessor(api_key=os.getenv("OPENAI_API_KEY"))
ws_manager = WebSocketManager(audio_processor, metrics)
The application uses a dependency injection pattern, passing shared instances of AudioProcessor and ServerMetrics into the WebSocketManager.

Core Components

AudioProcessor

Location: backend/src/audio_processor.py

Purpose

Handles all audio processing and OpenAI API interactions with rate limiting and connection tracing.

Key Features

The AudioProcessor implements concurrent call limiting using asyncio semaphores:
def __init__(self, api_key: str, max_concurrent_calls: int = 3):
    self.client = OpenAI(api_key=api_key.strip())
    self.api_semaphore = asyncio.Semaphore(max_concurrent_calls)
This prevents overloading the OpenAI API and ensures fair resource distribution across connections.
Reference: audio_processor.py:15-17
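The effect of the semaphore can be demonstrated in a standalone sketch (a toy stand-in for the OpenAI call, not the actual AudioProcessor code):

```python
import asyncio

async def measure_peak_concurrency(limit: int = 3, tasks: int = 10) -> int:
    """Run `tasks` fake API calls behind a Semaphore(limit) and report
    the peak number executing at once."""
    sem = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def fake_api_call():
        nonlocal in_flight, peak
        async with sem:  # at most `limit` bodies run concurrently
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stand-in for the network round trip
            in_flight -= 1

    await asyncio.gather(*(fake_api_call() for _ in range(tasks)))
    return peak

peak = asyncio.run(measure_peak_concurrency())
print(peak)  # 3: the semaphore caps concurrency even with 10 waiting tasks
```

Extra callers simply queue on the semaphore rather than failing, which is why the server degrades gracefully under load.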
Audio is sent to OpenAI’s Whisper-1 model for transcription:
async def call_openai_api(self, audio_data: bytes, connection_id: str):
    async with self.api_semaphore:
        audio_file = io.BytesIO(audio_data)
        audio_file.name = "audio.mp4"
        
        response = await asyncio.to_thread(
            self.client.audio.transcriptions.create,
            model="whisper-1",
            file=audio_file,
            response_format="verbose_json"
        )
        return response
The method uses asyncio.to_thread() to prevent blocking the event loop.
Reference: audio_processor.py:19-42
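Why asyncio.to_thread() matters can be shown in isolation: a blocking call offloaded to a worker thread runs concurrently with other coroutines instead of stalling the loop (a self-contained sketch, unrelated to the actual OpenAI client):

```python
import asyncio
import time

async def main() -> float:
    start = time.monotonic()
    # time.sleep is blocking; to_thread moves it off the event loop so the
    # async sleep below can make progress at the same time.
    await asyncio.gather(
        asyncio.to_thread(time.sleep, 0.2),
        asyncio.sleep(0.2),
    )
    return time.monotonic() - start

elapsed = asyncio.run(main())
print(f"{elapsed:.2f}s")  # ~0.2s rather than 0.4s: the loop stayed responsive
```

Without to_thread, the synchronous OpenAI SDK call would freeze every other WebSocket connection for the duration of the request.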
Every request includes a unique connection ID for tracking:
logger.info(f"[{connection_id}] Starting audio processing")
logger.info(f"[{connection_id}] OpenAI API call completed in {api_time:.2f}s")
This enables debugging and performance analysis for individual requests.
Reference: audio_processor.py:53, 41

Processing Flow

Buffered audio → acquire semaphore → Whisper transcription via asyncio.to_thread → log per-connection timing → return result.

WebSocketManager

Location: backend/src/websocket_manager.py

Purpose

Manages WebSocket connections, coordinates audio buffering, and handles the request-response lifecycle.

Connection Lifecycle

1. Connection Establishment

When a client connects, a unique 8-character connection ID is generated:
connection_id = str(uuid.uuid4())[:8]
await websocket.accept()
self.metrics.active_connections += 1
Reference: websocket_manager.py:19-22
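The 8-character prefix of a UUID4 string is always lowercase hex, giving 16^8 (about 4.3 billion) possible IDs: compact enough for logs, and effectively unique for correlating a single connection (though, strictly, not collision-proof). For example:

```python
import uuid

# The first 8 characters of a UUID4 string are hex digits, so the
# connection ID is short and log-friendly.
connection_id = str(uuid.uuid4())[:8]
print(connection_id)  # e.g. "3f2b9a1c" (random each run)
```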
2. Audio Buffering

Incoming audio chunks are buffered until minimum size is reached:
buffer = []
total_size = 0
MIN_AUDIO_SIZE = 20000  # 20KB minimum

while True:
    data = await websocket.receive_bytes()
    buffer.append(data)
    total_size += len(data)
    
    if total_size >= MIN_AUDIO_SIZE:
        audio_data = b''.join(buffer)
        result = await self.audio_processor.process_audio(...)
Reference: websocket_manager.py:29-49
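The buffering loop can be isolated as a pure function to see the threshold behaviour (a sketch mirroring the logic above, not the server's actual code):

```python
MIN_AUDIO_SIZE = 20000  # bytes, matching the server's 20KB threshold

def accumulate(chunks):
    """Join incoming chunks once the minimum payload size is reached;
    return None if the stream ends before that."""
    buffer, total = [], 0
    for data in chunks:
        buffer.append(data)
        total += len(data)
        if total >= MIN_AUDIO_SIZE:
            return b"".join(buffer)
    return None  # not enough audio yet

payload = accumulate(b"\x00" * 4096 for _ in range(10))
print(len(payload))  # 20480: the fifth 4KB chunk crosses the 20KB line
```

Batching to a minimum size trades a little latency for far fewer Whisper calls, which matters under the 3-call concurrency cap.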
3. Processing & Response

Once enough audio is buffered, it’s processed and results are sent:
await websocket.send_json({
    "status": "success",
    "data": result,
    "timestamp": datetime.now().isoformat(),
    "connection_id": connection_id
})
Reference: websocket_manager.py:65-70
4. Connection Cleanup

Connections are properly closed and metrics updated:
finally:
    self.metrics.active_connections -= 1
    self.metrics.log_current_metrics(connection_id)
Reference: websocket_manager.py:90-92

Error Handling

The WebSocketManager implements comprehensive error handling for various failure scenarios.
try:
    # Main processing loop
except WebSocketDisconnect:
    logger.info(f"[{connection_id}] Connection disconnected")
except Exception as e:
    self.metrics.errors += 1
    logger.error(f"[{connection_id}] WebSocket error: {e}")
    await websocket.send_json({
        "status": "error",
        "message": str(e),
        "timestamp": datetime.now().isoformat(),
        "connection_id": connection_id
    })
Reference: websocket_manager.py:78-89

ServerMetrics

Location: backend/src/metrics.py

Purpose

Collects and reports server performance metrics including connections, processing times, CPU, and memory usage.

Tracked Metrics

class ServerMetrics:
    def __init__(self):
        self.active_connections = 0
        self.processing_times = deque(maxlen=100)  # Rolling window
        self.active_processes = 0
        self.total_requests = 0
        self.errors = 0
        self.cpu_cores = multiprocessing.cpu_count()
Reference: metrics.py:15-21
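The deque(maxlen=100) gives a rolling average in constant memory: once full, each append silently evicts the oldest sample. A quick demonstration:

```python
from collections import deque

processing_times = deque(maxlen=100)  # same rolling window as ServerMetrics
for i in range(250):
    processing_times.append(i * 0.01)  # pretend each request took i*10ms

print(len(processing_times))  # 100: only the newest samples survive
avg = sum(processing_times) / len(processing_times)
print(round(avg, 3))  # 1.995: the mean of samples 150..249
```

This keeps the average responsive to recent load instead of being diluted by the server's entire history.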

Metric Categories

Connection Metrics

  • Active connections
  • Total requests
  • Error count

Performance Metrics

  • Average processing time
  • Per-request timing (last 100)

System Metrics

  • Memory usage (MB)
  • CPU usage (total & per-core)
  • Effective cores utilized

Process Metrics

  • Active processes
  • CPU core count

Metrics Logging

Metrics are logged with connection context:
def log_current_metrics(self, connection_id: str = None):
    prefix = f"[{connection_id}]" if connection_id else ""
    # avg_processing_time, memory_usage and cpu_percent are derived from
    # the tracked metrics (elided in this excerpt)
    logger.info(
        f"\n=== Server Metrics {prefix} ===\n"
        f"Active Connections: {self.active_connections}\n"
        f"Total Requests: {self.total_requests}\n"
        f"Avg Processing Time: {avg_processing_time:.2f}s\n"
        f"Memory Usage: {memory_usage:.2f}MB\n"
        f"Total CPU Usage: {cpu_percent}%\n"
        f"Effective Cores Used: {cpu_percent/100 * self.cpu_cores:.1f} of {self.cpu_cores}\n"
    )
Reference: metrics.py:23-44

Request Flow & Lifecycle

Client connects to /ws → audio chunks buffered to 20KB → transcription via AudioProcessor → JSON response sent → metrics updated on disconnect.

Configuration System

Location: backend/src/config/settings.py

The backend uses a centralized configuration system with environment variable support:
SERVER_CONFIG = {
    "host": "0.0.0.0",
    "port": int(os.getenv("PORT", "10000")),
    "debug": os.getenv("DEBUG", "false").lower() == "true"
}
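Because the values are read with os.getenv at import time, the environment overrides the defaults. For example:

```python
import os

os.environ["PORT"] = "8080"
os.environ["DEBUG"] = "TRUE"  # case-insensitive thanks to .lower()

SERVER_CONFIG = {
    "host": "0.0.0.0",
    "port": int(os.getenv("PORT", "10000")),
    "debug": os.getenv("DEBUG", "false").lower() == "true",
}
print(SERVER_CONFIG)  # {'host': '0.0.0.0', 'port': 8080, 'debug': True}
```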

Error Handling and Logging

Logging Strategy

Structured Logging

The backend implements structured logging with connection tracing throughout the request lifecycle.
Configuration (main.py:16-22):
logging.basicConfig(
    level=getattr(logging, LOGGING_CONFIG["level"]),
    format=LOGGING_CONFIG["format"],
    datefmt=LOGGING_CONFIG["datefmt"]
)
logger = logging.getLogger(__name__)

Error Categories

Handled in websocket_manager.py:78-89:
  • WebSocketDisconnect: Clean client disconnection
  • Processing Errors: Errors during audio processing
  • General Exceptions: Unexpected errors with full error reporting
Handled in audio_processor.py:43-45:
  • OpenAI API failures
  • Network timeouts
  • Rate limit exceeded
All errors are logged with connection ID and increment the error counter.

Memory Management

The AudioProcessor explicitly calls garbage collection after processing to manage memory:
finally:
    gc.collect()
Reference: audio_processor.py:72

API Endpoints

WebSocket Endpoint

/ws

Protocol: WebSocket
Purpose: Real-time language detection
Handler: websocket_endpoint
Implementation (main.py:51-57):
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    connection_start = time.time()
    await ws_manager.handle_connection(websocket)
    connection_duration = time.time() - connection_start
    logger.info(f"WebSocket connection duration: {connection_duration:.2f}s")

REST Endpoints

GET /

Health check endpoint
@app.get("/")
async def home():
    return {"message": "Server is running!"}
Reference: main.py:41-44

GET /metrics

Server metrics endpoint
@app.get("/metrics")
async def get_metrics():
    return metrics.get_metrics_dict()
Reference: main.py:46-49
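The shape of the response is not shown in the source; a plausible sketch based on the tracked fields (the class and key names here are assumptions for illustration, not the actual implementation) might be:

```python
from collections import deque

class ServerMetricsSketch:
    """Illustrative only: attributes mirror those documented above."""

    def __init__(self):
        self.active_connections = 0
        self.total_requests = 0
        self.errors = 0
        self.processing_times = deque(maxlen=100)

    def get_metrics_dict(self):
        times = list(self.processing_times)
        return {
            "active_connections": self.active_connections,
            "total_requests": self.total_requests,
            "errors": self.errors,
            "avg_processing_time": sum(times) / len(times) if times else 0.0,
        }

m = ServerMetricsSketch()
m.total_requests = 3
m.processing_times.extend([1.0, 2.0, 3.0])
print(m.get_metrics_dict())
```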

Performance Optimizations

Async Processing

All I/O operations use async/await for non-blocking execution

Semaphore Rate Limiting

Prevents API overload with max 3 concurrent OpenAI calls

Audio Buffering

Buffers a minimum of 20KB of audio before processing to reduce API call volume

Memory Management

Explicit garbage collection after audio processing

Connection Pooling

Reuses HTTP connections through OpenAI SDK

Metrics Deque

Rolling window of last 100 processing times for efficiency

