Real-time Communication
LatentGEO uses Server-Sent Events (SSE) for real-time audit progress updates, providing a better user experience than traditional polling while maintaining simplicity and reliability.
Why SSE?
Advantages
- Simpler than WebSockets
- Built-in reconnection
- HTTP-compatible (works through proxies)
- One-way server-to-client (perfect for progress updates)
- Native browser support
Use Cases
- Audit progress updates
- Real-time score changes
- Status notifications
- Live dashboards
SSE vs Webhooks
LatentGEO uses both SSE and webhooks for different purposes:
| Feature | SSE | Webhooks |
|---|---|---|
| Purpose | Real-time UI updates | External integrations |
| Direction | Server → Client (browser) | Server → Server |
| Use Case | Dashboard progress | GitHub, HubSpot, etc. |
| Endpoint | /api/v1/sse/audits/{id}/progress | /api/v1/webhooks/* |
- SSE solves real-time UX for audit progress
- Webhooks solve external automation and integrations
Architecture Overview
┌──────────────────────────────────────────────────────────┐
│ FRONTEND (Browser) │
│ ┌────────────────────────────────────────────────────┐ │
│ │ useAuditSSE Hook │ │
│ │ • EventSource connection │ │
│ │ • Automatic reconnection │ │
│ │ • Fallback to polling │ │
│ └────────────────────────────────────────────────────┘ │
└──────────────────────────────────────────────────────────┘
↓ SSE
(via Next.js SSE Proxy: /api/sse/audits/:id/progress)
↓
┌──────────────────────────────────────────────────────────┐
│ NEXT.JS SERVER │
│ ┌────────────────────────────────────────────────────┐ │
│ │ API Route: /api/sse/audits/[id]/progress │ │
│ │ • Authenticates request │ │
│ │ • Proxies SSE to backend │ │
│ │ • Adds Authorization header │ │
│ └────────────────────────────────────────────────────┘ │
└──────────────────────────────────────────────────────────┘
↓ HTTP SSE
┌──────────────────────────────────────────────────────────┐
│ FASTAPI BACKEND │
│ ┌────────────────────────────────────────────────────┐ │
│ │ SSE Endpoint: /api/v1/sse/audits/{id}/progress │ │
│ │ • Validates auth & ownership │ │
│ │ • Subscribes to Redis channel │ │
│ │ • Streams updates via EventSourceResponse │ │
│ │ • Fallback to DB polling │ │
│ └────────────────────────────────────────────────────┘ │
└──────────────────────────────────────────────────────────┘
↓
┌──────────────────────────────────────────────────────────┐
│ REDIS PUB/SUB │
│ Channel: audit:progress:{audit_id} │
│ • Worker publishes progress updates │
│ • SSE endpoint subscribes to channel │
│ • Real-time event delivery │
└──────────────────────────────────────────────────────────┘
↑
┌──────────────────────────────────────────────────────────┐
│ CELERY WORKER │
│ • Processes audit │
│ • Publishes progress to Redis │
│ • Updates database │
└──────────────────────────────────────────────────────────┘
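The flow in the diagram can be tried without Redis or Celery. The following is a minimal sketch that uses an in-process asyncio.Queue as a stand-in for the audit:progress:{audit_id} channel. InMemoryChannel, worker, and sse_stream are hypothetical names for illustration only and are not part of LatentGEO.

```python
import asyncio
import json


class InMemoryChannel:
    """Hypothetical stand-in for a Redis pub/sub channel (single process only)."""

    def __init__(self) -> None:
        self._subscribers = []

    def subscribe(self) -> asyncio.Queue:
        queue = asyncio.Queue()
        self._subscribers.append(queue)
        return queue

    def publish(self, message: str) -> int:
        # Like Redis PUBLISH, deliver only to subscribers that exist right now
        for queue in self._subscribers:
            queue.put_nowait(message)
        return len(self._subscribers)


async def worker(channel: InMemoryChannel, audit_id: int) -> None:
    """Plays the Celery worker: publishes progress, then a terminal status."""
    for progress in (25, 50, 75):
        channel.publish(json.dumps(
            {"audit_id": audit_id, "status": "running", "progress": progress}
        ))
        await asyncio.sleep(0)  # yield control, as real work would
    channel.publish(json.dumps(
        {"audit_id": audit_id, "status": "completed", "progress": 100}
    ))


async def sse_stream(queue: asyncio.Queue) -> list:
    """Plays the SSE endpoint: forwards messages until a terminal status."""
    received = []
    while True:
        payload = json.loads(await queue.get())
        received.append(payload)
        if payload["status"] in {"completed", "failed"}:
            return received


async def demo() -> list:
    channel = InMemoryChannel()
    queue = channel.subscribe()  # subscribe before the worker publishes
    _, events = await asyncio.gather(worker(channel, 42), sse_stream(queue))
    return events


events = asyncio.run(demo())
```

Pub/sub messages are not stored: a subscriber that joins after a publish never sees that message. This is why the endpoint sends the current state from the database first and keeps a DB fallback.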
Backend SSE Implementation
SSE Endpoint
The SSE endpoint streams real-time updates using FastAPI’s EventSourceResponse:
backend/app/api/routes/sse.py
from fastapi import APIRouter, Depends, Request
from fastapi.concurrency import run_in_threadpool
from fastapi.sse import EventSourceResponse, ServerSentEvent
from app.core.auth import get_current_user, AuthUser
from app.core.access_control import ensure_audit_access

router = APIRouter(prefix="/sse", tags=["sse"])


@router.get("/audits/{audit_id}/progress")
async def stream_audit_progress(
    request: Request,
    audit_id: int,
    current_user: AuthUser = Depends(get_current_user),
):
    """
    SSE endpoint for streaming audit progress updates.
    Requires Authorization: Bearer header.
    """
    # Validate ownership and load the initial state off the event loop
    initial_payload = await run_in_threadpool(
        _load_owned_audit_payload, audit_id, current_user
    )
    return EventSourceResponse(
        audit_progress_stream(audit_id, current_user, request, initial_payload),
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # Disable nginx buffering
        },
    )
Redis-First Streaming Strategy
The streaming logic uses a Redis-first approach with DB fallback:
backend/app/api/routes/sse.py
async def audit_progress_stream(
    audit_id: int,
    current_user: AuthUser,
    request: Request,
    initial_payload: dict | None = None,
) -> AsyncGenerator[bytes, None]:
    """Stream audit progress using Redis as primary source and DB as fallback."""
    # Configuration
    heartbeat_seconds = settings.SSE_HEARTBEAT_SECONDS  # 30s default
    fallback_db_interval = settings.SSE_FALLBACK_DB_INTERVAL_SECONDS  # 10s
    retry_ms = settings.SSE_RETRY_MS  # 5000ms
    sse_source = settings.SSE_SOURCE  # "redis" or "db"

    # Redis setup
    use_redis_source = (
        sse_source == "redis"
        and cache.enabled
        and bool(cache.redis_client)
    )
    redis_channel = AuditService.progress_channel(audit_id)
    # Channel format: "audit:progress:{audit_id}"
    pubsub = None

    try:
        # 1. Send initial state immediately
        initial = initial_payload or await run_in_threadpool(
            _load_owned_audit_payload, audit_id, current_user
        )
        yield _serialize_sse_event(
            ServerSentEvent(raw_data=json.dumps(initial), retry=retry_ms)
        )

        # 2. Check if already terminal
        if initial["status"] in {"completed", "failed"}:
            return

        # 3. Subscribe to Redis channel
        if use_redis_source:
            try:
                pubsub = cache.redis_client.pubsub()
                await run_in_threadpool(pubsub.subscribe, redis_channel)
                logger.info(f"SSE Redis subscribed: {redis_channel}")
            except Exception as e:
                logger.warning(f"Redis subscription failed: {e}")
                pubsub = None
                use_redis_source = False

        # 4. Event loop: Redis + DB fallback + heartbeat
        # (payload_changed, time_since_last_emit and time_since_last_db_check
        # are maintained by bookkeeping omitted from this excerpt)
        while True:
            # Check disconnection (the timeout check is omitted from this excerpt)
            if await request.is_disconnected():
                break

            payload = None

            # Try Redis message
            if pubsub:
                try:
                    message = await run_in_threadpool(
                        lambda: pubsub.get_message(
                            ignore_subscribe_messages=True,
                            timeout=1.0,
                        )
                    )
                    if message and message["type"] == "message":
                        payload = _decode_redis_payload(
                            message["data"], audit_id
                        )
                except Exception as e:
                    logger.warning(f"Redis read failed: {e}")
                    pubsub = None
                    use_redis_source = False

            # Fallback to DB if needed
            should_check_db = (
                payload is None
                and (
                    sse_source == "db"
                    or not use_redis_source
                    or time_since_last_db_check >= fallback_db_interval
                )
            )
            if should_check_db:
                payload = await run_in_threadpool(
                    _load_owned_audit_payload, audit_id, current_user
                )

            # Emit payload if changed
            if payload is not None:
                if payload_changed(payload):
                    yield _serialize_sse_event(
                        ServerSentEvent(
                            raw_data=json.dumps(payload),
                            retry=retry_ms,
                        )
                    )
                # Check terminal status
                if payload["status"] in {"completed", "failed"}:
                    break
            # Send heartbeat if no activity
            elif time_since_last_emit >= heartbeat_seconds:
                yield _serialize_sse_event(
                    ServerSentEvent(comment="heartbeat", retry=retry_ms)
                )

            # Sleep if no Redis subscription
            if not pubsub:
                await asyncio.sleep(0.25)
    finally:
        # Cleanup Redis subscription
        if pubsub:
            await run_in_threadpool(pubsub.unsubscribe, redis_channel)
            await run_in_threadpool(pubsub.close)
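The excerpt refers to payload_changed, time_since_last_emit, and time_since_last_db_check without showing how they are tracked. One possible way to keep that bookkeeping is sketched below. StreamState and its methods are hypothetical names, and the actual implementation may differ.

```python
import json
import time
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class StreamState:
    """Hypothetical per-connection bookkeeping for the streaming loop."""

    last_sent: Optional[str] = None  # canonical JSON of the last emitted payload
    last_emit: float = field(default_factory=time.monotonic)
    last_db_check: float = field(default_factory=time.monotonic)

    def payload_changed(self, payload: dict) -> bool:
        # sort_keys makes the comparison independent of key order
        return json.dumps(payload, sort_keys=True, default=str) != self.last_sent

    def mark_emitted(self, payload: Optional[dict] = None) -> None:
        # Call after every yield; heartbeats pass no payload
        if payload is not None:
            self.last_sent = json.dumps(payload, sort_keys=True, default=str)
        self.last_emit = time.monotonic()

    def mark_db_checked(self) -> None:
        self.last_db_check = time.monotonic()

    def time_since_last_emit(self) -> float:
        return time.monotonic() - self.last_emit

    def time_since_last_db_check(self) -> float:
        return time.monotonic() - self.last_db_check


state = StreamState()
update = {"status": "running", "progress": 25}
if state.payload_changed(update):
    state.mark_emitted(update)  # in the real loop this would follow the yield
```

time.monotonic() is used rather than time.time() so that wall-clock adjustments cannot trigger or suppress a heartbeat.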
Configuration Variables
# SSE Configuration
SSE_SOURCE=redis # "redis" or "db"
SSE_FALLBACK_DB_INTERVAL_SECONDS=10 # DB polling interval when Redis unavailable
SSE_HEARTBEAT_SECONDS=30 # Keep-alive interval
SSE_RETRY_MS=5000 # Client reconnection interval
SSE_MAX_DURATION=3600 # Max stream duration (1 hour)
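As an illustration of how these variables could be read and validated, here is a minimal standard-library sketch. SSESettings and load_sse_settings are hypothetical names. The backend reads its values from its own settings object, whose implementation is not shown here.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class SSESettings:
    """Hypothetical container; defaults mirror the values listed above."""

    source: str = "redis"
    fallback_db_interval_seconds: int = 10
    heartbeat_seconds: int = 30
    retry_ms: int = 5000
    max_duration: int = 3600


def load_sse_settings(env=None) -> SSESettings:
    """Build settings from a mapping of environment variables."""
    env = os.environ if env is None else env
    source = str(env.get("SSE_SOURCE", "redis")).strip().lower()
    if source not in {"redis", "db"}:
        raise ValueError(f'SSE_SOURCE must be "redis" or "db", got {source!r}')
    return SSESettings(
        source=source,
        fallback_db_interval_seconds=int(
            env.get("SSE_FALLBACK_DB_INTERVAL_SECONDS", 10)
        ),
        heartbeat_seconds=int(env.get("SSE_HEARTBEAT_SECONDS", 30)),
        retry_ms=int(env.get("SSE_RETRY_MS", 5000)),
        max_duration=int(env.get("SSE_MAX_DURATION", 3600)),
    )


defaults = load_sse_settings({})
overridden = load_sse_settings({"SSE_SOURCE": "db", "SSE_RETRY_MS": "2500"})
```

Rejecting unknown SSE_SOURCE values at startup surfaces a typo immediately instead of silently selecting one of the two code paths.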
Publishing Progress from Workers
backend/app/services/audit_service.py
class AuditService:
    @staticmethod
    def progress_channel(audit_id: int) -> str:
        """Get Redis channel name for audit progress."""
        return f"audit:progress:{audit_id}"

    @staticmethod
    def publish_progress(audit: Audit):
        """Publish progress update to Redis."""
        from app.services.cache_service import cache

        if not cache.enabled:
            return
        channel = AuditService.progress_channel(audit.id)
        payload = {
            "audit_id": audit.id,
            "status": audit.status,
            "progress": audit.progress,
            "geo_score": audit.geo_score,
            "total_pages": audit.total_pages,
            "error_message": audit.error_message,
        }
        try:
            cache.redis_client.publish(
                channel,
                json.dumps(payload, default=str),
            )
            logger.debug(f"Published progress to {channel}")
        except Exception as e:
            logger.error(f"Failed to publish progress: {e}")


# Usage in Celery task
@celery_app.task
def process_audit(audit_id: int):
    db = SessionLocal()
    try:
        audit = AuditService.get_audit(db, audit_id)
        # Update progress
        audit.status = "running"
        audit.progress = 25
        db.commit()
        # Publish to Redis for SSE clients
        AuditService.publish_progress(audit)
        # ... continue processing
    finally:
        db.close()
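To make the round trip concrete, the sketch below encodes a payload the way publish_progress does and decodes it on the subscriber side. decode_progress is only a guess at the kind of validation a helper such as _decode_redis_payload might perform. Both function names are hypothetical.

```python
import json
from decimal import Decimal
from typing import Optional, Union


def encode_progress(payload: dict) -> str:
    # default=str converts values json cannot encode (Decimal, datetime) to strings
    return json.dumps(payload, default=str)


def decode_progress(data: Union[bytes, str], audit_id: int) -> Optional[dict]:
    """Return the payload, or None if it is malformed or for another audit."""
    if isinstance(data, bytes):
        # A Redis client returns bytes unless it was created with decode_responses=True
        data = data.decode("utf-8", errors="replace")
    try:
        payload = json.loads(data)
    except json.JSONDecodeError:
        return None
    if not isinstance(payload, dict) or payload.get("audit_id") != audit_id:
        return None
    return payload


wire = encode_progress(
    {"audit_id": 7, "status": "running", "progress": 25, "geo_score": Decimal("81.5")}
)
decoded = decode_progress(wire.encode("utf-8"), audit_id=7)
```

Values converted by default=str arrive as strings, so a Decimal score would reach the client as "81.5" rather than a number. Converting such fields to float before publishing avoids that.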
Next.js SSE Proxy
The frontend uses a Next.js API route to proxy SSE connections and add authentication:
app/api/sse/audits/[id]/progress/route.ts
import { getAccessToken } from "@auth0/nextjs-auth0";
import { NextRequest } from "next/server";

const BACKEND_URL = process.env.API_URL || "http://backend:8000";

export async function GET(
  request: NextRequest,
  { params }: { params: { id: string } }
) {
  const auditId = params.id;

  // Get access token
  const { accessToken } = await getAccessToken();
  if (!accessToken) {
    return new Response("Unauthorized", { status: 401 });
  }

  // Proxy SSE connection to backend
  const backendUrl =
    `${BACKEND_URL}/api/v1/sse/audits/${encodeURIComponent(auditId)}/progress`;
  const response = await fetch(backendUrl, {
    headers: {
      Accept: "text/event-stream",
      Authorization: `Bearer ${accessToken}`,
    },
    // Abort the upstream request when the browser disconnects
    signal: request.signal,
  });
  if (!response.ok) {
    return new Response(response.statusText, { status: response.status });
  }

  // Stream response to client
  return new Response(response.body, {
    headers: {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache",
      "Connection": "keep-alive",
    },
  });
}
- Hides backend URL from client
- Centralized authentication
- Handles token refresh
- Simplifies CORS
Frontend SSE Hook
The useAuditSSE hook manages SSE connections with automatic fallback:
hooks/useAuditSSE.ts
import { useEffect, useState, useRef, useCallback } from "react";
import logger from "@/lib/logger";

interface AuditProgress {
  audit_id: number;
  progress: number;
  status: string;
  error_message?: string;
  geo_score?: number;
  total_pages?: number;
}

interface UseAuditSSEOptions {
  onMessage?: (data: AuditProgress) => void;
  onComplete?: (data: AuditProgress) => void;
  onError?: (error: Error) => void;
  enabled?: boolean;
}

export function useAuditSSE(
  auditId: string | number | undefined,
  options: UseAuditSSEOptions = {}
) {
  const [isConnected, setIsConnected] = useState(false);
  const [lastMessage, setLastMessage] = useState<AuditProgress | null>(null);
  const [error, setError] = useState<Error | null>(null);
  const [useFallback, setUseFallback] = useState(false);
  const eventSourceRef = useRef<EventSource | null>(null);
  const pollTimerRef = useRef<ReturnType<typeof setInterval> | null>(null);
  const reconnectTimerRef = useRef<ReturnType<typeof setTimeout> | null>(null);
  const reconnectAttemptsRef = useRef(0);
  const maxReconnectAttempts = 3;
  const enabled = options.enabled ?? true;

  // Keep the latest callbacks in a ref so that an inline options object
  // does not recreate connect() and reopen the stream on every render
  const optionsRef = useRef(options);
  useEffect(() => {
    optionsRef.current = options;
  });

  // Cleanup function: close the stream and cancel pending timers
  const cleanup = useCallback(() => {
    if (eventSourceRef.current) {
      eventSourceRef.current.close();
      eventSourceRef.current = null;
    }
    if (pollTimerRef.current !== null) {
      clearInterval(pollTimerRef.current);
      pollTimerRef.current = null;
    }
    if (reconnectTimerRef.current !== null) {
      clearTimeout(reconnectTimerRef.current);
      reconnectTimerRef.current = null;
    }
    setIsConnected(false);
  }, []);

  // Fallback: Traditional polling
  const startPolling = useCallback(() => {
    if (!auditId || !enabled) return;
    logger.log("[Fallback] Using polling instead of SSE");
    setUseFallback(true);

    const poll = async () => {
      try {
        const res = await fetch(`/api/v1/audits/${auditId}/status`);
        if (!res.ok) {
          if (res.status === 401 || res.status === 403) {
            cleanup(); // stop polling: retrying cannot succeed
          }
          return;
        }
        const data: AuditProgress = await res.json();
        setLastMessage(data);
        optionsRef.current.onMessage?.(data);
        // Check terminal status
        if (data.status === "completed" || data.status === "failed") {
          optionsRef.current.onComplete?.(data);
          cleanup(); // also clears the polling interval
        }
      } catch (err) {
        console.error("[Fallback] Polling error:", err);
      }
    };

    // Poll every 3 seconds; the timer is stored so cleanup() can stop it
    poll();
    pollTimerRef.current = setInterval(poll, 3000);
  }, [auditId, cleanup, enabled]);

  // Connect to SSE
  const connect = useCallback(async () => {
    if (!auditId || !enabled) return;
    cleanup();
    const sseUrl = `/api/sse/audits/${auditId}/progress`;
    logger.log(`[SSE] Connecting to proxy: ${sseUrl}`);
    try {
      const eventSource = new EventSource(sseUrl);
      eventSourceRef.current = eventSource;

      eventSource.onopen = () => {
        logger.log("[SSE] Connection established");
        setIsConnected(true);
        setError(null);
        setUseFallback(false);
        reconnectAttemptsRef.current = 0;
      };

      eventSource.onmessage = (event) => {
        try {
          const data: AuditProgress = JSON.parse(event.data);
          logger.log("[SSE] Message received:", data);
          setLastMessage(data);
          optionsRef.current.onMessage?.(data);
          // Check terminal status
          if (data.status === "completed" || data.status === "failed") {
            logger.log(`[SSE] Audit ${data.status}, closing connection`);
            optionsRef.current.onComplete?.(data);
            cleanup();
          }
        } catch (err) {
          console.error("[SSE] Failed to parse message:", err);
        }
      };

      eventSource.onerror = (err) => {
        console.error("[SSE] Connection error:", err);
        setIsConnected(false);
        const errorObj = new Error("SSE connection error");
        setError(errorObj);
        optionsRef.current.onError?.(errorObj);

        // Try reconnecting, then fallback to polling
        if (reconnectAttemptsRef.current < maxReconnectAttempts) {
          const delay = Math.min(
            1000 * Math.pow(2, reconnectAttemptsRef.current),
            10000
          );
          logger.log(
            `[SSE] Reconnecting in ${delay}ms ` +
              `(attempt ${reconnectAttemptsRef.current + 1}/${maxReconnectAttempts})`
          );
          reconnectTimerRef.current = setTimeout(() => {
            reconnectTimerRef.current = null;
            reconnectAttemptsRef.current++;
            connect();
          }, delay);
        } else {
          console.warn(
            "[SSE] Max reconnection attempts reached, " +
              "falling back to polling"
          );
          cleanup();
          startPolling();
        }
      };
    } catch (err) {
      console.error("[SSE] Failed to create EventSource:", err);
      startPolling();
    }
  }, [auditId, cleanup, startPolling, enabled]);

  useEffect(() => {
    if (!auditId || !enabled) {
      cleanup();
      return;
    }
    connect();
    return () => cleanup();
  }, [connect, cleanup, auditId, enabled]);

  return {
    isConnected,
    lastMessage,
    error,
    useFallback,
    reconnect: connect,
  };
}
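The reconnect delay computed in onerror is min(1000 · 2^attempt, 10000) milliseconds. The short sketch below, written in Python for brevity, lists the resulting schedule. reconnect_delays is a hypothetical helper and is not part of the hook.

```python
def reconnect_delays(
    max_attempts: int = 3, base_ms: int = 1000, cap_ms: int = 10_000
) -> list:
    """Delays (in ms) used before each reconnection attempt."""
    return [min(base_ms * 2 ** attempt, cap_ms) for attempt in range(max_attempts)]


delays = reconnect_delays()           # schedule with the hook's defaults
total_wait_ms = sum(delays)           # time spent waiting before polling starts
longer = reconnect_delays(max_attempts=6)  # shows where the 10 s cap applies
```

With the hook's limit of three attempts the delays are 1 s, 2 s, and 4 s, so the 10 s cap only takes effect if maxReconnectAttempts is raised to five or more.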
Hook Features
Automatic Reconnection
Exponential backoff (1s → 2s → 4s, capped at 10s)
Fallback to Polling
Switches to 3-second polling after 3 failed reconnections
Lifecycle Callbacks
onMessage, onComplete, onError hooks
Connection Status
isConnected, useFallback indicators
Usage Example
import { useAuditSSE } from "@/hooks/useAuditSSE";
import { toast } from "sonner";
// Card, Badge, Progress, Alert and Signal come from the project's UI
// components and icon set (imports omitted)

function AuditProgressCard({ auditId }: { auditId: number }) {
  const { lastMessage, isConnected, useFallback, error } = useAuditSSE(
    auditId,
    {
      onMessage: (data) => {
        console.log("Progress update:", data);
      },
      onComplete: (data) => {
        toast.success(
          `Audit ${data.status}! GEO Score: ${data.geo_score}`
        );
      },
      onError: (error) => {
        toast.error(error.message);
      },
      enabled: true,
    }
  );

  return (
    <Card>
      <CardHeader>
        <div className="flex items-center gap-2">
          <h3>Audit Progress</h3>
          {isConnected && (
            <Badge variant="success">
              <Signal className="w-3 h-3" />
              Live
            </Badge>
          )}
          {useFallback && (
            <Badge variant="warning">Fallback Mode</Badge>
          )}
        </div>
      </CardHeader>
      <CardContent>
        {lastMessage && (
          <>
            <Progress value={lastMessage.progress} />
            <p>Status: {lastMessage.status}</p>
            <p>Progress: {lastMessage.progress}%</p>
            {/* Compare against null so a score of 0 is still displayed */}
            {lastMessage.geo_score != null && (
              <p>GEO Score: {lastMessage.geo_score}</p>
            )}
          </>
        )}
        {error && <Alert variant="destructive">{error.message}</Alert>}
      </CardContent>
    </Card>
  );
}
Heartbeat & Retry Mechanisms
Server-side Heartbeat
The backend sends periodic heartbeat comments to keep the connection alive:
# Send heartbeat if no activity for 30 seconds
if time_since_last_emit >= heartbeat_seconds:
    yield _serialize_sse_event(
        ServerSentEvent(comment="heartbeat", retry=retry_ms)
    )
Client-side Retry
The SSE protocol lets the server set the client's reconnection delay through the retry field:
# Server sends retry interval in milliseconds
ServerSentEvent(raw_data=data, retry=5000)  # 5 seconds
EventSource automatically reconnects after network failures using this interval.
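On the wire, heartbeats and retry hints are plain text lines. The sketch below shows the text/event-stream framing defined by the SSE specification. format_sse is a hypothetical helper written for illustration. It is not FastAPI's serializer or the backend's _serialize_sse_event.

```python
from typing import Optional


def format_sse(
    data: Optional[str] = None,
    *,
    comment: Optional[str] = None,
    retry: Optional[int] = None,
) -> bytes:
    """Frame one SSE block: optional comment, retry hint, and data lines."""
    lines = []
    if comment is not None:
        # Lines starting with ":" are comments; clients ignore them
        lines.extend(f": {part}" for part in (comment.splitlines() or [""]))
    if retry is not None:
        # Reconnection delay in milliseconds
        lines.append(f"retry: {int(retry)}")
    if data is not None:
        # Multi-line data is sent as one "data:" line per line of text
        lines.extend(f"data: {part}" for part in (data.splitlines() or [""]))
    # A blank line terminates the block
    return ("\n".join(lines) + "\n\n").encode("utf-8")


heartbeat = format_sse(comment="heartbeat", retry=5000)
update = format_sse('{"status": "running", "progress": 25}', retry=5000)
```

A comment-only block never triggers onmessage in the browser, which is why heartbeats keep proxies from closing an idle connection without producing spurious progress updates.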
Monitoring & Debugging
Server-side Logging
logger.info(f"SSE connection established for audit {audit_id}")
logger.debug(f"Published progress to {channel}")
logger.warning(f"Redis subscription failed: {error}")
logger.error(f"SSE Redis read failed: {error}")
Client-side Logging
logger.log("[SSE] Connection established");
logger.log("[SSE] Message received:", data);
console.warn("[SSE] Max reconnection attempts reached");
console.error("[SSE] Connection error:", err);
Connection Status Indicators
{isConnected && <Badge variant="success">Live</Badge>}
{useFallback && <Badge variant="warning">Fallback Mode</Badge>}
{error && <Alert variant="destructive">{error.message}</Alert>}
Troubleshooting
Validate Redis
docker compose logs -f redis
Validate Backend SSE
docker compose logs -f backend | grep SSE
Test SSE Endpoint
curl -N -H "Authorization: Bearer $TOKEN" \
http://localhost:8000/api/v1/sse/audits/123/progress
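When reading the raw curl output, it can help to split the stream into blocks. The parser below is a minimal sketch for debugging. parse_sse is a hypothetical helper, and the sample text is hand-written to match the payload fields described earlier; it is not captured output. Unlike a browser, it also records comment-only blocks so heartbeats are visible.

```python
import json


def parse_sse(stream: str) -> list:
    """Split raw text/event-stream text into blocks of data, retry and comments."""
    blocks = []
    current = {"data": [], "retry": None, "comments": []}
    for line in stream.splitlines() + [""]:  # trailing "" flushes the last block
        if line == "":
            if current["data"] or current["comments"] or current["retry"] is not None:
                data = "\n".join(current["data"])
                blocks.append({
                    "data": json.loads(data) if data else None,
                    "retry": current["retry"],
                    "comments": current["comments"],
                })
            current = {"data": [], "retry": None, "comments": []}
        elif line.startswith(":"):
            current["comments"].append(line[1:].strip())
        else:
            field, _, value = line.partition(":")
            value = value[1:] if value.startswith(" ") else value
            if field == "data":
                current["data"].append(value)
            elif field == "retry" and value.isdigit():
                current["retry"] = int(value)
    return blocks


# Hand-written sample, not real server output
sample = (
    'retry: 5000\ndata: {"audit_id": 123, "status": "running", "progress": 25}\n\n'
    ": heartbeat\nretry: 5000\n\n"
    'retry: 5000\ndata: {"audit_id": 123, "status": "completed", "progress": 100}\n\n'
)
blocks = parse_sse(sample)
```

If only heartbeat blocks appear while an audit is running, the stream itself is healthy and the problem is more likely on the publishing side (worker or Redis).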
Check Configuration
# Backend .env
SSE_SOURCE=redis
SSE_FALLBACK_DB_INTERVAL_SECONDS=10
SSE_HEARTBEAT_SECONDS=30
SSE_RETRY_MS=5000