ExpireEye uses JSON Web Tokens (JWT) for stateless authentication. Users receive a token upon successful login or registration, which must be included in the Authorization header for all protected API requests.
Authentication flow
The authentication system follows a standard JWT flow:
User submits credentials to /auth/login or /auth/signup
Server validates credentials and generates a JWT token
Token is returned to the client (and optionally set as a cookie)
Client includes token in Authorization header for subsequent requests
Middleware validates token and extracts user information
Request proceeds to the endpoint handler with user context
Tokens are valid for 4000 minutes (about 2.8 days), as configured in app/utils/jwt.py:9.
JWT token structure
Tokens are created using the PyJWT library and include user identification data:
import jwt
from datetime import datetime, timedelta
import os
from dotenv import load_dotenv

load_dotenv()

SECRET_KEY = os.getenv("SECRET_KEY")
ACCESS_TOKEN_EXPIRE_MINUTES = 4000


def create_access_token(data: dict):
    to_encode = data.copy()
    expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    to_encode.update({"exp": expire})
    # PyJWT signs with HS256 when no algorithm is passed
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY)
    return encoded_jwt
Token payload
The token payload includes:
userId - Unique user identifier (UUID)
email - User’s email address
exp - Expiration timestamp (Unix time)
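These claims are signed with the HS256 algorithm (HMAC with SHA-256) using the SECRET_KEY from environment variables. The payload is signed, not encrypted, so anyone holding a token can read its claims.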
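To make the token layout concrete, the following sketch builds an HS256 token by hand with the standard library and reads the claims back without verifying them. It illustrates the format only and is not how ExpireEye or PyJWT is implemented; the helper names, claim values, and secret are hypothetical.

```python
import base64
import hashlib
import hmac
import json


def b64url(raw: bytes) -> str:
    """Base64url-encode without padding, as JWT segments require."""
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def sign_hs256(payload: dict, secret: str) -> str:
    """Build header.payload.signature by hand (illustration only)."""
    header = {"alg": "HS256", "typ": "JWT"}
    segments = [
        b64url(json.dumps(header, separators=(",", ":")).encode("utf-8")),
        b64url(json.dumps(payload, separators=(",", ":")).encode("utf-8")),
    ]
    signing_input = ".".join(segments).encode("ascii")
    signature = hmac.new(secret.encode("utf-8"), signing_input, hashlib.sha256).digest()
    return ".".join(segments + [b64url(signature)])


def read_claims(token: str) -> dict:
    """Decode the payload segment WITHOUT verifying the signature."""
    payload_segment = token.split(".")[1]
    padded = payload_segment + "=" * (-len(payload_segment) % 4)
    return json.loads(base64.urlsafe_b64decode(padded))


# Hypothetical claim values and secret, for illustration only.
token = sign_hs256(
    {"userId": "example-user-id", "email": "user@example.com", "exp": 1700000000},
    secret="not-a-real-secret",
)
claims = read_claims(token)
```

Because read_claims needs no key, nothing sensitive should be placed in the payload.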
The SECRET_KEY must be kept secure and never committed to version control. Use a strong, randomly generated value in production.
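One way to produce a suitable value is Python's secrets module. This is a minimal sketch; the 64-byte length is an assumption, not a project requirement.

```python
import secrets

# 64 random bytes from the OS CSPRNG, URL-safe encoded.
# Paste the printed line into the .env file that load_dotenv() reads.
secret_key = secrets.token_urlsafe(64)
print(f"SECRET_KEY={secret_key}")
```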
User registration
New users register through the /auth/signup endpoint from app/routers/auth.py:62-128:
@router.post("/signup")
def signup(data: RegisterRequest, response: Response, db: Session = Depends(get_db)):
    try:
        name = data.name
        email = data.email
        password = data.password
        dob = data.dob

        # Check if user already exists
        existing_user = db.query(User).filter(User.email == email).first()
        if existing_user:
            raise HTTPException(status_code=409, detail="User already exists")

        if not email:
            raise HTTPException(status_code=400, detail="Email is required")
        if not password:
            raise HTTPException(status_code=400, detail="Password is required")
        if password and len(password) < 6:
            raise HTTPException(
                status_code=400, detail="Password must be at least 6 characters long"
            )

        # Hash password with bcrypt
        new_user = User(
            name=name,
            email=email,
            password=bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8"),
            dob=dob if dob else "",
            created_at=datetime.utcnow().isoformat(),
        )
        db.add(new_user)
        db.commit()
        db.refresh(new_user)

        # Generate access token
        access_token = create_access_token(
            data={"userId": new_user.id, "email": new_user.email}
        )

        # Set HTTP-only cookie
        response.set_cookie(
            key="access_token",
            value=access_token,
            httponly=True,
            max_age=10 * 24 * 60 * 60,  # 10 days in seconds
        )

        return {
            "message": "User created successfully",
            "userId": new_user.id,
            "email": new_user.email,
            "dob": new_user.dob,
            "created_at": new_user.created_at,
        }
    except SQLAlchemyError as e:
        raise HTTPException(status_code=500, detail="Internal server error")
Password security
Passwords are hashed using bcrypt before storage:
bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8")
Bcrypt automatically generates a unique salt for each password and includes it in the hash output. This prevents rainbow table attacks and ensures that identical passwords produce different hashes.
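Bcrypt is deliberately slow, which makes brute-force attacks computationally expensive. The cost factor does not change on its own: it is chosen when the salt is generated (bcrypt.gensalt() defaults to a cost of 12 and accepts a rounds argument) and should be raised over time as hardware improves.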
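The cost factor and salt are stored inside every bcrypt hash string, which is why no separate salt column is needed. The sketch below splits a hash in the standard format into its parts using only the standard library. The sample string is a commonly cited documentation example, not an ExpireEye hash, and parse_bcrypt_hash is a hypothetical helper.

```python
def parse_bcrypt_hash(hashed: str) -> dict:
    """Split a bcrypt string ($version$cost$salt+digest) into its fields."""
    _, version, cost, rest = hashed.split("$")
    return {
        "version": version,
        "cost": int(cost),
        "iterations": 2 ** int(cost),  # work doubles with each cost increment
        "salt": rest[:22],             # 22 characters of bcrypt-base64 salt
        "digest": rest[22:],           # remaining 31 characters are the hash
    }


# Sample hash string in the standard format (illustrative only).
sample = "$2a$12$R9h/cIPz0gi.URNNX3kh2OPST9/PgBkqquzi.Ss7KIUgO2t0jWMUW"
parts = parse_bcrypt_hash(sample)
```

Since the cost travels with each hash, raising it for new passwords does not invalidate hashes created at a lower cost.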
User login
Existing users authenticate through the /auth/login endpoint from app/routers/auth.py:15-59:
@router.post("/login")
def login(data: LoginRequest, response: Response, db: Session = Depends(get_db)):
    try:
        email = data.email
        password = data.password

        # HTTPException requires a status code; 400 matches the signup validation
        if not email:
            raise HTTPException(status_code=400, detail="Email is required")
        if not password:
            raise HTTPException(status_code=400, detail="Password is required")

        # Find user by email
        user = db.query(User).filter(User.email == email).first()
        if not user:
            raise AuthError(status_code=401, detail="Invalid email or password")

        # Verify password
        if not bcrypt.checkpw(password.encode("utf-8"), user.password.encode("utf-8")):
            raise AuthError(status_code=401, detail="Invalid email or password")

        # Generate JWT token
        access_token = create_access_token(
            data={"userId": user.id, "email": user.email}
        )

        # Set HTTP-only cookie
        response.set_cookie(
            key="access_token",
            value=access_token,
            httponly=True,
            samesite="lax",
            secure=False,
            path="/",
            domain="localhost",
            max_age=10 * 24 * 60 * 60,  # 10 days in seconds
        )

        return {
            "message": "Login successful",
            "token": access_token,
        }
    except SQLAlchemyError as e:
        raise HTTPException(status_code=500, detail="Internal server error")
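The endpoint verifies the credentials, returns the token in the response body, and sets an HTTP-only cookie for browser-based clients. Note that the cookie's max_age (10 days) is longer than the token lifetime (4000 minutes), so the cookie can outlive the token it carries.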
Password verification
The bcrypt.checkpw() function compares the provided password with the stored hash:
bcrypt.checkpw(password.encode("utf-8"), user.password.encode("utf-8"))
This is a constant-time comparison that prevents timing attacks.
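For readers unfamiliar with the term, a constant-time comparison takes the same time wherever the first mismatch occurs. The sketch below shows the standard-library primitive for it. It only illustrates the idea and does not replace bcrypt.checkpw(), which also re-hashes the candidate password; hashes_match is a hypothetical name.

```python
import hmac


def hashes_match(candidate: bytes, stored: bytes) -> bool:
    """Compare two byte strings without short-circuiting on the first difference."""
    return hmac.compare_digest(candidate, stored)
```

A plain == on bytes may return as soon as one byte differs, which is the behavior a timing attack exploits.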
Token validation
Token validation happens in the authentication middleware from app/main.py:51-96:
@app.middleware("http")
async def access_token_middleware(request: Request, call_next):
    # Skip access token check for preflight requests
    if request.method == "OPTIONS":
        return await call_next(request)

    public_paths = [
        "/api/auth/login",
        "/api/auth/signup",
        "/api/status",
        "/docs",
        "/redoc",
        "/api/openapi.json",
    ]

    # Skip access token check for public paths
    if request.url.path in public_paths:
        return await call_next(request)

    # Extract token from Authorization header
    auth_header = request.headers.get("Authorization")
    if not auth_header:
        return JSONResponse(
            status_code=401,
            content={"detail": "Authorization header missing or invalid."},
        )

    access_token = auth_header.split("Bearer ")[-1].strip()
    if not access_token:
        return JSONResponse(
            status_code=401,
            content={"detail": "Access token is missing or invalid."},
        )

    try:
        payload = decode_access_token(access_token)
        # decode_access_token returns None (it does not raise) for expired or invalid tokens
        if payload is None:
            raise ValueError("token could not be decoded")
        request.state.user = payload  # Store user info in request state
    except Exception as e:
        return JSONResponse(
            status_code=401,
            content={"detail": "Access token is invalid."},
        )

    response = await call_next(request)
    return response
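The expression auth_header.split("Bearer ")[-1] also accepts a header that has no Bearer prefix at all, because split returns the whole string when the separator is absent. A stricter parser could look like the sketch below; extract_bearer_token is a hypothetical helper, not part of the ExpireEye codebase.

```python
from typing import Optional


def extract_bearer_token(auth_header: Optional[str]) -> Optional[str]:
    """Return the token only when the header uses the Bearer scheme."""
    if not auth_header:
        return None
    scheme, _, credentials = auth_header.partition(" ")
    if scheme.lower() != "bearer":
        return None
    token = credentials.strip()
    return token or None


# The lenient parsing used in the middleware excerpt, for comparison.
lenient = "abc.def.ghi".split("Bearer ")[-1].strip()
```

With this helper, a missing scheme, a different scheme such as Basic, and an empty token all map to None, so the middleware can return a single 401 for them.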
Token decoding
The decode_access_token function verifies the token signature and expiration from app/utils/jwt.py:20-27:
def decode_access_token(token: str):
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
        return payload
    except jwt.ExpiredSignatureError:
        return None
    except jwt.InvalidTokenError:
        return None
If decoding fails because the token is expired or otherwise invalid, the function returns None rather than raising an exception, so callers must treat a None payload as a failed authentication and reject the request.
Because both failure modes collapse into None, the middleware cannot distinguish expired tokens from invalid ones in its error messages. Consider surfacing the two cases separately so that clients know when to log in again or refresh.
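One way to separate the two cases is to raise distinct exceptions during verification and map each to its own 401 message. The sketch below uses a hand-rolled HS256 check so that it runs with the standard library alone; in the application, PyJWT's ExpiredSignatureError and InvalidTokenError already play these roles. All names here (TokenExpired, TokenInvalid, make_token, verify_token, error_detail) are hypothetical.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional


class TokenInvalid(Exception):
    """Malformed token, bad signature, or unusable exp claim."""


class TokenExpired(Exception):
    """Signature is valid but the exp claim is in the past."""


def _b64url_encode(raw: bytes) -> str:
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def _b64url_decode(segment: str) -> bytes:
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def _signature(signing_input: str, secret: str) -> str:
    mac = hmac.new(secret.encode("utf-8"), signing_input.encode("ascii"), hashlib.sha256)
    return _b64url_encode(mac.digest())


def make_token(payload: dict, secret: str) -> str:
    """Demo helper: produce an HS256 token to feed into verify_token."""
    header = _b64url_encode(b'{"alg":"HS256","typ":"JWT"}')
    body = _b64url_encode(json.dumps(payload).encode("utf-8"))
    signing_input = header + "." + body
    return signing_input + "." + _signature(signing_input, secret)


def verify_token(token: str, secret: str, now: Optional[float] = None) -> dict:
    """Return the payload, or raise TokenInvalid / TokenExpired."""
    try:
        header, body, signature = token.split(".")
        expected = _signature(header + "." + body, secret)
        if not hmac.compare_digest(signature.encode("utf-8"), expected.encode("utf-8")):
            raise TokenInvalid("signature mismatch")
        payload = json.loads(_b64url_decode(body))
    except ValueError as exc:  # wrong segment count, bad base64, bad JSON
        raise TokenInvalid("malformed token") from exc
    if not isinstance(payload, dict) or not isinstance(payload.get("exp"), (int, float)):
        raise TokenInvalid("missing or malformed exp claim")
    current = time.time() if now is None else now
    if payload["exp"] <= current:
        raise TokenExpired("token expired")
    return payload


def error_detail(token: str, secret: str, now: Optional[float] = None) -> Optional[str]:
    """Map verification outcomes to the detail string a 401 response could carry."""
    try:
        verify_token(token, secret, now)
    except TokenExpired:
        return "Access token has expired."
    except TokenInvalid:
        return "Access token is invalid."
    return None
```

The signature is checked before the expiry, so a tampered token is always reported as invalid rather than expired.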
Accessing user context
Once authenticated, endpoint handlers can access user information from the request state:
@router.post("/inventory/add")
async def add_product(
    product: AddProductRequest, request: Request, db: Session = Depends(get_db)
):
    # request.state.user holds the decoded token payload, not the raw token
    access_token = request.state.user
    user_id = access_token.get("userId")

    # Use user_id for operations
    add_product_result = await add_product_to_inventory(
        user_id=user_id,
        product={"productName": product.productName, "category": product.category},
        db=db,
    )
    return add_product_result
The middleware stores the decoded token payload in request.state.user, making userId and email available throughout the request lifecycle.
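Handlers that may run when no payload is available (for example, on public paths) can read the user defensively. This is a minimal sketch; current_user_id is a hypothetical helper, and SimpleNamespace stands in for request.state in the usage lines.

```python
from types import SimpleNamespace
from typing import Optional


def current_user_id(state) -> Optional[str]:
    """Return the userId claim from a request-state-like object, or None."""
    user = getattr(state, "user", None)
    if not isinstance(user, dict):
        return None
    return user.get("userId")


# Stand-ins for request.state in three situations.
authenticated = SimpleNamespace(user={"userId": "u1", "email": "user@example.com"})
failed_decode = SimpleNamespace(user=None)
public_path = SimpleNamespace()
```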
WebSocket authentication
WebSocket connections use query parameter authentication from app/services/notification_service.py:31-40:
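async def notification_websocket(websocket: WebSocket, access_token: str = Query(None)):
    if not access_token:
        await websocket.close(code=1008, reason="Access token required")
        return  # stop here so a closed socket is never accepted

    await websocket.accept()

    payload = decode_access_token(access_token)
    if payload is None:
        # decode_access_token returns None for expired or invalid tokens
        await websocket.close(code=1008, reason="Invalid access token")
        return

    user_id = payload.get("userId")
    notification_connections[user_id] = websocket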
Clients connect to /ws/notification?access_token=<token> to establish authenticated WebSocket connections.
WebSocket authentication uses a query parameter because the browser WebSocket API does not allow clients to set custom headers, such as Authorization, on the handshake request.
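A client can build the connection URL as sketched below. The base URL ws://localhost:8000 and the helper name notification_ws_url are assumptions for illustration.

```python
from urllib.parse import parse_qs, urlencode, urlsplit


def notification_ws_url(base_url: str, access_token: str) -> str:
    """Append the token as a URL-encoded access_token query parameter."""
    query = urlencode({"access_token": access_token})
    return f"{base_url}/ws/notification?{query}"


# Hypothetical host and placeholder token.
url = notification_ws_url("ws://localhost:8000", "abc.def.ghi")
```

Tokens in URLs can end up in server and proxy logs, which is one more reason to keep token lifetimes short.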
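Tokens reach the client in two ways:
Header-based (recommended): the client sends the token from the login response on each request as Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...
Cookie-based (browser): the server sets an HTTP-only access_token cookie at login and signup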
The middleware currently reads the token only from the Authorization header (app/main.py:72-78):
auth_header = request.headers.get("Authorization")
if not auth_header:
    return JSONResponse(
        status_code=401,
        content={"detail": "Authorization header missing or invalid."},
    )
access_token = auth_header.split("Bearer ")[-1].strip()
While cookies are set during login/signup, the middleware only reads from the Authorization header. To support cookie-based authentication, you would need to check for cookies when the Authorization header is missing.
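A header-first lookup with a cookie fallback could be sketched as follows. The function works on plain mappings so that it runs standalone; in the middleware, request.headers and request.cookies would be passed in. The name token_from_request is hypothetical.

```python
from typing import Mapping, Optional


def token_from_request(headers: Mapping[str, str], cookies: Mapping[str, str]) -> Optional[str]:
    """Prefer the Authorization header; fall back to the access_token cookie."""
    auth_header = headers.get("Authorization", "")
    if auth_header.startswith("Bearer "):
        token = auth_header[len("Bearer "):].strip()
        if token:
            return token
    return cookies.get("access_token") or None
```

Accepting cookies means browsers attach the token automatically, so state-changing endpoints would then also need CSRF protection.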
Security considerations
Token storage
For web clients:
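Store tokens in memory for single-page applications; sessionStorage survives page reloads but, like localStorage, is readable by any script running on the page
Use HTTP-only cookies for traditional web applications
Avoid storing tokens in localStorage (persistent and readable by injected scripts in an XSS attack)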
For mobile clients:
Use secure storage mechanisms (Keychain on iOS, Keystore on Android)
Encrypt tokens at rest
Clear tokens on logout
Token expiration
Tokens expire after 4000 minutes. Consider implementing:
Refresh token mechanism for long-lived sessions
Shorter access token expiration (15-60 minutes)
Token rotation on refresh
Automatic token renewal before expiration
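The lifetimes and the renewal check can be expressed as below. All values and names are hypothetical: a 15-minute access token, a 7-day refresh token, and renewal during the last 2 minutes before expiry.

```python
from datetime import datetime, timedelta, timezone

ACCESS_TOKEN_LIFETIME = timedelta(minutes=15)   # hypothetical short-lived access token
REFRESH_TOKEN_LIFETIME = timedelta(days=7)      # hypothetical refresh token lifetime
RENEW_THRESHOLD = timedelta(minutes=2)          # renew when this little time remains


def expiry_for(issued_at: datetime, lifetime: timedelta) -> int:
    """Return the exp claim (Unix seconds) for a token issued at issued_at."""
    return int((issued_at + lifetime).timestamp())


def should_renew(exp: int, now: datetime) -> bool:
    """True when the token is still valid but within the renewal threshold."""
    remaining = exp - now.timestamp()
    return 0 < remaining <= RENEW_THRESHOLD.total_seconds()


issued = datetime(2024, 1, 1, tzinfo=timezone.utc)
access_exp = expiry_for(issued, ACCESS_TOKEN_LIFETIME)
```

An already expired token returns False here; in that case the client would present its refresh token or log in again.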
Rate limiting
Consider implementing rate limiting on authentication endpoints to prevent:
Brute force password attacks
Account enumeration
Credential stuffing
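A sliding-window limiter keyed by client IP or email is one simple approach. The sketch below is in-memory and single-process only, so a multi-worker deployment would need a shared store instead; the class name and limits are hypothetical.

```python
import time
from collections import defaultdict, deque
from typing import Optional


class SlidingWindowLimiter:
    """Allow at most max_attempts per key within window_seconds."""

    def __init__(self, max_attempts: int, window_seconds: float):
        self.max_attempts = max_attempts
        self.window_seconds = window_seconds
        self._attempts = defaultdict(deque)  # key -> timestamps of recent attempts

    def allow(self, key: str, now: Optional[float] = None) -> bool:
        current = time.monotonic() if now is None else now
        attempts = self._attempts[key]
        # Drop attempts that have fallen out of the window.
        while attempts and current - attempts[0] >= self.window_seconds:
            attempts.popleft()
        if len(attempts) >= self.max_attempts:
            return False
        attempts.append(current)
        return True


# Hypothetical policy: 3 login attempts per minute per client.
limiter = SlidingWindowLimiter(max_attempts=3, window_seconds=60)
```

A login handler would call limiter.allow(client_key) before checking credentials and return 429 when it yields False.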
HTTPS requirement
Always use HTTPS in production to protect tokens in transit. The current cookie configuration sets secure=False, which should be changed to secure=True when deploying with HTTPS.
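The sketch below renders a Set-Cookie value with the standard library to show which attributes change between development and production. It also derives Max-Age from the token lifetime (4000 minutes) instead of the 10 days used in the login excerpt. The production flag and the build_access_cookie helper are assumptions; in a FastAPI handler the same flags would be passed to response.set_cookie.

```python
from http.cookies import SimpleCookie

ACCESS_TOKEN_EXPIRE_MINUTES = 4000  # same value as in app/utils/jwt.py


def build_access_cookie(token: str, production: bool) -> str:
    """Return the Set-Cookie value for the access_token cookie."""
    cookie = SimpleCookie()
    cookie["access_token"] = token
    morsel = cookie["access_token"]
    morsel["httponly"] = True
    morsel["secure"] = production            # only send over HTTPS in production
    morsel["samesite"] = "Lax"
    morsel["path"] = "/"
    morsel["max-age"] = ACCESS_TOKEN_EXPIRE_MINUTES * 60  # cookie expires with the token
    return morsel.OutputString()


dev_cookie = build_access_cookie("abc.def.ghi", production=False)
prod_cookie = build_access_cookie("abc.def.ghi", production=True)
```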
Common authentication patterns
Protecting custom endpoints
To protect a new endpoint, ensure it’s not in the public_paths list and extract user context:
@router.post("/my-endpoint")
async def my_endpoint(request: Request, db: Session = Depends(get_db)):
    # Automatically authenticated by middleware
    user_id = request.state.user.get("userId")
    # Your endpoint logic here
    pass
Optional authentication
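For endpoints that work with or without authentication, add the path to public_paths and decode the token inside the handler. The middleware skips public paths entirely, so request.state.user is never set for them:
@router.get("/optional-auth")
async def optional_auth(request: Request):
    # The middleware does not check tokens on public paths,
    # so read and decode the Authorization header here.
    user = None
    auth_header = request.headers.get("Authorization", "")
    if auth_header.startswith("Bearer "):
        user = decode_access_token(auth_header[len("Bearer "):].strip())

    if user:
        # Authenticated behavior
        user_id = user.get("userId")
    else:
        # Anonymous behavior (no token, or token expired or invalid)
        pass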
Service-to-service authentication
For backend service communication, generate service tokens with a special claim:
service_token = create_access_token(
    data={"userId": "system", "email": "service@internal", "service": True}
)
Then check for the service flag in middleware or endpoint handlers.
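The check itself can be a small predicate over the decoded payload, as in this sketch; is_service_token is a hypothetical helper.

```python
def is_service_token(payload) -> bool:
    """True only when the decoded payload carries service set to the boolean True."""
    return isinstance(payload, dict) and payload.get("service") is True
```

Service and user tokens are signed with the same SECRET_KEY, so this flag is only trustworthy if login and signup never copy client-supplied fields into the token payload.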