Skip to main content
ExpireEye uses JSON Web Tokens (JWT) for stateless authentication. Users receive a token upon successful login or registration, which must be included in the Authorization header for all protected API requests.

Authentication flow

The authentication system follows a standard JWT flow:
  1. User submits credentials to /auth/login or /auth/signup
  2. Server validates credentials and generates a JWT token
  3. Token is returned to the client (and optionally set as a cookie)
  4. Client includes token in Authorization header for subsequent requests
  5. Middleware validates token and extracts user information
  6. Request proceeds to the endpoint handler with user context
Tokens are valid for 4000 minutes (approximately 2.7 days) as configured in app/utils/jwt.py:9.

JWT token structure

Tokens are created using the PyJWT library and include user identification data:
app/utils/jwt.py
import jwt
from datetime import datetime, timedelta
import os
from dotenv import load_dotenv

load_dotenv()

SECRET_KEY = os.getenv("SECRET_KEY")
ACCESS_TOKEN_EXPIRE_MINUTES = 4000

def create_access_token(data: dict):
    to_encode = data.copy()
    expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY)
    return encoded_jwt

Token payload

The token payload includes:
  • userId - Unique user identifier (UUID)
  • email - User’s email address
  • exp - Expiration timestamp (Unix time)
These claims are signed with HS256 algorithm using the SECRET_KEY from environment variables.
The SECRET_KEY must be kept secure and never committed to version control. Use a strong, randomly generated value in production.

User registration

New users register through the /auth/signup endpoint from app/routers/auth.py:62-128:
app/routers/auth.py
@router.post("/signup")
def signup(data: RegisterRequest, response: Response, db: Session = Depends(get_db)):
    try:
        name = data.name
        email = data.email
        password = data.password
        dob = data.dob
        
        # Check if user already exists
        existing_user = db.query(User).filter(User.email == email).first()
        
        if existing_user:
            raise HTTPException(status_code=409, detail="User already exists")
        
        if not email:
            raise HTTPException(status_code=400, detail="Email is required")
        
        if not password:
            raise HTTPException(status_code=400, detail="Password is required")
        
        if password and len(password) < 6:
            raise HTTPException(
                status_code=400, detail="Password must be at least 6 characters long"
            )
        
        # Hash password with bcrypt
        new_user = User(
            name=name,
            email=email,
            password=bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8"),
            dob=dob if dob else "",
            created_at=datetime.utcnow().isoformat(),
        )
        
        db.add(new_user)
        db.commit()
        db.refresh(new_user)
        
        # Generate access token
        access_token = create_access_token(
            data={"userId": new_user.id, "email": new_user.email}
        )
        
        # Set HTTP-only cookie
        response.set_cookie(
            key="access_token",
            value=access_token,
            httponly=True,
            max_age=10 * 24 * 60 * 60,  # 10 days in seconds
        )
        
        return {
            "message": "User created successfully",
            "userId": new_user.id,
            "email": new_user.email,
            "dob": new_user.dob,
            "created_at": new_user.created_at,
        }
    except SQLAlchemyError as e:
        raise HTTPException(status_code=500, detail="Internal server error")

Password security

Passwords are hashed using bcrypt before storage:
bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8")
Bcrypt automatically generates a unique salt for each password and includes it in the hash output. This prevents rainbow table attacks and ensures that identical passwords produce different hashes.
Bcrypt is designed to be slow, making brute-force attacks computationally expensive. The algorithm automatically adapts its cost factor as hardware improves.

User login

Existing users authenticate through the /auth/login endpoint from app/routers/auth.py:15-59:
app/routers/auth.py
@router.post("/login")
def login(data: LoginRequest, response: Response, db: Session = Depends(get_db)):
    try:
        email = data.email
        password = data.password
        
        if not email:
            raise HTTPException(detail="Email is required")
        if not password:
            raise HTTPException(detail="Password is required")
        
        # Find user by email
        user = db.query(User).filter(User.email == email).first()
        
        if not user:
            raise AuthError(status_code=401, detail="Invalid email or password")
        
        # Verify password
        if not bcrypt.checkpw(password.encode("utf-8"), user.password.encode("utf-8")):
            raise AuthError(status_code=401, detail="Invalid email or password")
        
        # Generate JWT token
        access_token = create_access_token(
            data={"userId": user.id, "email": user.email}
        )
        
        # Set HTTP-only cookie
        response.set_cookie(
            key="access_token",
            value=access_token,
            httponly=True,
            samesite="lax",
            secure=False,
            path="/",
            domain="localhost",
            max_age=10 * 24 * 60 * 60,  # 10 days in seconds
        )
        
        return {
            "message": "Login successful",
            "token": access_token,
        }
    except SQLAlchemyError as e:
        raise HTTPException(status_code=500, detail="Internal server error")
The endpoint verifies credentials and returns both a token in the response body and sets an HTTP-only cookie for browser-based clients.

Password verification

The bcrypt.checkpw() function compares the provided password with the stored hash:
bcrypt.checkpw(password.encode("utf-8"), user.password.encode("utf-8"))
This is a constant-time comparison that prevents timing attacks.

Token validation

Token validation happens in the authentication middleware from app/main.py:51-96:
app/main.py
@app.middleware("http")
async def access_token_middleware(request: Request, call_next):
    # Skip access token check for preflight requests
    if request.method == "OPTIONS":
        return await call_next(request)
    
    public_paths = [
        "/api/auth/login",
        "/api/auth/signup",
        "/api/status",
        "/docs",
        "/redoc",
        "/api/openapi.json",
    ]
    
    # Skip access token check for public paths
    if request.url.path in public_paths:
        return await call_next(request)
    
    # Extract token from Authorization header
    auth_header = request.headers.get("Authorization")
    if not auth_header:
        return JSONResponse(
            status_code=401,
            content={"detail": "Authorization header missing or invalid."},
        )
    
    access_token = auth_header.split("Bearer ")[-1].strip()
    
    if not access_token:
        return JSONResponse(
            status_code=401,
            content={"detail": "Access token is missing or invalid."},
        )
    
    try:
        payload = decode_access_token(access_token)
        request.state.user = payload  # Store user info in request state
    except Exception as e:
        return JSONResponse(
            status_code=401,
            content={"detail": "Access token is invalid."},
        )
    
    response = await call_next(request)
    return response

Token decoding

The decode_access_token function verifies the token signature and expiration from app/utils/jwt.py:20-27:
def decode_access_token(token: str):
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
        return payload
    except jwt.ExpiredSignatureError:
        return None
    except jwt.InvalidTokenError:
        return None
If decoding fails due to expiration or invalid signature, the function returns None, causing the middleware to reject the request.
The middleware currently doesn’t distinguish between expired tokens and invalid tokens in error messages. Consider adding specific error handling for better client experience.

Accessing user context

Once authenticated, endpoint handlers can access user information from the request state:
app/routers/product.py
@router.post("/inventory/add")
async def add_product(
    product: AddProductRequest, request: Request, db: Session = Depends(get_db)
):
    access_token = request.state.user
    user_id = access_token.get("userId")
    
    # Use user_id for operations
    add_product_result = await add_product_to_inventory(
        user_id=user_id,
        product={"productName": product.productName, "category": product.category},
        db=db,
    )
    
    return add_product_result
The middleware stores the decoded token payload in request.state.user, making userId and email available throughout the request lifecycle.

WebSocket authentication

WebSocket connections use query parameter authentication from app/services/notification_service.py:31-40:
app/services/notification_service.py
async def notification_websocket(websocket: WebSocket, access_token: str = Query(None)):
    if not access_token:
        await websocket.close(code=1008, reason="Access token required")
    
    await websocket.accept()
    
    payload = decode_access_token(access_token)
    user_id = payload.get("userId")
    
    notification_connections[user_id] = websocket
Clients connect to /ws/notification?access_token=<token> to establish authenticated WebSocket connections.
WebSocket authentication uses query parameters because the WebSocket protocol doesn’t support custom headers during the handshake in all browsers.
The system supports both authentication methods:
Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...
The middleware currently prioritizes the Authorization header from app/main.py:72-78:
auth_header = request.headers.get("Authorization")
if not auth_header:
    return JSONResponse(
        status_code=401,
        content={"detail": "Authorization header missing or invalid."},
    )
access_token = auth_header.split("Bearer ")[-1].strip()
While cookies are set during login/signup, the middleware only reads from the Authorization header. To support cookie-based authentication, you would need to check for cookies when the Authorization header is missing.

Security considerations

Token storage

For web clients:
  • Store tokens in memory or sessionStorage for single-page applications
  • Use HTTP-only cookies for traditional web applications
  • Never store tokens in localStorage (vulnerable to XSS attacks)
For mobile clients:
  • Use secure storage mechanisms (Keychain on iOS, Keystore on Android)
  • Encrypt tokens at rest
  • Clear tokens on logout

Token expiration

Tokens expire after 4000 minutes. Consider implementing:
  • Refresh token mechanism for long-lived sessions
  • Shorter access token expiration (15-60 minutes)
  • Token rotation on refresh
  • Automatic token renewal before expiration

Rate limiting

Consider implementing rate limiting on authentication endpoints to prevent:
  • Brute force password attacks
  • Account enumeration
  • Credential stuffing

HTTPS requirement

Always use HTTPS in production to protect tokens in transit. The current cookie configuration sets secure=False, which should be changed to secure=True when deploying with HTTPS.

Common authentication patterns

Protecting custom endpoints

To protect a new endpoint, ensure it’s not in the public_paths list and extract user context:
@router.post("/my-endpoint")
async def my_endpoint(request: Request, db: Session = Depends(get_db)):
    # Automatically authenticated by middleware
    user_id = request.state.user.get("userId")
    
    # Your endpoint logic here
    pass

Optional authentication

For endpoints that work with or without authentication:
@router.get("/optional-auth")
async def optional_auth(request: Request):
    user = getattr(request.state, 'user', None)
    
    if user:
        # Authenticated behavior
        user_id = user.get("userId")
    else:
        # Anonymous behavior
        pass
Add the endpoint to public_paths first, then manually check for authentication.

Service-to-service authentication

For backend service communication, generate service tokens with a special claim:
service_token = create_access_token(
    data={"userId": "system", "email": "service@internal", "service": True}
)
Then check for the service flag in middleware or endpoint handlers.

Build docs developers (and LLMs) love