ExpireEye provides a real-time notification system using WebSocket connections to instantly alert users about product expirations, scan confirmations, and inventory updates.
WebSocket connection
Establish a WebSocket connection to receive real-time notifications:
ws://localhost:8000/api/ws/notification?access_token=YOUR_JWT_TOKEN
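A client has to place the token in the query string before it connects. The following is a minimal sketch of building that URL in Python; the helper name `build_notification_url` is hypothetical and not part of ExpireEye. It percent-encodes the token so that unexpected characters cannot break the query string.

```python
from urllib.parse import urlencode, urlsplit, urlunsplit


def build_notification_url(base_url: str, access_token: str) -> str:
    """Return base_url with the token attached as the access_token query parameter."""
    if not access_token:
        raise ValueError("access_token is required")
    parts = urlsplit(base_url)
    # urlencode percent-escapes characters that are unsafe in a query string
    query = urlencode({"access_token": access_token})
    return urlunsplit((parts.scheme, parts.netloc, parts.path, query, ""))


url = build_notification_url("ws://localhost:8000/api/ws/notification", "abc.def.ghi")
print(url)
```

Because the token travels in the URL, it can end up in server or proxy access logs, so a deployment would typically prefer `wss://` and short-lived tokens.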
Connection flow
Connect with authentication
Pass your JWT access token as a query parameter:
@app.websocket("/ws/notification")
async def websocket_notification_endpoint(
    websocket: WebSocket,
    access_token: str = Query(None)
):
    if not access_token:
        # 1008 is the WebSocket "policy violation" close code
        await websocket.close(code=1008, reason="Access token required")
        return  # stop here so accept() is never called on a closed socket
    await websocket.accept()
Decode token and register connection
The server validates your token and registers your WebSocket connection:
payload = decode_access_token(access_token)
user_id = payload.get("userId")
notification_connections[user_id] = websocket
Receive connection confirmation
You receive a confirmation message when connected:
{
  "type": "CONNECTION_ESTABLISHED",
  "message": "Notification WebSocket connected",
  "userId": "your-user-id"
}
Notification types
Product expiration
When a product expires, you receive a warning notification:
{
  "id": "notification-uuid",
  "type": "product_expiration",
  "message": "Product Apple has expired",
  "productName": "Apple",
  "expiryDate": "2024-03-04T10:30:00",
  "timestamp": "2024-03-04T12:00:00"
}
Product scanned
When you scan a product using YOLO detection, you receive a scan confirmation:
{
  "type": "Product_Scanned",
  "message": "Product Scanned successfully",
  "data": {
    "name": "Apple",
    "confidence": 0.9,
    "quantity": 1,
    "notes": "Detected by YOLO with 90% confidence",
    "expiryDate": "2024-03-10T00:00:00",
    "nutrition": {
      "energy_kcal": "52",
      "carbohydrate": "14",
      "protein": "0.3",
      "fiber": "2.4",
      "total_sugars": "10",
      "saturated_fat": "0.1",
      "vitamin_a": "54",
      "vitamin_c": "4.6",
      "potassium": "107",
      "iron": "0.12",
      "calcium": "6",
      "sodium": "1",
      "cholesterol": "0"
    }
  }
}
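The message shapes above can be routed by their `type` field. The following is a minimal Python sketch of such a dispatcher; the handler names and the returned strings are illustrative assumptions, not part of ExpireEye. Note that the type values are case-sensitive and use different casing styles (`product_expiration`, `Product_Scanned`, `CONNECTION_ESTABLISHED`), so they are matched exactly.

```python
import json
from typing import Callable, Dict

Handler = Callable[[dict], str]


def handle_expiration(msg: dict) -> str:
    return f"EXPIRED: {msg['productName']} (expiry {msg['expiryDate']})"


def handle_scan(msg: dict) -> str:
    data = msg["data"]
    return f"SCANNED: {data['name']} x{data['quantity']} ({data['confidence']:.0%} confidence)"


def handle_connected(msg: dict) -> str:
    return f"CONNECTED as {msg['userId']}"


# Keys are matched exactly; the casing mirrors the payloads documented above
HANDLERS: Dict[str, Handler] = {
    "product_expiration": handle_expiration,
    "Product_Scanned": handle_scan,
    "CONNECTION_ESTABLISHED": handle_connected,
}


def dispatch(raw: str) -> str:
    """Parse one text frame and route it to the handler for its type."""
    try:
        msg = json.loads(raw)
    except json.JSONDecodeError:
        return "IGNORED: non-JSON message"
    if not isinstance(msg, dict):
        return "IGNORED: non-object message"
    handler = HANDLERS.get(msg.get("type"))
    if handler is None:
        return f"IGNORED: unknown type {msg.get('type')!r}"
    return handler(msg)


scan = {"type": "Product_Scanned", "data": {"name": "Apple", "confidence": 0.9, "quantity": 1}}
print(dispatch(json.dumps(scan)))
```

Unknown or non-JSON frames are ignored rather than raised, which keeps the receive loop alive if the server adds new message types.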
Connection management
The system maintains active WebSocket connections in memory:
notification_connections = {}
# Register connection
notification_connections[user_id] = websocket
# Send notification to specific user
async def send_notification_to_user(user_id: str, message: dict):
    if user_id in notification_connections:
        try:
            await notification_connections[user_id].send_text(json.dumps(message))
        except Exception as e:
            print(f"Error sending notification to user {user_id}: {e}")
            del notification_connections[user_id]
Failed connections are automatically removed from the active connections dictionary.
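The removal behaviour can be exercised without a running server. The sketch below is a self-contained approximation: `FakeSocket` is a hypothetical stand-in that models only `send_text`, and this variant of `send_notification_to_user` returns a boolean (an assumption added here so the outcome is observable; the function shown above returns nothing).

```python
import asyncio
import json

notification_connections = {}


class FakeSocket:
    """Hypothetical stand-in for a WebSocket; only send_text is modeled."""

    def __init__(self, broken: bool = False):
        self.broken = broken
        self.sent = []

    async def send_text(self, text: str) -> None:
        if self.broken:
            raise ConnectionError("peer went away")
        self.sent.append(text)


async def send_notification_to_user(user_id: str, message: dict) -> bool:
    """Return True if delivered; drop the connection and return False on failure."""
    socket = notification_connections.get(user_id)
    if socket is None:
        return False  # user is offline
    try:
        await socket.send_text(json.dumps(message))
        return True
    except Exception:
        # A failed send means the socket is unusable, so forget it
        notification_connections.pop(user_id, None)
        return False


async def demo():
    notification_connections["alice"] = FakeSocket()
    notification_connections["bob"] = FakeSocket(broken=True)
    message = {"type": "product_expiration"}
    results = [
        await send_notification_to_user("alice", message),
        await send_notification_to_user("bob", message),
        await send_notification_to_user("carol", message),
    ]
    return results, sorted(notification_connections)


results, remaining = asyncio.run(demo())
print(results, remaining)  # [True, False, False] ['alice']
```

Two properties follow from keeping a plain dictionary keyed by user ID: a second connection from the same user replaces the first, and the registry is local to one server process.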
Handling disconnections
The system gracefully handles WebSocket disconnections:
try:
    while True:
        data = await websocket.receive_text()
        # Process incoming messages
except WebSocketDisconnect:
    print(f"WebSocket disconnected for user {user_id}")
    db.close()
    if user_id in notification_connections:
        del notification_connections[user_id]
Client-side actions
Mark notification as read
Send a message to mark a notification as read:
{
  "action": "mark_read",
  "id": "notification-uuid"
}
Server-side handling:
if data_json.get("action") == "mark_read" and data_json.get("id"):
    notification_id = data_json["id"]
    notification = (
        db.query(Notification)
        .filter(Notification.id == notification_id)
        .first()
    )
    if notification:
        notification.read = True
        db.commit()
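Clients send both JSON objects (such as `mark_read`) and the plain text `PING` over the same socket, so incoming frames need to be classified before they are parsed as JSON. The following is a minimal sketch of that step; `parse_client_message` and its return values are hypothetical and are not taken from the ExpireEye code.

```python
import json
from typing import Optional, Tuple


def parse_client_message(raw: str) -> Tuple[str, Optional[str]]:
    """Classify a frame as ("ping", None), ("mark_read", id) or ("ignore", None)."""
    if raw.strip() == "PING":
        return ("ping", None)
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        return ("ignore", None)  # neither PING nor valid JSON
    if not isinstance(data, dict):
        return ("ignore", None)
    notification_id = data.get("id")
    if (
        data.get("action") == "mark_read"
        and isinstance(notification_id, str)
        and notification_id
    ):
        return ("mark_read", notification_id)
    return ("ignore", None)


print(parse_client_message('{"action": "mark_read", "id": "notification-uuid"}'))
```

The query shown above filters by notification ID alone. Whether ownership is checked elsewhere is not shown in this section; adding a filter on `Notification.userId` would be one way to ensure users can only mark their own notifications.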
Keep-alive ping
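Send periodic pings to keep the connection alive. The JavaScript client example in this document sends the text message PING every 30 seconds.
The server replies to each ping with a pong response.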
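A keep-alive loop for a Python client could look like the sketch below. It assumes the ping is the text `PING`, as in the JavaScript client example; the `keep_alive` helper and the `fake_send` stand-in are hypothetical, and a real client would pass its WebSocket's send coroutine instead. The format of the pong reply is not specified here, so the sketch does not inspect it.

```python
import asyncio


async def keep_alive(send, interval: float, stop: asyncio.Event) -> int:
    """Call send("PING") every `interval` seconds until `stop` is set; return the ping count."""
    sent = 0
    while not stop.is_set():
        try:
            # Wait for the stop signal, but no longer than one interval
            await asyncio.wait_for(stop.wait(), timeout=interval)
        except asyncio.TimeoutError:
            # The interval elapsed without a stop request, so ping
            await send("PING")
            sent += 1
    return sent


async def demo():
    frames = []

    async def fake_send(text: str) -> None:  # stand-in for websocket.send
        frames.append(text)

    stop = asyncio.Event()
    task = asyncio.create_task(keep_alive(fake_send, 0.01, stop))
    await asyncio.sleep(0.055)  # let a few intervals elapse
    stop.set()
    count = await task
    return count, frames


count, frames = asyncio.run(demo())
print(count, frames[:1])
```

Waiting on the stop event with a timeout, rather than sleeping, lets the loop exit promptly when the connection is being shut down.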
Database persistence
Notifications are stored in the database for later retrieval:
async def add_notification_to_db(
    user_id: str,
    productName: str,
    message: str,
    type: str,
    db: Session,
):
    new_notification = Notification(
        userId=user_id,
        message=message,
        type=type,
        productName=productName,
        read=False,
        created_at=datetime.utcnow().isoformat(),
    )
    db.add(new_notification)
    db.commit()
    db.refresh(new_notification)
    return new_notification
Notification model
Notifications are stored with the following structure:
class Notification(Base):
    __tablename__ = "notifications"
    id = Column(String(36), primary_key=True)
    userId = Column(String(36), ForeignKey("users.id"))
    message = Column(String(255))
    type = Column(String(50))  # "info", "warning"
    productName = Column(String(255))
    read = Column(Boolean, default=False)
    created_at = Column(String(255))
Integration with expiry detection
The expiry detection scheduler automatically sends notifications:
# From check_product_expiry() in app/services/product_service.py
for product in expired_products:
    # Update product status
    product.status = "expired"
    # Create notification
    notification = await add_notification_to_db(
        user_id=str(product.userId),
        productName=product_details.name,
        message=f"Product {product_details.name} has expired",
        type="warning",
        db=db,
    )
    # Send real-time notification
    notification_message = {
        "id": notification.id,
        "type": "product_expiration",
        "message": f"Product {product_details.name} has expired",
        "productName": product_details.name,
        "expiryDate": product.expiryDate,
        "timestamp": current_time,
    }
    await send_notification_to_user(str(product.userId), notification_message)
Every expiry notification is stored in the database first. It is also pushed in real time if the user has an active WebSocket connection; otherwise it remains in the database for later retrieval.
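How stored notifications are retrieved later is not described in this section. As an illustration only, the sketch below runs the kind of query such a retrieval might use, with the standard-library `sqlite3` module standing in for the project's SQLAlchemy session. The table and column names follow the Notification model; the `fetch_unread` helper and the sample rows are hypothetical.

```python
import sqlite3


def fetch_unread(conn: sqlite3.Connection, user_id: str) -> list:
    """Return the user's unread notifications, oldest first."""
    rows = conn.execute(
        "SELECT id, message, type, productName, created_at "
        "FROM notifications WHERE userId = ? AND read = 0 "
        "ORDER BY created_at",
        (user_id,),
    ).fetchall()
    return [dict(row) for row in rows]


conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row  # lets rows convert to dicts by column name
conn.execute(
    "CREATE TABLE notifications ("
    "id TEXT PRIMARY KEY, userId TEXT, message TEXT, type TEXT, "
    "productName TEXT, read INTEGER DEFAULT 0, created_at TEXT)"
)
conn.executemany(
    "INSERT INTO notifications VALUES (?, ?, ?, ?, ?, ?, ?)",
    [
        ("n1", "u1", "Product Apple has expired", "warning", "Apple", 0, "2024-03-04T12:00:00"),
        ("n2", "u1", "Product Milk has expired", "warning", "Milk", 1, "2024-03-05T12:00:00"),
        ("n3", "u2", "Product Bread has expired", "warning", "Bread", 0, "2024-03-04T13:00:00"),
    ],
)
unread = fetch_unread(conn, "u1")
print([n["id"] for n in unread])  # ['n1']
```

Because `created_at` holds ISO 8601 strings in a single format, sorting the text column also sorts the rows chronologically.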
Client implementation example
JavaScript/TypeScript
const token = localStorage.getItem('access_token');
const ws = new WebSocket(
  `ws://localhost:8000/api/ws/notification?access_token=${token}`
);
ws.onopen = () => {
  console.log('WebSocket connected');
};
ws.onmessage = (event) => {
  const notification = JSON.parse(event.data);
  if (notification.type === 'product_expiration') {
    showExpiryAlert(notification);
  } else if (notification.type === 'Product_Scanned') {
    showScanSuccess(notification);
  }
};
ws.onerror = (error) => {
  console.error('WebSocket error:', error);
};
ws.onclose = () => {
  console.log('WebSocket disconnected');
  // Implement reconnection logic
};
// Mark notification as read
function markAsRead(notificationId) {
  ws.send(JSON.stringify({
    action: 'mark_read',
    id: notificationId
  }));
}
// Keep connection alive
setInterval(() => {
  if (ws.readyState === WebSocket.OPEN) {
    ws.send('PING');
  }
}, 30000); // Every 30 seconds
Python client
import asyncio
import websockets
import json
async def connect_notifications(access_token):
    uri = f"ws://localhost:8000/api/ws/notification?access_token={access_token}"
    async with websockets.connect(uri) as websocket:
        # Receive connection confirmation
        message = await websocket.recv()
        print(f"Connected: {message}")
        # Listen for notifications
        while True:
            notification = await websocket.recv()
            data = json.loads(notification)
            if data["type"] == "product_expiration":
                print(f"⚠️ {data['message']}")
            elif data["type"] == "Product_Scanned":
                print(f"✅ {data['message']}")
# Run the client
asyncio.run(connect_notifications("YOUR_JWT_TOKEN"))
Error handling
Always implement reconnection logic in your client application to handle unexpected disconnections.
// wsUrl is the notification URL, including the access_token query parameter
function connectWebSocket() {
  const ws = new WebSocket(wsUrl);
  ws.onclose = () => {
    console.log('Connection lost. Reconnecting in 5 seconds...');
    setTimeout(connectWebSocket, 5000);
  };
  return ws;
}
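The example above retries at a fixed 5-second interval. A common variation, which this guide does not prescribe, is exponential backoff with a cap, so that repeated failures are retried less and less aggressively. The sketch below computes such a delay schedule in Python; the function name, parameters and default values are all illustrative assumptions.

```python
import random


def backoff_delays(base=1.0, cap=30.0, attempts=6, jitter=0.0, rng=None):
    """Return reconnect delays in seconds: base * 2**attempt, limited to cap.

    jitter is the maximum fraction of each delay added at random, which
    helps avoid many clients reconnecting at the same moment.
    """
    rng = rng or random.Random()
    delays = []
    for attempt in range(attempts):
        delay = min(cap, base * (2 ** attempt))
        delay += rng.uniform(0, jitter * delay)
        delays.append(delay)
    return delays


print(backoff_delays())  # [1.0, 2.0, 4.0, 8.0, 16.0, 30.0]
```

A client would sleep for the next delay before each reconnection attempt and reset the schedule once a connection succeeds.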
Security considerations
- WebSocket connections require valid JWT authentication
- Tokens are validated on connection establishment
- Each user can only receive their own notifications
- Connections are automatically closed if authentication fails
if not access_token:
    await websocket.close(code=1008, reason="Access token required")
    return  # do not continue to accept() after closing
The WebSocket endpoint uses the same JWT authentication as REST API endpoints for consistent security.