Skip to main content
The ExpireEye backend is built on FastAPI, a modern Python web framework that provides high performance and automatic API documentation. The architecture follows a layered pattern with clear separation of concerns across routers, services, models, and database layers.

Application structure

The backend follows a modular architecture organized into distinct layers:
app/
├── main.py              # Application entry point
├── routers/             # API endpoint definitions
├── services/            # Business logic layer
├── models/              # SQLAlchemy ORM models
├── schemas/             # Pydantic request/response models
├── db/                  # Database configuration
├── utils/               # Helper utilities
└── core/                # Core configurations

Core application setup

The FastAPI application is initialized in main.py with API versioning support:
app/main.py
app = FastAPI(root_path="/api", root_path_in_servers="/api")
All API endpoints are automatically prefixed with /api, allowing for clean URL structure and easier reverse proxy configuration.

Router layer

Routers define the API endpoints and handle HTTP request/response cycles. Each router focuses on a specific domain:
app.include_router(auth_router, prefix="/auth", tags=["auth"])

Available routers

The application includes these primary routers from app/main.py:162-168:
  • auth - User authentication (login, signup)
  • product - Product inventory management
  • user_inventory - User-specific product tracking
  • user - User profile management
  • notification_router - Real-time notifications via WebSocket
  • detection - YOLO-based object detection
  • stats - Usage statistics and analytics
All routers except /auth/login, /auth/signup, and /status require authentication via JWT token in the Authorization header.

Service layer

Services encapsulate business logic and interact with the database. They are called by routers and keep endpoint handlers clean and focused.

Product service

The product_service.py handles core product operations:
app/services/product_service.py
async def add_product_to_inventory(user_id: str, product: dict, db: Session):
    product_name = product["productName"]
    category = product["category"]
    product_barcode = generate_product_barcode(product_name)
    
    # Fetch nutrition data from external API
    food_nutrition = fetch_nutrition(product_name)
    
    # Create nutrition record
    nutrition_data = {
        "energy_kcal": check_nutrition_exists("Energy (KCAL)", food_nutrition),
        "carbohydrate": check_nutrition_exists("Carbohydrate, by difference (G)", food_nutrition),
        # ... more nutrition fields
        "addedAt": datetime.utcnow().isoformat(),
    }
    new_nutrition = Nutrition(**nutrition_data)
    db.add(new_nutrition)
    db.commit()
    
    # Create product record
    product_data = {
        "name": product_name,
        "category": category,
        "barcode": product_barcode if product_barcode else "N/A",
        "nutritionId": new_nutrition.id,
        "addedAt": datetime.utcnow().isoformat(),
    }
    new_product = Product(**product_data)
    db.add(new_product)
    db.commit()
This service fetches nutrition information from an external API, creates database records, and returns the result to the router.

Notification service

The notification_service.py manages WebSocket connections and real-time notifications:
app/services/notification_service.py
notification_connections = {}

async def send_notification_to_user(user_id: str, message: dict):
    if user_id in notification_connections:
        try:
            await notification_connections[user_id].send_text(json.dumps(message))
        except Exception as e:
            print(f"Error sending notification to user {user_id}: {e}")
            del notification_connections[user_id]
Users connect via WebSocket at /ws/notification and receive real-time updates when products expire or are scanned.
The notification service maintains an in-memory dictionary of active WebSocket connections, indexed by user ID for quick message routing.

Background scheduler

The application uses APScheduler to run periodic background tasks:
app/main.py
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger

scheduler = AsyncIOScheduler()

@app.on_event("startup")
async def startup_event():
    scheduler.add_job(check_product_expiry, CronTrigger(second="*/10"))
    scheduler.start()
    print("Scheduler started")

@app.on_event("shutdown")
async def shutdown_event():
    scheduler.shutdown()

Expiry checking job

The check_product_expiry function runs every 10 seconds from app/services/product_service.py:21-78:
async def check_product_expiry():
    db = next(get_db())
    current_time = datetime.utcnow().isoformat()
    
    expired_products = (
        db.query(UserProduct)
        .filter(
            (UserProduct.expiryDate < current_time) & 
            (UserProduct.status == "active")
        )
        .all()
    )
    
    for product in expired_products:
        # Update product status
        product.status = "expired"
        product.updatedAt = current_time
        db.add(product)
        
        # Create notification
        notification = await add_notification_to_db(
            user_id=str(product.userId),
            productName=product_details.name,
            message=f"Product {product_details.name} has expired",
            type="warning",
            db=db,
        )
        
        # Send real-time notification
        await send_notification_to_user(str(product.userId), notification_message)
        db.commit()
The scheduler runs in the same process as the FastAPI application. For production deployments with multiple workers, consider using a separate worker process or distributed task queue like Celery to avoid duplicate job execution.

Middleware stack

The application uses two types of middleware for cross-cutting concerns:

CORS middleware

Configured to allow requests from specific origins from app/main.py:34-48:
origins = [
    "http://localhost:5173",
    "http://127.0.0.1:5173",
    "https://expire-eye.vercel.app",
    "https://476d2d8e876e.ngrok-free.app",
]

app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*", "GET", "POST", "PUT", "DELETE", "OPTIONS"],
    allow_headers=["*", "Authorization"],
)

Authentication middleware

Custom middleware validates JWT tokens on every request from app/main.py:51-96:
@app.middleware("http")
async def access_token_middleware(request: Request, call_next):
    # Skip preflight requests
    if request.method == "OPTIONS":
        return await call_next(request)
    
    public_paths = [
        "/api/auth/login",
        "/api/auth/signup",
        "/api/status",
        "/docs",
        "/redoc",
        "/api/openapi.json",
    ]
    
    # Skip public paths
    if request.url.path in public_paths:
        return await call_next(request)
    
    # Extract and validate token
    auth_header = request.headers.get("Authorization")
    if not auth_header:
        return JSONResponse(
            status_code=401,
            content={"detail": "Authorization header missing or invalid."},
        )
    
    access_token = auth_header.split("Bearer ")[-1].strip()
    
    try:
        payload = decode_access_token(access_token)
        request.state.user = payload  # Store user info in request state
    except Exception as e:
        return JSONResponse(
            status_code=401,
            content={"detail": "Access token is invalid."},
        )
    
    response = await call_next(request)
    return response
The middleware stores decoded user information in request.state.user, making it available to all endpoint handlers without repetitive authentication logic.

WebSocket endpoints

The application provides WebSocket support for real-time features:
app/main.py
@app.websocket("/ws/notification")
async def websocket_notification_endpoint(
    websocket: WebSocket, access_token: str = Query(None)
):
    await notification_websocket(websocket, access_token)
Clients connect with an access token as a query parameter for authentication. The connection is maintained for receiving real-time notifications about product expiration and scan events.

Error handling

Custom exception handlers provide consistent error responses from app/main.py:99-115:
@app.exception_handler(RequestValidationError)
async def custom_validation_exception_handler(
    request: Request, exc: RequestValidationError
):
    errors = exc.errors()
    custom_errors = [
        {
            "field": err["loc"][-1],
            "message": (
                "This field is required."
                if err["msg"] == "field required"
                else err["msg"]
            ),
        }
        for err in errors
    ]
    return JSONResponse(status_code=422, content={"errors": custom_errors})
This transforms FastAPI’s default validation errors into a more user-friendly format.

Key architectural decisions

FastAPI provides automatic API documentation, type checking with Pydantic, async support, and excellent performance. It reduces boilerplate code and catches errors at development time rather than runtime.
This separation keeps endpoint handlers thin and focused on HTTP concerns while business logic lives in testable service functions. Services can be reused across multiple endpoints and are easier to unit test.
APScheduler runs within the Python process, making it easier to share database connections and application context. It’s also more portable across different deployment environments.
Middleware runs before routing, allowing early rejection of unauthenticated requests. It also provides a single point of authentication logic rather than adding a dependency to every protected endpoint.

Build docs developers (and LLMs) love