
Real-time Communication

LatentGEO uses Server-Sent Events (SSE) to push audit progress updates to the browser in real time. Compared with traditional polling, updates arrive as soon as they are published, and the transport remains plain HTTP, which keeps the setup simple and reliable.

Why SSE?

Advantages

  • Simpler than WebSockets
  • Built-in reconnection
  • HTTP-compatible (works through proxies)
  • One-way server-to-client (perfect for progress updates)
  • Native browser support

Use Cases

  • Audit progress updates
  • Real-time score changes
  • Status notifications
  • Live dashboards

SSE vs Webhooks

LatentGEO uses both SSE and webhooks for different purposes:
Feature   | SSE                               | Webhooks
Purpose   | Real-time UI updates              | External integrations
Direction | Server → Client (browser)         | Server → Server
Use Case  | Dashboard progress                | GitHub, HubSpot, etc.
Endpoint  | /api/v1/sse/audits/{id}/progress  | /api/v1/webhooks/*
These are complementary, not exclusive:
  • SSE solves real-time UX for audit progress
  • Webhooks solve external automation and integrations

Architecture Overview

┌──────────────────────────────────────────────────────────┐
│                    FRONTEND (Browser)                     │
│  ┌────────────────────────────────────────────────────┐  │
│  │         useAuditSSE Hook                           │  │
│  │  • EventSource connection                          │  │
│  │  • Automatic reconnection                          │  │
│  │  • Fallback to polling                             │  │
│  └────────────────────────────────────────────────────┘  │
└──────────────────────────────────────────────────────────┘
                            ↓ SSE
        (via Next.js SSE Proxy: /api/sse/audits/:id/progress)

┌──────────────────────────────────────────────────────────┐
│                    NEXT.JS SERVER                         │
│  ┌────────────────────────────────────────────────────┐  │
│  │    API Route: /api/sse/audits/[id]/progress       │  │
│  │  • Authenticates request                           │  │
│  │  • Proxies SSE to backend                          │  │
│  │  • Adds Authorization header                       │  │
│  └────────────────────────────────────────────────────┘  │
└──────────────────────────────────────────────────────────┘
                            ↓ HTTP SSE
┌──────────────────────────────────────────────────────────┐
│                    FASTAPI BACKEND                        │
│  ┌────────────────────────────────────────────────────┐  │
│  │    SSE Endpoint: /api/v1/sse/audits/{id}/progress │  │
│  │  • Validates auth & ownership                      │  │
│  │  • Subscribes to Redis channel                     │  │
│  │  • Streams updates via EventSourceResponse         │  │
│  │  • Fallback to DB polling                          │  │
│  └────────────────────────────────────────────────────┘  │
└──────────────────────────────────────────────────────────┘

┌──────────────────────────────────────────────────────────┐
│                     REDIS PUB/SUB                         │
│  Channel: audit:progress:{audit_id}                      │
│  • Worker publishes progress updates                     │
│  • SSE endpoint subscribes to channel                    │
│  • Real-time event delivery                              │
└──────────────────────────────────────────────────────────┘

┌──────────────────────────────────────────────────────────┐
│                    CELERY WORKER                          │
│  • Processes audit                                        │
│  • Publishes progress to Redis                           │
│  • Updates database                                       │
└──────────────────────────────────────────────────────────┘
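
The flow in the diagram can be tried without Redis or Celery. The sketch below is an in-memory stand-in, not the project's code: `InMemoryBroker`, `worker`, `stream`, and `run_demo` are hypothetical names, and only the channel name and payload fields come from this page. It also shows why the endpoint sends the initial state itself: like Redis pub/sub, the broker delivers only to subscribers that are already listening.

```python
import asyncio
import json
from collections import defaultdict
from typing import AsyncIterator

TERMINAL_STATUSES = {"completed", "failed"}


class InMemoryBroker:
    """Hypothetical stand-in for Redis pub/sub: one queue per subscriber."""

    def __init__(self) -> None:
        self._subscribers: dict[str, list[asyncio.Queue]] = defaultdict(list)

    def subscribe(self, channel: str) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self._subscribers[channel].append(queue)
        return queue

    def publish(self, channel: str, message: str) -> int:
        # Deliver only to current subscribers and report how many there were.
        for queue in self._subscribers[channel]:
            queue.put_nowait(message)
        return len(self._subscribers[channel])


async def worker(broker: InMemoryBroker, audit_id: int) -> None:
    """Plays the Celery worker: publishes progress as the audit advances."""
    channel = f"audit:progress:{audit_id}"
    for status, progress in [("running", 25), ("running", 75), ("completed", 100)]:
        await asyncio.sleep(0)  # yield control, as real work would
        payload = {"audit_id": audit_id, "status": status, "progress": progress}
        broker.publish(channel, json.dumps(payload))


async def stream(broker: InMemoryBroker, audit_id: int) -> AsyncIterator[dict]:
    """Plays the SSE endpoint: relays messages until a terminal status."""
    queue = broker.subscribe(f"audit:progress:{audit_id}")
    while True:
        payload = json.loads(await queue.get())
        yield payload
        if payload["status"] in TERMINAL_STATUSES:
            break


async def run_demo(audit_id: int = 123) -> list[dict]:
    broker = InMemoryBroker()
    events: list[dict] = []

    async def consume() -> None:
        async for payload in stream(broker, audit_id):
            events.append(payload)

    consumer = asyncio.create_task(consume())
    await asyncio.sleep(0)  # let the consumer subscribe before publishing
    await worker(broker, audit_id)
    await consumer
    return events
```

Running `asyncio.run(run_demo())` returns the three payloads in publish order and stops after the `completed` one.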

Backend SSE Implementation

SSE Endpoint

The SSE endpoint streams real-time updates using FastAPI’s EventSourceResponse:
backend/app/api/routes/sse.py
from fastapi import APIRouter, Depends, Request
from fastapi.concurrency import run_in_threadpool
from fastapi.sse import EventSourceResponse, ServerSentEvent
from app.core.auth import get_current_user, AuthUser
from app.core.access_control import ensure_audit_access

router = APIRouter(prefix="/sse", tags=["sse"])

@router.get("/audits/{audit_id}/progress")
async def stream_audit_progress(
    request: Request,
    audit_id: int,
    current_user: AuthUser = Depends(get_current_user),
):
    """
    SSE endpoint for streaming audit progress updates.
    Requires Authorization: Bearer header.
    """
    # Validate ownership
    initial_payload = await run_in_threadpool(
        _load_owned_audit_payload, audit_id, current_user
    )
    
    return EventSourceResponse(
        audit_progress_stream(audit_id, current_user, request, initial_payload),
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # Disable nginx buffering
        },
    )
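
On the wire, every update the endpoint streams is a short block of `field: value` lines terminated by a blank line. The sketch below illustrates that framing under the SSE format rules. `format_sse_event` is a hypothetical helper written for this page, not the project's `_serialize_sse_event`, whose implementation is not shown here.

```python
import json
from typing import Any, Optional


def format_sse_event(
    data: Optional[Any] = None,
    *,
    event: Optional[str] = None,
    retry_ms: Optional[int] = None,
    comment: Optional[str] = None,
) -> bytes:
    """Frame one event in the text/event-stream wire format."""
    lines: list[str] = []
    if comment is not None:
        lines.append(f": {comment}")  # comment line, ignored by EventSource
    if event is not None:
        lines.append(f"event: {event}")
    if retry_ms is not None:
        lines.append(f"retry: {retry_ms}")
    if data is not None:
        # Each line of the payload needs its own "data:" prefix.
        for line in json.dumps(data).splitlines():
            lines.append(f"data: {line}")
    # A blank line terminates the event.
    return ("\n".join(lines) + "\n\n").encode("utf-8")
```

A progress update then looks like `retry: 5000`, a `data:` line carrying the JSON payload, and an empty line, while a heartbeat is just `: heartbeat` followed by an empty line.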

Redis-First Streaming Strategy

The streaming logic uses a Redis-first approach with DB fallback:
backend/app/api/routes/sse.py
async def audit_progress_stream(
    audit_id: int,
    current_user: AuthUser,
    request: Request,
    initial_payload: dict | None = None,
) -> AsyncGenerator[bytes, None]:
    """Stream audit progress using Redis as primary source and DB as fallback."""
    
    # Configuration
    heartbeat_seconds = settings.SSE_HEARTBEAT_SECONDS  # 30s default
    fallback_db_interval = settings.SSE_FALLBACK_DB_INTERVAL_SECONDS  # 10s
    retry_ms = settings.SSE_RETRY_MS  # 5000ms
    sse_source = settings.SSE_SOURCE  # "redis" or "db"
    
    # Redis setup
    use_redis_source = (
        sse_source == "redis" and 
        cache.enabled and 
        bool(cache.redis_client)
    )
    
    redis_channel = AuditService.progress_channel(audit_id)
    # Channel format: "audit:progress:{audit_id}"
    
    pubsub = None
    
    try:
        # 1. Send initial state immediately
        initial = initial_payload or await run_in_threadpool(
            _load_owned_audit_payload, audit_id, current_user
        )
        yield _serialize_sse_event(
            ServerSentEvent(raw_data=json.dumps(initial), retry=retry_ms)
        )
        
        # 2. Check if already terminal
        if initial["status"] in {"completed", "failed"}:
            return
        
        # 3. Subscribe to Redis channel
        if use_redis_source:
            try:
                pubsub = cache.redis_client.pubsub()
                await run_in_threadpool(pubsub.subscribe, redis_channel)
                logger.info(f"SSE Redis subscribed: {redis_channel}")
            except Exception as e:
                logger.warning(f"Redis subscription failed: {e}")
                pubsub = None
                use_redis_source = False
        
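        # 4. Event loop: Redis + DB fallback + heartbeat
        # Note: time_since_last_db_check, time_since_last_emit, and
        # payload_changed() are bookkeeping not shown in this excerpt.
        while True:
            # Stop when the client disconnects (a max-duration check would also go here)
            if await request.is_disconnected():
                break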
            
            payload = None
            
            # Try Redis message
            if pubsub:
                try:
                    message = await run_in_threadpool(
                        lambda: pubsub.get_message(
                            ignore_subscribe_messages=True,
                            timeout=1.0
                        )
                    )
                    if message and message["type"] == "message":
                        payload = _decode_redis_payload(
                            message["data"], audit_id
                        )
                except Exception as e:
                    logger.warning(f"Redis read failed: {e}")
                    pubsub = None
                    use_redis_source = False
            
            # Fallback to DB if needed
            should_check_db = (
                payload is None and
                (sse_source == "db" or 
                 not use_redis_source or
                 time_since_last_db_check >= fallback_db_interval)
            )
            
            if should_check_db:
                payload = await run_in_threadpool(
                    _load_owned_audit_payload, audit_id, current_user
                )
            
            # Emit payload if changed
            if payload is not None:
                if payload_changed(payload):
                    yield _serialize_sse_event(
                        ServerSentEvent(
                            raw_data=json.dumps(payload),
                            retry=retry_ms
                        )
                    )
                
                # Check terminal status
                if payload["status"] in {"completed", "failed"}:
                    break
            
            # Send heartbeat if no activity
            elif time_since_last_emit >= heartbeat_seconds:
                yield _serialize_sse_event(
                    ServerSentEvent(comment="heartbeat", retry=retry_ms)
                )
            
            # Sleep if no Redis subscription
            if not pubsub:
                await asyncio.sleep(0.25)
    
    finally:
        # Cleanup Redis subscription
        if pubsub:
            await run_in_threadpool(pubsub.unsubscribe, redis_channel)
            await run_in_threadpool(pubsub.close)
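
The loop above relies on `payload_changed`, `time_since_last_emit`, and `time_since_last_db_check`, which the excerpt does not define. One way to keep that bookkeeping is sketched below. `StreamState` and its methods are assumptions made for illustration, not the project's implementation; the clock is injectable so the logic can be tested without waiting.

```python
import time
from typing import Callable, Optional


class StreamState:
    """Hypothetical bookkeeping for the loop's timing and change checks."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        now = clock()
        self.last_emit_at = now
        self.last_db_check_at = now
        self._last_payload: Optional[dict] = None

    def payload_changed(self, payload: dict) -> bool:
        # Compare against the last payload that was actually sent.
        return payload != self._last_payload

    def mark_emitted(self, payload: Optional[dict] = None) -> None:
        # Call after every yield; heartbeats pass no payload.
        self.last_emit_at = self._clock()
        if payload is not None:
            self._last_payload = payload

    def mark_db_checked(self) -> None:
        self.last_db_check_at = self._clock()

    @property
    def time_since_last_emit(self) -> float:
        return self._clock() - self.last_emit_at

    @property
    def time_since_last_db_check(self) -> float:
        return self._clock() - self.last_db_check_at
```

With this shape, the loop would call `mark_emitted(payload)` after yielding an update, `mark_emitted()` after a heartbeat, and `mark_db_checked()` after each database read. Using a monotonic clock avoids surprises when the system time is adjusted.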

Configuration Variables

# SSE Configuration
SSE_SOURCE=redis                      # "redis" or "db"
SSE_FALLBACK_DB_INTERVAL_SECONDS=10   # DB polling interval when Redis unavailable
SSE_HEARTBEAT_SECONDS=30              # Keep-alive interval
SSE_RETRY_MS=5000                     # Client reconnection interval
SSE_MAX_DURATION=3600                 # Max stream duration (1 hour)
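
As a rough illustration of how these variables could be read and validated at startup, the sketch below loads them from an environment mapping with the defaults listed above. `SSESettings` and `load_sse_settings` are hypothetical names; the project's actual `settings` object is not shown on this page and may work differently.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class SSESettings:
    source: str = "redis"
    fallback_db_interval_seconds: float = 10.0
    heartbeat_seconds: float = 30.0
    retry_ms: int = 5000
    max_duration_seconds: float = 3600.0


def load_sse_settings(env: Mapping[str, str] = os.environ) -> SSESettings:
    """Read the SSE_* variables, falling back to the documented defaults."""
    source = env.get("SSE_SOURCE", "redis").strip().lower()
    if source not in {"redis", "db"}:
        raise ValueError(f"SSE_SOURCE must be 'redis' or 'db', got {source!r}")
    return SSESettings(
        source=source,
        fallback_db_interval_seconds=float(
            env.get("SSE_FALLBACK_DB_INTERVAL_SECONDS", "10")
        ),
        heartbeat_seconds=float(env.get("SSE_HEARTBEAT_SECONDS", "30")),
        retry_ms=int(env.get("SSE_RETRY_MS", "5000")),
        max_duration_seconds=float(env.get("SSE_MAX_DURATION", "3600")),
    )
```

Rejecting an unknown `SSE_SOURCE` early makes a typo fail at startup rather than silently changing which source the stream reads from.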

Publishing Progress from Workers

backend/app/services/audit_service.py
class AuditService:
    @staticmethod
    def progress_channel(audit_id: int) -> str:
        """Get Redis channel name for audit progress."""
        return f"audit:progress:{audit_id}"
    
    @staticmethod
    def publish_progress(audit: Audit):
        """Publish progress update to Redis."""
        from app.services.cache_service import cache
        
        if not cache.enabled:
            return
        
        channel = AuditService.progress_channel(audit.id)
        payload = {
            "audit_id": audit.id,
            "status": audit.status,
            "progress": audit.progress,
            "geo_score": audit.geo_score,
            "total_pages": audit.total_pages,
            "error_message": audit.error_message,
        }
        
        try:
            cache.redis_client.publish(
                channel,
                json.dumps(payload, default=str)
            )
            logger.debug(f"Published progress to {channel}")
        except Exception as e:
            logger.error(f"Failed to publish progress: {e}")

# Usage in Celery task
@celery_app.task
def process_audit(audit_id: int):
    db = SessionLocal()
    try:
        audit = AuditService.get_audit(db, audit_id)
        
        # Update progress
        audit.status = "running"
        audit.progress = 25
        db.commit()
        
        # Publish to Redis for SSE clients
        AuditService.publish_progress(audit)
        
        # ... continue processing
    finally:
        db.close()
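
On the subscriber side, the published JSON arrives as the message's `data` field, which redis-py delivers as bytes unless the client was created with `decode_responses=True`. The sketch below shows one defensive way to decode it. `decode_progress_message` is a hypothetical helper, not the project's `_decode_redis_payload`, and the audit ID check is an assumption about what such a helper might verify.

```python
import json
from typing import Optional, Union


def decode_progress_message(
    raw: Union[bytes, str], expected_audit_id: int
) -> Optional[dict]:
    """Return the decoded payload, or None if malformed or for another audit."""
    try:
        text = raw.decode("utf-8") if isinstance(raw, bytes) else raw
        payload = json.loads(text)
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None
    if not isinstance(payload, dict):
        return None
    # Ignore messages that do not belong to the audit being streamed.
    if payload.get("audit_id") != expected_audit_id:
        return None
    return payload
```

Returning `None` for bad input lets the streaming loop treat a malformed message like "no message" and fall through to its database check.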

Next.js SSE Proxy

The frontend uses a Next.js API route to proxy SSE connections and add authentication:
app/api/sse/audits/[id]/progress/route.ts
import { getAccessToken } from "@auth0/nextjs-auth0";
import { NextRequest } from "next/server";

const BACKEND_URL = process.env.API_URL || "http://backend:8000";

export async function GET(
  request: NextRequest,
  { params }: { params: { id: string } }
) {
  const auditId = params.id;
  
  // Get access token
  const { accessToken } = await getAccessToken();
  if (!accessToken) {
    return new Response("Unauthorized", { status: 401 });
  }
  
  // Proxy SSE connection to backend
  const backendUrl = `${BACKEND_URL}/api/v1/sse/audits/${auditId}/progress`;
  const response = await fetch(backendUrl, {
    headers: {
      Authorization: `Bearer ${accessToken}`,
    },
  });
  
  if (!response.ok) {
    return new Response(response.statusText, { status: response.status });
  }
  
  // Stream response to client
  return new Response(response.body, {
    headers: {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache",
      "Connection": "keep-alive",
    },
  });
}
Benefits:
  • Hides backend URL from client
  • Centralized authentication
  • Handles token refresh
  • Simplifies CORS

Frontend SSE Hook

The useAuditSSE hook manages SSE connections with automatic fallback:
hooks/useAuditSSE.ts
import { useEffect, useState, useRef, useCallback } from "react";
import logger from "@/lib/logger";

interface AuditProgress {
  audit_id: number;
  progress: number;
  status: string;
  error_message?: string;
  geo_score?: number;
  total_pages?: number;
}

interface UseAuditSSEOptions {
  onMessage?: (data: AuditProgress) => void;
  onComplete?: (data: AuditProgress) => void;
  onError?: (error: Error) => void;
  enabled?: boolean;
}

export function useAuditSSE(
  auditId: string | number | undefined,
  options: UseAuditSSEOptions = {}
) {
  const [isConnected, setIsConnected] = useState(false);
  const [lastMessage, setLastMessage] = useState<AuditProgress | null>(null);
  const [error, setError] = useState<Error | null>(null);
  const [useFallback, setUseFallback] = useState(false);
  
  const eventSourceRef = useRef<EventSource | null>(null);
  const pollIntervalRef = useRef<ReturnType<typeof setInterval> | null>(null);
  const reconnectAttemptsRef = useRef(0);
  const maxReconnectAttempts = 3;
  
  const enabled = options.enabled ?? true;
  
  // Cleanup function: closes the SSE connection and stops fallback polling
  const cleanup = useCallback((intentional = true) => {
    if (eventSourceRef.current) {
      eventSourceRef.current.close();
      eventSourceRef.current = null;
    }
    if (pollIntervalRef.current) {
      clearInterval(pollIntervalRef.current);
      pollIntervalRef.current = null;
    }
    setIsConnected(false);
  }, []);
  
  // Fallback: Traditional polling
  const startPolling = useCallback(() => {
    if (!auditId || !enabled) return;
    
    logger.log("[Fallback] Using polling instead of SSE");
    setUseFallback(true);
    
    const poll = async () => {
      try {
        const res = await fetch(
          `/api/v1/audits/${auditId}/status`
        );
        
        if (!res.ok) {
          if (res.status === 401 || res.status === 403) {
            cleanup(true);
          }
          return;
        }
        
        const data: AuditProgress = await res.json();
        setLastMessage(data);
        
        if (options.onMessage) {
          options.onMessage(data);
        }
        
        // Check terminal status
        if (data.status === "completed" || data.status === "failed") {
          if (options.onComplete) {
            options.onComplete(data);
          }
          cleanup(true);
        }
      } catch (err) {
        console.error("[Fallback] Polling error:", err);
      }
    };
    
    // Poll immediately, then every 3 seconds.
    // The interval lives in a ref so cleanup() can stop it on a terminal
    // status, an auth error, or unmount.
    if (pollIntervalRef.current) {
      clearInterval(pollIntervalRef.current);
    }
    poll();
    pollIntervalRef.current = setInterval(poll, 3000);
  }, [auditId, cleanup, enabled, options]);
  
  // Connect to SSE
  const connect = useCallback(async () => {
    if (!auditId || !enabled) return;
    
    cleanup(false);
    
    const sseUrl = `/api/sse/audits/${auditId}/progress`;
    logger.log(`[SSE] Connecting to proxy: ${sseUrl}`);
    
    try {
      const eventSource = new EventSource(sseUrl);
      eventSourceRef.current = eventSource;
      
      eventSource.onopen = () => {
        logger.log("[SSE] Connection established");
        setIsConnected(true);
        setError(null);
        setUseFallback(false);
        reconnectAttemptsRef.current = 0;
      };
      
      eventSource.onmessage = (event) => {
        try {
          const data: AuditProgress = JSON.parse(event.data);
          logger.log("[SSE] Message received:", data);
          
          setLastMessage(data);
          
          if (options.onMessage) {
            options.onMessage(data);
          }
          
          // Check terminal status
          if (data.status === "completed" || data.status === "failed") {
            logger.log(`[SSE] Audit ${data.status}, closing connection`);
            if (options.onComplete) {
              options.onComplete(data);
            }
            cleanup(true);
          }
        } catch (err) {
          console.error("[SSE] Failed to parse message:", err);
        }
      };
      
      eventSource.onerror = (err) => {
        console.error("[SSE] Connection error:", err);
        setIsConnected(false);
        
        const errorObj = new Error("SSE connection error");
        setError(errorObj);
        
        if (options.onError) {
          options.onError(errorObj);
        }
        
        // Try reconnecting, then fallback to polling
        if (reconnectAttemptsRef.current < maxReconnectAttempts) {
          const delay = Math.min(
            1000 * Math.pow(2, reconnectAttemptsRef.current),
            10000
          );
          logger.log(
            `[SSE] Reconnecting in ${delay}ms ` +
            `(attempt ${reconnectAttemptsRef.current + 1}/${maxReconnectAttempts})`
          );
          
          setTimeout(() => {
            reconnectAttemptsRef.current++;
            connect();
          }, delay);
        } else {
          console.warn(
            "[SSE] Max reconnection attempts reached, " +
            "falling back to polling"
          );
          cleanup(false);
          startPolling();
        }
      };
    } catch (err) {
      console.error("[SSE] Failed to create EventSource:", err);
      startPolling();
    }
  }, [auditId, cleanup, startPolling, enabled, options]);
  
  useEffect(() => {
    if (!auditId || !enabled) {
      cleanup(true);
      return;
    }
    
    connect();
    return () => cleanup(true);
  }, [connect, cleanup, auditId, enabled]);
  
  return {
    isConnected,
    lastMessage,
    error,
    useFallback,
    reconnect: connect,
  };
}

Hook Features

Automatic Reconnection

Exponential backoff (1s → 2s → 4s across the 3 attempts, capped at 10s)

Fallback to Polling

Switches to 3-second polling after 3 failed reconnections

Lifecycle Callbacks

onMessage, onComplete, onError hooks

Connection Status

isConnected, useFallback indicators
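
The hook computes each reconnect delay as `min(1000 * 2^attempt, 10000)` milliseconds, with attempts counted from 0. The small sketch below reproduces that schedule so the numbers are easy to check. `reconnect_delays_ms` is a hypothetical helper for illustration and is not part of the hook.

```python
def reconnect_delays_ms(
    max_attempts: int = 3, base_ms: int = 1000, cap_ms: int = 10_000
) -> list[int]:
    """Delay before each reconnect attempt: exponential growth with a cap."""
    return [min(base_ms * 2**attempt, cap_ms) for attempt in range(max_attempts)]
```

With the hook's defaults this gives 1000, 2000, and 4000 ms, after which the hook falls back to polling. The 10-second cap would only come into play if `maxReconnectAttempts` were raised to 5 or more.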

Usage Example

import { useAuditSSE } from "@/hooks/useAuditSSE";
import { toast } from "sonner";

function AuditProgressCard({ auditId }: { auditId: number }) {
  const { lastMessage, isConnected, useFallback, error } = useAuditSSE(
    auditId,
    {
      onMessage: (data) => {
        console.log("Progress update:", data);
      },
      onComplete: (data) => {
        toast.success(
          `Audit ${data.status}! GEO Score: ${data.geo_score}`
        );
      },
      onError: (error) => {
        toast.error(error.message);
      },
      enabled: true,
    }
  );
  
  return (
    <Card>
      <CardHeader>
        <div className="flex items-center gap-2">
          <h3>Audit Progress</h3>
          {isConnected && (
            <Badge variant="success">
              <Signal className="w-3 h-3" />
              Live
            </Badge>
          )}
          {useFallback && (
            <Badge variant="warning">Fallback Mode</Badge>
          )}
        </div>
      </CardHeader>
      <CardContent>
        {lastMessage && (
          <>
            <Progress value={lastMessage.progress} />
            <p>Status: {lastMessage.status}</p>
            <p>Progress: {lastMessage.progress}%</p>
            {lastMessage.geo_score != null && (
              <p>GEO Score: {lastMessage.geo_score}</p>
            )}
          </>
        )}
        {error && <Alert variant="destructive">{error.message}</Alert>}
      </CardContent>
    </Card>
  );
}

Heartbeat & Retry Mechanisms

Server-side Heartbeat

The backend sends periodic heartbeat comments to keep the connection alive:
# Send heartbeat if no activity for 30 seconds
if time_since_last_emit >= heartbeat_seconds:
    yield _serialize_sse_event(
        ServerSentEvent(comment="heartbeat", retry=retry_ms)
    )

Client-side Retry

The SSE spec includes automatic retry with a configurable interval:
# Server sends retry interval in milliseconds
ServerSentEvent(raw_data=data, retry=5000)  # 5 seconds
The browser’s EventSource automatically reconnects after network failures using this interval.

Monitoring & Debugging

Server-side Logging

logger.info(f"SSE connection established for audit {audit_id}")
logger.debug(f"Published progress to {channel}")
logger.warning(f"Redis subscription failed: {error}")
logger.error(f"SSE Redis read failed: {error}")

Client-side Logging

logger.log("[SSE] Connection established");
logger.log("[SSE] Message received:", data);
console.warn("[SSE] Max reconnection attempts reached");
console.error("[SSE] Connection error:", err);

Connection Status Indicators

{isConnected && <Badge variant="success">Live</Badge>}
{useFallback && <Badge variant="warning">Fallback Mode</Badge>}
{error && <Alert variant="destructive">{error.message}</Alert>}

Troubleshooting

Validate Redis

docker compose logs -f redis

Validate Backend SSE

docker compose logs -f backend | grep SSE

Test SSE Endpoint

curl -N -H "Authorization: Bearer $TOKEN" \
  http://localhost:8000/api/v1/sse/audits/123/progress
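
When reading the raw output of that request, it helps to know how the lines group into events. The sketch below is a minimal parser for the subset of the SSE format this page uses (`data`, `retry`, and comment lines). `parse_sse_lines` is a hypothetical helper for scripted checks, and it assumes every `data` payload is JSON, as the progress payloads are.

```python
import json
from typing import Iterable, Iterator


def parse_sse_lines(lines: Iterable[str]) -> Iterator[dict]:
    """Yield one dict per event with 'data' (decoded JSON) and 'retry'."""
    data_lines: list[str] = []
    retry_ms = None
    for raw_line in lines:
        line = raw_line.rstrip("\r\n")
        if line == "":
            # Blank line: dispatch the buffered event, if it carried data.
            if data_lines:
                yield {"data": json.loads("\n".join(data_lines)), "retry": retry_ms}
            data_lines, retry_ms = [], None
            continue
        if line.startswith(":"):
            continue  # comment line, e.g. the heartbeat
        field, _, value = line.partition(":")
        value = value[1:] if value.startswith(" ") else value
        if field == "data":
            data_lines.append(value)
        elif field == "retry" and value.isdigit():
            retry_ms = int(value)
```

The function accepts any iterable of decoded text lines, so it can be fed from a saved capture of the stream or from an HTTP client that yields lines as they arrive.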

Check Configuration

# Backend .env
SSE_SOURCE=redis
SSE_FALLBACK_DB_INTERVAL_SECONDS=10
SSE_HEARTBEAT_SECONDS=30
SSE_RETRY_MS=5000

Next Steps

Backend Architecture

Learn about the FastAPI backend

Frontend Architecture

Explore the Next.js frontend
