
Overview

The AudioProcessor class handles audio processing and OpenAI API interactions with rate limiting and connection tracing. Location: backend/src/audio_processor.py:10-72

Class Definition

class AudioProcessor:
    """
    Handles audio processing and OpenAI API interactions.
    Implements connection tracing for request tracking.
    """

Constructor

def __init__(self, api_key: str, max_concurrent_calls: int = 3)
Initializes the AudioProcessor with an OpenAI client and a rate-limiting semaphore.

Parameters

api_key
str
required
OpenAI API key for authentication. The key is automatically stripped of whitespace.
max_concurrent_calls
int
default:3
Maximum number of concurrent OpenAI API calls. Additional requests are queued using an asyncio Semaphore.

Example

import os
from audio_processor import AudioProcessor

processor = AudioProcessor(
    api_key=os.getenv("OPENAI_API_KEY"),
    max_concurrent_calls=3
)
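
Internally, the constructor amounts to stripping the key and creating the semaphore. A minimal sketch (the real implementation, including construction of the OpenAI client, lives at backend/src/audio_processor.py:10-72; the attribute names here are assumptions):

```python
import asyncio


class AudioProcessor:
    """Sketch of the constructor behavior described above."""

    def __init__(self, api_key: str, max_concurrent_calls: int = 3):
        # The key is stripped of surrounding whitespace before use
        self.api_key = api_key.strip()
        # The real code builds the SDK client here, e.g.:
        # self.client = OpenAI(api_key=self.api_key)
        # The semaphore caps concurrent API calls; extra callers queue on acquire
        self.semaphore = asyncio.Semaphore(max_concurrent_calls)
```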

Methods

call_openai_api

async def call_openai_api(self, audio_data: bytes, connection_id: str)
Makes rate-limited calls to OpenAI’s Whisper API with connection tracing. Location: audio_processor.py:19-47

Parameters

audio_data
bytes
required
Raw audio data in MP4 format to be transcribed
connection_id
str
required
8-character unique identifier for request tracing in logs

Returns

Returns an OpenAI transcription response object with:
  • language: Detected language code (ISO 639-1)
  • text: The full transcribed text
  • Additional fields from the verbose_json format (e.g. duration and segments)

Implementation Details

  1. Acquires semaphore slot (logs acquisition)
  2. Wraps audio data in BytesIO buffer with filename "audio.mp4"
  3. Calls OpenAI API using asyncio.to_thread for non-blocking execution
  4. Uses whisper-1 model with verbose_json response format
  5. Logs API call duration
  6. Releases semaphore in finally block
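
Put together, the steps above look roughly like this (a sketch assuming the openai>=1.0 SDK and a self.client attribute created in the constructor; async with replaces the explicit finally-block release, with the same effect):

```python
import asyncio
import io
import logging
import time

logger = logging.getLogger(__name__)


class AudioProcessor:
    def __init__(self, client, max_concurrent_calls: int = 3):
        self.client = client  # an openai.OpenAI instance; injectable for testing
        self.semaphore = asyncio.Semaphore(max_concurrent_calls)

    async def call_openai_api(self, audio_data: bytes, connection_id: str):
        # Steps 1 and 6: acquire a slot; `async with` releases it even on error
        async with self.semaphore:
            logger.info("[%s] Semaphore acquired", connection_id)
            # Step 2: wrap the bytes in a named buffer so the SDK sees "audio.mp4"
            buffer = io.BytesIO(audio_data)
            buffer.name = "audio.mp4"
            start = time.monotonic()
            # Step 3: run the blocking SDK call off the event loop
            response = await asyncio.to_thread(
                self.client.audio.transcriptions.create,
                model="whisper-1",                  # step 4
                file=buffer,
                response_format="verbose_json",     # step 4
            )
            # Step 5: log API call duration with the connection ID
            logger.info("[%s] API call took %.2fs",
                        connection_id, time.monotonic() - start)
            return response
```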

Example

connection_id = "a1b2c3d4"

try:
    response = await processor.call_openai_api(audio_data, connection_id)
    print(f"Detected language: {response.language}")
except Exception as e:
    print(f"API error: {e}")

Error Handling

Raises exceptions from OpenAI API calls. Logs all errors with connection ID for tracing.

process_audio

async def process_audio(self, audio_data: bytes, metrics, connection_id: str)
Processes audio data and updates metrics with connection tracing. Location: audio_processor.py:49-72

Parameters

audio_data
bytes
required
Raw audio data in MP4 format to be processed
metrics
ServerMetrics
required
ServerMetrics instance for tracking processing times and errors
connection_id
str
required
8-character unique identifier for request tracing

Returns

Returns a dictionary with processing results.

Success response:
{
    "language": "en",
    "confidence": 0.9,
    "processing_time": 2.34,
    "connection_id": "a1b2c3d4"
}
language
str
ISO 639-1 language code detected by Whisper
confidence
float
Confidence score (currently fixed at 0.9)
processing_time
float
Total processing time in seconds
connection_id
str
Connection identifier for tracing
Error response:
{
    "error": "Error message",
    "connection_id": "a1b2c3d4"
}
error
str
Error message describing what went wrong
connection_id
str
Connection identifier for tracing

Implementation Details

  1. Records start time
  2. Calls call_openai_api() to get transcription
  3. Calculates total processing time
  4. Appends processing time to metrics deque
  5. Returns structured response with language and timing info
  6. On error: increments metrics error counter and returns error dict
  7. Calls gc.collect() in finally block for memory cleanup
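
As a free-standing sketch of that flow (transcribe stands in for self.call_openai_api, and the metrics attribute names processing_times and error_count are assumptions about the ServerMetrics interface):

```python
import gc
import time


async def process_audio(transcribe, audio_data: bytes, metrics, connection_id: str) -> dict:
    start = time.monotonic()  # step 1
    try:
        response = await transcribe(audio_data, connection_id)  # step 2
        elapsed = time.monotonic() - start                      # step 3
        metrics.processing_times.append(elapsed)                # step 4 (assumed deque name)
        return {                                                # step 5
            "language": response.language,
            "confidence": 0.9,  # fixed value, per the docs above
            "processing_time": elapsed,
            "connection_id": connection_id,
        }
    except Exception as exc:                                    # step 6
        metrics.error_count += 1                                # assumed counter name
        return {"error": str(exc), "connection_id": connection_id}
    finally:
        gc.collect()                                            # step 7
```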

Example

from metrics import ServerMetrics

metrics = ServerMetrics()
connection_id = "a1b2c3d4"

result = await processor.process_audio(audio_data, metrics, connection_id)

if "error" in result:
    print(f"Error: {result['error']}")
else:
    print(f"Language: {result['language']}")
    print(f"Processing time: {result['processing_time']}s")

Dependencies

import asyncio
import gc
import io
import logging
import time

from openai import OpenAI

Rate Limiting

The class implements rate limiting using asyncio.Semaphore:
  • Default: Maximum 3 concurrent API calls
  • Additional requests queue until a slot becomes available
  • Prevents overwhelming the OpenAI API
  • Logs semaphore acquisition and release
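
The queueing behavior can be demonstrated with plain asyncio (this is a self-contained illustration, not the AudioProcessor itself):

```python
import asyncio


async def limited_call(sem: asyncio.Semaphore, i: int, in_flight: list, peaks: list):
    async with sem:                   # waits here until a slot frees up
        in_flight.append(i)
        peaks.append(len(in_flight))  # record concurrency at this instant
        await asyncio.sleep(0.01)     # stand-in for an OpenAI API call
        in_flight.remove(i)


async def run_demo(total_calls: int = 10, max_concurrent: int = 3) -> int:
    sem = asyncio.Semaphore(max_concurrent)
    in_flight, peaks = [], []
    await asyncio.gather(
        *(limited_call(sem, i, in_flight, peaks) for i in range(total_calls))
    )
    return max(peaks)  # observed peak concurrency, never above max_concurrent
```

Running `asyncio.run(run_demo())` returns 3: ten calls were issued, but at most three ran at once.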

Connection Tracing

All methods accept a connection_id parameter used for:
  • Logging request flow through the system
  • Correlating logs from different components
  • Debugging specific requests
  • Monitoring individual connection performance
Log format: [connection_id] Message
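
For illustration, a helper producing that format (the function name trace is hypothetical; the real code calls logger.info directly with the prefix inline):

```python
import logging

logger = logging.getLogger("audio_processor")


def trace(connection_id: str, message: str) -> str:
    """Build and emit a log line in the [connection_id] Message format."""
    line = f"[{connection_id}] {message}"
    logger.info(line)
    return line
```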
