Skip to main content
ExpireEye provides a real-time notification system using WebSocket connections to instantly alert users about product expirations, scan confirmations, and inventory updates.

WebSocket connection

Establish a WebSocket connection to receive real-time notifications:
ws://localhost:8000/api/ws/notification?access_token=YOUR_JWT_TOKEN

Connection flow

1

Connect with authentication

Pass your JWT access token as a query parameter:
@app.websocket("/ws/notification")
async def websocket_notification_endpoint(
    websocket: WebSocket, 
    access_token: str = Query(None)
):
    if not access_token:
        await websocket.close(code=1008, reason="Access token required")
    
    await websocket.accept()
2

Decode token and register connection

The server validates your token and registers your WebSocket connection:
payload = decode_access_token(access_token)
user_id = payload.get("userId")

notification_connections[user_id] = websocket
3

Receive connection confirmation

You receive a confirmation message when connected:
{
  "type": "CONNECTION_ESTABLISHED",
  "message": "Notification WebSocket connected",
  "userId": "your-user-id"
}

Notification types

Product expiration

When a product expires, you receive a warning notification:
{
  "id": "notification-uuid",
  "type": "product_expiration",
  "message": "Product Apple has expired",
  "productName": "Apple",
  "expiryDate": "2024-03-04T10:30:00",
  "timestamp": "2024-03-04T12:00:00"
}

Product scanned

When you scan a product using YOLO detection:
{
  "type": "Product_Scanned",
  "message": "Product Scanned successfully",
  "data": {
    "name": "Apple",
    "confidence": 0.9,
    "quantity": 1,
    "notes": "Detected by YOLO with 90% confidence",
    "expiryDate": "2024-03-10T00:00:00",
    "nutrition": {
      "energy_kcal": "52",
      "carbohydrate": "14",
      "protein": "0.3",
      "fiber": "2.4",
      "total_sugars": "10",
      "saturated_fat": "0.1",
      "vitamin_a": "54",
      "vitamin_c": "4.6",
      "potassium": "107",
      "iron": "0.12",
      "calcium": "6",
      "sodium": "1",
      "cholesterol": "0"
    }
  }
}

Connection management

The system maintains active WebSocket connections in memory:
notification_connections = {}

# Register connection
notification_connections[user_id] = websocket

# Send notification to specific user
async def send_notification_to_user(user_id: str, message: dict):
    if user_id in notification_connections:
        try:
            await notification_connections[user_id].send_text(json.dumps(message))
        except Exception as e:
            print(f"Error sending notification to user {user_id}: {e}")
            del notification_connections[user_id]
Failed connections are automatically removed from the active connections dictionary.

Handling disconnections

The system gracefully handles WebSocket disconnections:
try:
    while True:
        data = await websocket.receive_text()
        # Process incoming messages
except WebSocketDisconnect:
    print(f"WebSocket disconnected for user {user_id}")
    db.close()
    if user_id in notification_connections:
        del notification_connections[user_id]

Client-side actions

Mark notification as read

Send a message to mark a notification as read:
{
  "action": "mark_read",
  "id": "notification-uuid"
}
Server-side handling:
if data_json.get("action") == "mark_read" and data_json.get("id"):
    notification_id = data_json["id"]
    notification = (
        db.query(Notification)
        .filter(Notification.id == notification_id)
        .first()
    )
    if notification:
        notification.read = True
        db.commit()

Keep-alive ping

Send periodic pings to keep the connection alive:
"PING"
You’ll receive a pong response:
{
  "type": "pong"
}

Database persistence

Notifications are stored in the database for later retrieval:
async def add_notification_to_db(
    user_id: str,
    productName: str,
    message: str,
    type: str,
    db: Session,
):
    new_notification = Notification(
        userId=user_id,
        message=message,
        type=type,
        productName=productName,
        read=False,
        created_at=datetime.utcnow().isoformat(),
    )
    db.add(new_notification)
    db.commit()
    db.refresh(new_notification)
    return new_notification

Notification model

Notifications are stored with the following structure:
class Notification(Base):
    __tablename__ = "notifications"
    
    id = Column(String(36), primary_key=True)
    userId = Column(String(36), ForeignKey("users.id"))
    message = Column(String(255))
    type = Column(String(50))  # "info", "warning"
    productName = Column(String(255))
    read = Column(Boolean, default=False)
    created_at = Column(String(255))

Integration with expiry detection

The expiry detection scheduler automatically sends notifications:
# From check_product_expiry() in app/services/product_service.py
for product in expired_products:
    # Update product status
    product.status = "expired"
    
    # Create notification
    notification = await add_notification_to_db(
        user_id=str(product.userId),
        productName=product_details.name,
        message=f"Product {product_details.name} has expired",
        type="warning",
        db=db,
    )
    
    # Send real-time notification
    notification_message = {
        "id": notification.id,
        "type": "product_expiration",
        "message": f"Product {product_details.name} has expired",
        "productName": product_details.name,
        "expiryDate": product.expiryDate,
        "timestamp": current_time,
    }
    
    await send_notification_to_user(str(product.userId), notification_message)
Notifications are sent in real-time only if the user has an active WebSocket connection. Otherwise, they’re stored in the database for later retrieval.

Client implementation example

JavaScript/TypeScript

const token = localStorage.getItem('access_token');
const ws = new WebSocket(
  `ws://localhost:8000/api/ws/notification?access_token=${token}`
);

ws.onopen = () => {
  console.log('WebSocket connected');
};

ws.onmessage = (event) => {
  const notification = JSON.parse(event.data);
  
  if (notification.type === 'product_expiration') {
    showExpiryAlert(notification);
  } else if (notification.type === 'Product_Scanned') {
    showScanSuccess(notification);
  }
};

ws.onerror = (error) => {
  console.error('WebSocket error:', error);
};

ws.onclose = () => {
  console.log('WebSocket disconnected');
  // Implement reconnection logic
};

// Mark notification as read
function markAsRead(notificationId) {
  ws.send(JSON.stringify({
    action: 'mark_read',
    id: notificationId
  }));
}

// Keep connection alive
setInterval(() => {
  if (ws.readyState === WebSocket.OPEN) {
    ws.send('PING');
  }
}, 30000); // Every 30 seconds

Python client

import asyncio
import websockets
import json

async def connect_notifications(access_token):
    uri = f"ws://localhost:8000/api/ws/notification?access_token={access_token}"
    
    async with websockets.connect(uri) as websocket:
        # Receive connection confirmation
        message = await websocket.recv()
        print(f"Connected: {message}")
        
        # Listen for notifications
        while True:
            notification = await websocket.recv()
            data = json.loads(notification)
            
            if data["type"] == "product_expiration":
                print(f"⚠️ {data['message']}")
            elif data["type"] == "Product_Scanned":
                print(f"✅ {data['message']}")

# Run the client
asyncio.run(connect_notifications("YOUR_JWT_TOKEN"))

Error handling

Always implement reconnection logic in your client application to handle unexpected disconnections.
function connectWebSocket() {
  const ws = new WebSocket(wsUrl);
  
  ws.onclose = () => {
    console.log('Connection lost. Reconnecting in 5 seconds...');
    setTimeout(connectWebSocket, 5000);
  };
  
  return ws;
}

Security considerations

  • WebSocket connections require valid JWT authentication
  • Tokens are validated on connection establishment
  • Each user can only receive their own notifications
  • Connections are automatically closed if authentication fails
if not access_token:
    await websocket.close(code=1008, reason="Access token required")
The WebSocket endpoint uses the same JWT authentication as REST API endpoints for consistent security.

Build docs developers (and LLMs) love