Overview
Tesis Rutas uses JSON Web Tokens (JWT) for secure authentication and implements role-based access control (RBAC) with three user roles: visitante, editor, and administrador.
Prerequisites
Python 3.8+
FastAPI framework
python-jose for JWT handling
bcrypt for password hashing
Configuration
Set JWT Environment Variables
Configure your JWT settings in local_config.json (or set the same keys as variables in .env):
{
"JWT_SECRET_KEY" : "your-super-secret-key-min-32-characters" ,
"JWT_ALGORITHM" : "HS256" ,
"JWT_EXPIRE_MINUTES" : 60
}
Use a strong secret key in production. Generate one with: openssl rand -hex 32
Load Settings
Settings are automatically loaded via the configuration system: class Settings ( BaseSettings ):
# JWT
jwt_secret_key: str | None = None
jwt_algorithm: str = "HS256"
jwt_expire_minutes: int = 60
Initialize Auth Service
The auth service is available as a singleton instance: src/infrastructure/security/auth_service.py
from src.infrastructure.security.auth_service import auth_service
# Use the global instance
token = auth_service.create_access_token({ "id" : user_id})
Auth Service Implementation
Password Hashing
The AuthService class handles secure password operations:
src/infrastructure/security/auth_service.py
from datetime import datetime, timedelta, timezone

import bcrypt
from jose import JWTError, jwt

from src.config.settings import get_settings
settings = get_settings()
class AuthService :
def __init__(self):
    """Copy the JWT-related settings onto the service instance."""
    # Values are read once from the module-level ``settings`` object.
    self.algorithm = settings.jwt_algorithm
    self.secret_key = settings.jwt_secret_key
    self.access_token_expires_minutes = settings.jwt_expire_minutes
# Hash password
def hash_password(self, password: str) -> str:
    """Return a salted bcrypt hash of *password* as a UTF-8 string.

    Args:
        password: Plaintext password to hash.

    Returns:
        The bcrypt hash decoded to ``str``.
    """
    salt = bcrypt.gensalt()
    digest = bcrypt.hashpw(password.encode("utf-8"), salt)
    return digest.decode("utf-8")
# Verify password
def verify_password(self, plain: str, hashed: str) -> bool:
    """Check a plaintext password against a stored bcrypt hash.

    Args:
        plain: Candidate password to verify.
        hashed: Stored bcrypt hash in its string form.

    Returns:
        ``True`` when the password matches the hash. ``False`` when it does
        not match, when either argument is not a string, or when the stored
        hash is malformed.
    """
    try:
        return bcrypt.checkpw(plain.encode("utf-8"), hashed.encode("utf-8"))
    except (ValueError, TypeError, AttributeError):
        # ValueError: malformed hash / invalid salt (also covers encoding
        # errors); TypeError / AttributeError: a non-str argument such as
        # None. A bare ``except`` would additionally swallow
        # KeyboardInterrupt and SystemExit and hide real programming errors.
        return False
JWT Token Creation
src/infrastructure/security/auth_service.py
def create_access_token(self, data: dict, expires_minutes: int | None = None) -> str:
    """Build a signed JWT carrying *data* plus an ``exp`` claim.

    Args:
        data: Claims to embed in the token. The dict is not mutated.
        expires_minutes: Token lifetime in minutes. When omitted (or falsy,
            e.g. ``0``) the service-wide default is used.

    Returns:
        The encoded token produced by ``jwt.encode``.
    """
    lifetime = timedelta(
        minutes=expires_minutes or self.access_token_expires_minutes
    )
    # Use a timezone-aware UTC timestamp: ``datetime.utcnow()`` is deprecated
    # and returns a naive value that is easy to misinterpret as local time.
    expire = datetime.now(timezone.utc) + lifetime
    claims = {**data, "exp": expire}
    return jwt.encode(claims, self.secret_key, algorithm=self.algorithm)
JWT Token Decoding
src/infrastructure/security/auth_service.py
def decode_token(self, token: str):
    """Decode and verify *token*, returning its payload.

    Args:
        token: Encoded JWT string.

    Returns:
        The decoded payload, or ``None`` when ``jwt.decode`` raises
        ``JWTError``.
    """
    try:
        return jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
    except JWTError:
        # Callers treat ``None`` as "invalid or expired token".
        return None
# Global singleton instance
auth_service = AuthService()
Authentication Endpoints
User Registration
src/infrastructure/api/routers/auth_router.py
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel, EmailStr
router = APIRouter( prefix = "/auth" , tags = [ "Autenticación" ])
# Request body accepted by the POST /auth/register endpoint.
class RegisterRequest ( BaseModel ):
    # Name forwarded to the registration use case as ``nombre``.
    nombre: str
    # Validated as an e-mail address by pydantic's EmailStr.
    email: EmailStr
    # Password as sent by the client; forwarded unchanged to the use case.
    password: str
@router.post("/register")
def register_user(data: RegisterRequest):
    """
    Registrar un nuevo usuario.
    """
    # The docstring above is kept verbatim: FastAPI publishes it as the
    # endpoint description in the generated OpenAPI docs.
    try:
        nuevo = register_uc.execute(
            nombre=data.nombre,
            email=data.email,
            password=data.password,
        )
        # Expose only the public fields of the created user.
        perfil = {
            "id": nuevo.id,
            "nombre": nuevo.nombre,
            "email": nuevo.email,
            "rol": nuevo.rol,
        }
        return {"message": "Usuario registrado correctamente.", "usuario": perfil}
    except ValueError as exc:
        # ValueError raised inside the try block is reported as HTTP 400.
        raise HTTPException(status_code=400, detail=str(exc))
User Login
src/infrastructure/api/routers/auth_router.py
# Request body accepted by the POST /auth/login endpoint.
class LoginRequest ( BaseModel ):
    # Validated as an e-mail address by pydantic's EmailStr.
    email: EmailStr
    # Password as sent by the client; passed to ``auth_uc.login``.
    password: str
@router.post("/login")
def login_user(data: LoginRequest):
    """
    Login con email y password → devuelve JWT.
    """
    # The docstring above is kept verbatim: FastAPI publishes it as the
    # endpoint description in the generated OpenAPI docs.
    try:
        return auth_uc.login(data.email, data.password)
    except ValueError as exc:
        # ValueError raised by the login use case is reported as HTTP 401.
        raise HTTPException(status_code=401, detail=str(exc))
The login response includes the JWT token:
{
"access_token" : "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..." ,
"token_type" : "bearer" ,
"usuario" : {
"id" : "user_id" ,
"nombre" : "John Doe" ,
"email" : "user@example.com" ,
"rol" : "visitante"
}
}
Protecting Routes
Extract and validate JWT tokens from request headers:
src/infrastructure/security/jwt_utils.py
from fastapi import Request, HTTPException, status
from src.infrastructure.security.auth_service import auth_service
def extract_token_from_request(request: Request):
    """Return the bearer token carried in the ``Authorization`` header.

    Args:
        request: Incoming request whose headers are inspected.

    Returns:
        The token string that follows the ``Bearer`` scheme, with
        surrounding whitespace removed.

    Raises:
        HTTPException: 401 when the header is missing, uses another scheme,
            or carries no token after the scheme.
    """
    auth_header = request.headers.get("Authorization") or ""
    scheme, _, credentials = auth_header.partition(" ")
    # Strip instead of indexing a split: extra spaces after the scheme would
    # otherwise yield an empty string that is passed on as the "token".
    token = credentials.strip()
    # HTTP authentication scheme names are case-insensitive.
    if scheme.lower() != "bearer" or not token:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token no proporcionado o inválido",
        )
    return token
Get Current User
Retrieve the authenticated user from the token:
src/infrastructure/security/jwt_utils.py
from src.infrastructure.database.mongo_config import get_database
from src.infrastructure.database.usuario_repository_impl import UsuarioRepositoryImpl
db = get_database()
usuario_repo = UsuarioRepositoryImpl(db)
def get_current_user(request: Request):
    """Resolve the user identified by the request's JWT.

    Args:
        request: Incoming request carrying the ``Authorization`` header.

    Returns:
        The user object returned by ``usuario_repo.obtener_por_id``.

    Raises:
        HTTPException: 401 when the token cannot be decoded or has no ``id``
            claim; 404 when no user exists for that id. Errors raised by
            ``extract_token_from_request`` propagate unchanged.
    """
    claims = auth_service.decode_token(extract_token_from_request(request))
    if not claims:
        raise HTTPException(status_code=401, detail="Token inválido o expirado")

    uid = claims.get("id")
    if not uid:
        raise HTTPException(status_code=401, detail="Token sin ID de usuario")

    encontrado = usuario_repo.obtener_por_id(uid)
    if encontrado:
        return encontrado
    raise HTTPException(status_code=404, detail="Usuario no encontrado")
Role-Based Access Control
User Roles
Tesis Rutas implements three user roles:
| Role | Permissions |
| --- | --- |
| visitante | View content, manage favorites |
| editor | Visitante permissions + edit content, change status |
| administrador | Full access, create/delete/manage all resources |
Role Validation
src/infrastructure/security/jwt_utils.py
def require_role(request: Request, roles_permitidos: list[str]):
    """Return the authenticated user if their role is allowed.

    Args:
        request: Incoming request used to resolve the current user.
        roles_permitidos: Role names that may perform the action.

    Returns:
        The user returned by ``get_current_user``.

    Raises:
        HTTPException: 403 when the user's ``rol`` is not in
            ``roles_permitidos``. Errors raised while resolving the user
            propagate unchanged.
    """
    usuario = get_current_user(request)
    if usuario.rol in roles_permitidos:
        return usuario
    raise HTTPException(
        status_code=403,
        detail="No tienes permisos para realizar esta acción",
    )
Role-Specific Helpers
src/infrastructure/security/jwt_utils.py
def require_user(request: Request):
    """Allow any authenticated user: visitante, editor or administrador."""
    roles = ["visitante", "editor", "administrador"]
    return require_role(request, roles)
def require_editor(request: Request):
    """Allow users who may edit content: editor or administrador."""
    roles = ["editor", "administrador"]
    return require_role(request, roles)
def require_admin(request: Request):
    """Allow only users with the administrador role (full access)."""
    roles = ["administrador"]
    return require_role(request, roles)
Using Role Protection in Endpoints
src/infrastructure/api/routers/destinos_router.py
from fastapi import APIRouter, Depends
from src.infrastructure.security.jwt_utils import require_admin, require_editor, require_user
router = APIRouter( prefix = "/destinos" , tags = [ "Destinos" ])
# Public endpoint - no authentication required
@router.get("/")
def listar_destinos(db=Depends(get_database)):
    # No auth dependency is declared, so this route is open to anonymous
    # callers. Returns every destination plus a count.
    destinos = DestinoRepositoryImpl(db).obtener_todos()
    return {"total": len(destinos), "data": destinos}
# Editor or admin required
# The path template must not contain spaces around the parameter, otherwise
# FastAPI cannot bind the ``id`` path parameter.
@router.put("/estado/{id}")
def cambiar_estado_destino(
    id: str,
    db=Depends(get_database),
    editor=Depends(require_editor),
):
    # ``editor`` is unused in the body; the dependency exists only to reject
    # callers that are neither editor nor administrador.
    repo = DestinoRepositoryImpl(db)
    use_case = CambiarEstadoDestinoUseCase(repo)
    mensaje = use_case.ejecutar(id)
    return {"message": mensaje}
# Admin only
@router.post("/")
def crear_destino(
    data: dict,
    db=Depends(get_database),
    admin=Depends(require_admin),
):
    # ``admin`` is unused in the body; the dependency exists only to reject
    # callers without the administrador role.
    nuevo_id = AgregarDestinoUseCase(DestinoRepositoryImpl(db)).ejecutar(data)
    return {"message": "Destino creado correctamente", "id": nuevo_id}
Frontend Integration
Axios Interceptor for JWT
Automatically attach JWT tokens to all API requests:
import axios from "axios" ;
// Shared axios instance; the base URL is read from import.meta.env.VITE_API_URL
// and every request defaults to a JSON content type.
const api = axios . create ({
  baseURL: import . meta . env . VITE_API_URL ,
  headers: {
    "Content-Type" : "application/json" ,
  },
});
// Request interceptor - attach JWT token
api.interceptors.request.use(
  (config) => {
    const token = localStorage.getItem("token");
    if (token) {
      // No padding inside the template literal: stray spaces would become
      // part of the header value sent to the server.
      config.headers.Authorization = `Bearer ${token}`;
    }
    return config;
  },
  (error) => Promise.reject(error)
);
// Response interceptor - handle 401 errors
api.interceptors.response.use(
  (response) => response,
  (error) => {
    const isUnauthorized = error.response?.status === 401;
    // /auth/login answers 401 for wrong credentials. That is not an expired
    // session, so let the caller show the error instead of reloading the page.
    const isAuthCall = (error.config?.url ?? "").includes("/auth/");
    if (isUnauthorized && !isAuthCall) {
      // Drop the stale session before sending the user back to the login page.
      localStorage.removeItem("token");
      localStorage.removeItem("user");
      window.location.href = "/login";
    }
    return Promise.reject(error);
  }
);
export default api ;
Login API Call
import api from "../axios" ;
/**
 * Log a user in.
 * @param {{ email: string, password: string }} data
 */
export const loginUser = (data) => api.post("/auth/login", data);
/**
 * Register a new user.
 * @param {{ nombre: string, email: string, password: string }} data
 */
export const registerUser = (data) => api.post("/auth/register", data);
Storing Token
import { loginUser } from "@/api/auth/auth.api" ;
async function handleLogin(email, password) {
  try {
    const { data } = await loginUser({ email, password });
    const { access_token: accessToken, usuario } = data;

    // Persist the session so the request interceptor can attach the token.
    localStorage.setItem("token", accessToken);
    localStorage.setItem("user", JSON.stringify(usuario));

    navigate("/dashboard");
  } catch (error) {
    console.error("Login failed:", error);
  }
}
Token Expiration
Tokens expire based on JWT_EXPIRE_MINUTES (default: 60 minutes).
Handling Expired Tokens
The axios interceptor automatically handles 401 responses:
api.interceptors.response.use(
  (response) => response,
  (error) => {
    const isUnauthorized = error.response?.status === 401;
    // /auth/login answers 401 for wrong credentials. That is not an expired
    // session, so let the caller show the error instead of reloading the page.
    const isAuthCall = (error.config?.url ?? "").includes("/auth/");
    if (isUnauthorized && !isAuthCall) {
      // Clear stored credentials
      localStorage.removeItem("token");
      localStorage.removeItem("user");
      // Redirect to login
      window.location.href = "/login";
    }
    return Promise.reject(error);
  }
);
Users will be automatically redirected to login when their token expires.
Security Best Practices
Generate a cryptographically secure secret key (for example with: openssl rand -hex 32). Never commit secret keys to version control.
Set Appropriate Token Expiration
Balance security and user experience:
Short expiration (15-60 min) for sensitive operations
Longer expiration (24 hours) for less critical apps
Implement refresh tokens for long-lived sessions
Always serve your API over HTTPS to prevent token interception.
Protect authentication endpoints from brute-force attacks with rate limiting, for example using slowapi:
from slowapi import Limiter
# slowapi requires the route decorator to sit ABOVE the limit decorator, and
# the endpoint must accept the incoming ``request`` explicitly; otherwise the
# limiter cannot hook into the call.
@router.post("/login")
@limiter.limit("5/minute")
def login_user(request: Request, data: LoginRequest):
    ...
Testing Authentication
Test with cURL
# Login and get token
curl -X POST http://localhost:8000/auth/login \
-H "Content-Type: application/json" \
-d '{"email": "user@example.com", "password": "password123"}'
# Use token for authenticated request
curl -X GET http://localhost:8000/destinos \
-H "Authorization: Bearer YOUR_JWT_TOKEN"
Test with Python
import requests

# Login
response = requests.post(
    "http://localhost:8000/auth/login",
    json={"email": "user@example.com", "password": "password123"},
    timeout=10,
)
# Fail loudly on bad credentials instead of hitting a KeyError below.
response.raise_for_status()
token = response.json()["access_token"]

# Authenticated request
headers = {"Authorization": f"Bearer {token}"}
response = requests.get(
    "http://localhost:8000/destinos",
    headers=headers,
    timeout=10,
)
print(response.json())
Next Steps
API Reference Explore authentication endpoints
User Roles Learn about role management