
Overview

PAS2 is designed for performance, using parallel API calls, efficient progress tracking, and optimized resource management. Understanding these mechanisms helps you tune the system for your specific workload.

Parallel response generation

PAS2 retrieves responses for paraphrased queries in parallel, significantly reducing total processing time.

Thread pool configuration

pas2.py
def get_responses(self, queries: List[str]) -> List[str]:
    """Get responses from Mistral API for each query in parallel"""
    with ThreadPoolExecutor(max_workers=min(len(queries), 5)) as executor:
        future_to_index = {
            executor.submit(self._get_single_response, query, i): i 
            for i, query in enumerate(queries)
        }
        
        responses = [""] * len(queries)
        completed_count = 0
        
        for future in concurrent.futures.as_completed(future_to_index):
            index = future_to_index[future]
            responses[index] = future.result()
            completed_count += 1
        
        return responses

Worker pool sizing

The thread pool is capped at 5 workers:
max_workers=min(len(queries), 5)
The 5-worker limit prevents overwhelming the API with concurrent requests while maximizing parallelism. Most API providers have rate limits that make higher concurrency counterproductive.

Performance impact

With 4 paraphrases (original + 3 paraphrases):
Approach                Total Time     Speedup
Sequential              ~20 seconds    1x
Parallel (5 workers)    ~5 seconds     4x
For custom deployments with higher rate limits, adjust max_workers based on your API tier:
max_workers=min(len(queries), 10)  # For higher rate limits
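The speedup can be reproduced with a minimal, self-contained sketch that simulates API latency (the fake_api_call helper and its 0.2-second delay are illustrative stand-ins, not part of PAS2):

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_api_call(query):
    # Illustrative stand-in for a real API request
    time.sleep(0.2)
    return f"response to {query}"

queries = [f"query {i}" for i in range(4)]

# Sequential baseline: 4 calls x 0.2 s each
start = time.time()
sequential = [fake_api_call(q) for q in queries]
sequential_time = time.time() - start

# Parallel, capped at 5 workers as in PAS2
start = time.time()
with ThreadPoolExecutor(max_workers=min(len(queries), 5)) as executor:
    parallel = list(executor.map(fake_api_call, queries))
parallel_time = time.time() - start

print(f"sequential: {sequential_time:.1f}s, parallel: {parallel_time:.1f}s")
```

Because all four simulated calls overlap, the parallel wall-clock time approaches the latency of a single call rather than the sum of all four.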

Progress callback optimization

Progress callbacks enable real-time UI updates without blocking the main thread.

Callback design

pas2.py
def __init__(self, mistral_api_key=None, openai_api_key=None, progress_callback=None):
    self.progress_callback = progress_callback
    # ...

def get_responses(self, queries: List[str]) -> List[str]:
    # ...
    for future in concurrent.futures.as_completed(future_to_index):
        index = future_to_index[future]
        responses[index] = future.result()
        
        completed_count += 1
        if self.progress_callback:
            self.progress_callback("responses_progress", 
                                completed_responses=completed_count, 
                                total_responses=len(queries))
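A self-contained sketch of this pattern (the str.upper worker and the query strings are placeholders for the real API call) shows the callback firing once per completed response while the pre-allocated list keeps results in query order:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

events = []

def progress_callback(event, **kwargs):
    # Lightweight: record state only, no blocking work
    events.append((event, kwargs["completed_responses"], kwargs["total_responses"]))

def get_responses(queries, callback=None):
    with ThreadPoolExecutor(max_workers=min(len(queries), 5)) as executor:
        future_to_index = {
            executor.submit(str.upper, q): i  # str.upper stands in for the API call
            for i, q in enumerate(queries)
        }
        responses = [""] * len(queries)
        completed = 0
        for future in as_completed(future_to_index):
            responses[future_to_index[future]] = future.result()
            completed += 1
            if callback:
                callback("responses_progress",
                         completed_responses=completed,
                         total_responses=len(queries))
        return responses

print(get_responses(["alpha", "beta", "gamma"], progress_callback))
```

Futures complete in arbitrary order, but indexing through future_to_index means the returned list always matches the input order.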

Minimizing callback overhead

Callbacks are designed to be lightweight:
  1. No blocking operations - Callbacks update state only
  2. Thread-safe updates - Uses locks for shared state
  3. Conditional execution - Only fires when callback is registered
pas2.py
def update_stage(self, stage, **kwargs):
    """Update the current stage and trigger callback"""
    with self._lock:
        if stage in self.STAGES:
            self.stage = stage
            # ... update state ...
            
            if self._status_callback:
                self._status_callback(self.get_html_status())

Gradio interface optimization

Queue configuration

Gradio’s interface is configured for optimal throughput:
pas2.py
interface.launch(
    show_api=False,
    quiet=True,
    share=False,
    max_threads=10,
    debug=False
)

Key settings

  • max_threads=10 - Allows up to 10 concurrent interface operations
  • show_api=False - Disables API endpoint generation for faster startup
  • quiet=True - Reduces logging overhead in production

Event handler optimization

The submit button uses a two-stage approach:
submit_button.click(
    fn=start_processing,
    inputs=[query_input],
    outputs=[progress_display, results_accordion, feedback_accordion, hidden_results],
    queue=False  # Immediate execution for UI updates
).then(
    fn=process_query_and_display_results,  # Long-running operation
    inputs=[query_input],
    outputs=[progress_display, results_accordion, feedback_accordion, hidden_results]
)
Using queue=False for start_processing ensures immediate UI feedback before the long-running detection starts.

Database performance

Connection management

PAS2 uses a simple connect-execute-close pattern for SQLite:
pas2.py
def save_feedback(self, results, feedback):
    conn = sqlite3.connect(self.db_path)
    cursor = conn.cursor()
    
    # ... execute query ...
    
    conn.commit()
    conn.close()
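For a lighter-weight improvement than full pooling, the same pattern can be wrapped in context managers so the connection is always committed and closed, even when an exception occurs (the feedback table schema below is hypothetical, for illustration only):

```python
import sqlite3
from contextlib import closing

def save_feedback(db_path, detection_id, feedback):
    # closing() guarantees conn.close(); the connection's own context
    # manager commits on success and rolls back on exceptions
    with closing(sqlite3.connect(db_path)) as conn:
        with conn:
            conn.execute(
                "INSERT INTO feedback (detection_id, feedback) VALUES (?, ?)",
                (detection_id, feedback),
            )
```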

When to use connection pooling

For high-volume deployments, consider connection pooling:
import sqlite3
import threading
from contextlib import contextmanager

class DatabasePool:
    def __init__(self, db_path, pool_size=5):
        self.db_path = db_path
        # check_same_thread=False lets pooled connections be handed to
        # worker threads; the lock serializes access to the pool itself
        self.pool = [
            sqlite3.connect(db_path, check_same_thread=False)
            for _ in range(pool_size)
        ]
        self.available = self.pool.copy()
        self.lock = threading.Lock()
    
    @contextmanager
    def get_connection(self):
        with self.lock:
            if not self.available:
                # Create an overflow connection if the pool is exhausted
                conn = sqlite3.connect(self.db_path, check_same_thread=False)
            else:
                conn = self.available.pop()
        
        try:
            yield conn
        finally:
            with self.lock:
                self.available.append(conn)
Connection pooling adds complexity. Only implement it if you’re handling >100 requests per minute.

API rate limit handling

Built-in retry logic

The system includes error handling for API failures:
pas2.py
def _get_single_response(self, query: str, index: int = None) -> str:
    try:
        response = self.mistral_client.chat.complete(
            model=self.mistral_model,
            messages=messages
        )
        return response.choices[0].message.content
    except Exception as e:
        error_msg = f"Error getting response for query '{query}': {e}"
        logger.error(error_msg, exc_info=True)
        return "Error: Failed to get response for this query."

Implementing exponential backoff

For production deployments, add retry logic:
import time
from functools import wraps

def retry_with_backoff(max_retries=3, base_delay=1):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_retries - 1:
                        raise
                    delay = base_delay * (2 ** attempt)
                    logger.warning(f"Attempt {attempt+1} failed, retrying in {delay}s")
                    time.sleep(delay)
        return wrapper
    return decorator

@retry_with_backoff(max_retries=3)
def _get_single_response(self, query: str, index: int = None) -> str:
    # ... existing code ...
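To see the decorator in action, here is a runnable sketch using a tiny base delay and a hypothetical flaky() function that fails twice before succeeding (the decorator is repeated here, minus logging, so the example is self-contained):

```python
import time
from functools import wraps

def retry_with_backoff(max_retries=3, base_delay=0.01):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == max_retries - 1:
                        raise
                    # Exponential backoff: 0.01s, 0.02s, 0.04s, ...
                    time.sleep(base_delay * (2 ** attempt))
        return wrapper
    return decorator

attempts = {"count": 0}

@retry_with_backoff(max_retries=3)
def flaky():
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("transient failure")
    return "ok"

print(flaky())  # succeeds on the third attempt
```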

Memory optimization

Response storage

Responses are stored in lists, not accumulated strings:
responses = [""] * len(queries)  # Pre-allocate

for future in concurrent.futures.as_completed(future_to_index):
    index = future_to_index[future]
    responses[index] = future.result()  # Direct assignment
This avoids repeated string concatenation, which would create an intermediate string object on every append.

Logging optimization

Logging uses lazy evaluation:
logger.info("Received response for %s (%.2f seconds)", query_description, elapsed_time)
The string formatting only occurs if the log level permits the message.
In production, set logging to WARNING or ERROR to reduce overhead:
logging.basicConfig(level=logging.WARNING)
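The lazy-evaluation claim is easy to verify: once the effective level is WARNING, an INFO call returns before formatting its %-style arguments, so even an expensive argument is never rendered. In this sketch, the Expensive class is a hypothetical probe whose __str__ would raise if formatting ever happened:

```python
import logging

logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger("pas2")

class Expensive:
    def __str__(self):
        # Would only run if the message were actually formatted
        raise RuntimeError("formatted despite suppressed level")

# INFO is below WARNING, so this call skips formatting entirely
logger.info("Received response for %s", Expensive())
print(logger.isEnabledFor(logging.INFO))
```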

Monitoring performance

Built-in timing

The system tracks execution time for all major operations:
pas2.py
def generate_paraphrases(self, query: str, n_paraphrases: int = 3) -> List[str]:
    start_time = time.time()
    # ... generate paraphrases ...
    elapsed_time = time.time() - start_time
    logger.info("Generated %d paraphrases in %.2f seconds", len(paraphrases), elapsed_time)

Performance metrics

Key metrics to monitor:
  1. Paraphrase generation time - Typically 2-3 seconds
  2. Response retrieval time - 1-2 seconds per response (parallel)
  3. Judgment time - 3-5 seconds
  4. Total detection time - Usually 8-12 seconds

Creating a performance dashboard

Track metrics over time:
import time

class PerformanceMetrics:
    def __init__(self):
        self.metrics = []
    
    def record(self, operation, duration):
        self.metrics.append({
            'timestamp': time.time(),
            'operation': operation,
            'duration': duration
        })
    
    def get_averages(self, window_size=100):
        recent = self.metrics[-window_size:]
        by_operation = {}
        for m in recent:
            if m['operation'] not in by_operation:
                by_operation[m['operation']] = []
            by_operation[m['operation']].append(m['duration'])
        
        return {
            op: sum(durations) / len(durations)
            for op, durations in by_operation.items()
        }

Optimization checklist

Before deploying to production:
  • Set appropriate max_workers based on API rate limits
  • Configure logging level to WARNING or higher
  • Enable database connection pooling if >100 req/min
  • Implement exponential backoff for API retries
  • Monitor and tune max_threads for Gradio
  • Set up persistent storage volume for /data
  • Implement performance metrics collection
  • Configure appropriate timeout values
Performance characteristics vary based on API latency, network conditions, and query complexity. Always profile your specific deployment before optimizing.
