152 lines
4.0 KiB
Python
152 lines
4.0 KiB
Python
"""
Role-based permission system for web pages.

Note: API endpoints (/api/v1/*) use API Key authentication and are not
affected by this module.
"""
|
|
from typing import List
|
|
from fastapi import HTTPException, Request, status
|
|
from app.core.config import settings
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class Roles:
    """Role name constants used by the permission helpers"""

    # Administrator role name.
    ADMIN = "admin"

    # Operation role name.
    OPERATION = "operation"
|
|
|
|
|
|
def get_user_roles(request: Request) -> List[str]:
    """
    Get roles of the logged-in user from the session

    Args:
        request: FastAPI request object; its ``session`` mapping is read

    Returns:
        The value stored under ``user["roles"]`` in the session, or an
        empty list when there is no session user, the user has no
        ``roles`` key, or the stored roles value is empty/None
    """
    user = request.session.get("user")
    if not user:
        return []

    # `.get("roles", [])` would hand back None when the key exists but holds
    # None, breaking the List[str] contract and every `in` test on the
    # result. Normalise any empty/None value to an empty list instead.
    return user.get("roles") or []
|
|
|
|
|
|
def has_role(request: Request, required_role: str) -> bool:
    """
    Tell whether the session user holds a given role

    Args:
        request: FastAPI request object
        required_role: Role name to look for

    Returns:
        True if the role is among the user's roles, False otherwise
    """
    # Role lookup is delegated to get_user_roles so the session handling
    # lives in one place.
    return required_role in get_user_roles(request)
|
|
|
|
|
|
def has_any_role(request: Request, required_roles: List[str]) -> bool:
    """
    Tell whether the session user holds at least one of several roles

    Args:
        request: FastAPI request object
        required_roles: Role names, any one of which is sufficient

    Returns:
        True as soon as one required role is found among the user's roles,
        False if none match (including when required_roles is empty)
    """
    granted = get_user_roles(request)
    for candidate in required_roles:
        if candidate in granted:
            return True
    return False
|
|
|
|
|
|
def require_role(required_role: str):
    """
    Dependency factory that requires a specific role

    Args:
        required_role: Role name required to access the endpoint

    Returns:
        Dependency function that takes the request and returns the session
        user when the role check passes

    Raises:
        HTTPException: 401 if not authenticated, 403 if missing required role
            (raised by the returned dependency, not by this factory)
    """
    def check_role(request: Request):
        user = request.session.get("user")
        if not user:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Authentication required"
            )

        # A "roles" entry stored as None must count as "no roles"; otherwise
        # the membership test below raises TypeError and the client gets a
        # 500 instead of the intended 403.
        user_roles = user.get("roles") or []

        if required_role not in user_roles:
            # Lazy %-style arguments: the rendered message is unchanged, but
            # formatting only happens if the record is actually emitted.
            logger.warning(
                "Access denied: User %s (roles: %s) "
                "tried to access resource requiring '%s'",
                user.get("username"),
                user_roles,
                required_role,
            )
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Access denied. Required role: {required_role}"
            )

        return user

    return check_role
|
|
|
|
|
|
def require_any_role(required_roles: List[str]):
    """
    Dependency factory that requires any of the specified roles

    Args:
        required_roles: List of role names, user needs at least one

    Returns:
        Dependency function that takes the request and returns the session
        user when at least one required role is present

    Raises:
        HTTPException: 401 if not authenticated, 403 if missing all required
            roles (raised by the returned dependency, not by this factory)
    """
    def check_roles(request: Request):
        user = request.session.get("user")
        if not user:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Authentication required"
            )

        # A "roles" entry stored as None must count as "no roles"; otherwise
        # the membership test below raises TypeError and the client gets a
        # 500 instead of the intended 403.
        user_roles = user.get("roles") or []

        if not any(role in user_roles for role in required_roles):
            # Lazy %-style arguments: the rendered message is unchanged, but
            # formatting only happens if the record is actually emitted.
            logger.warning(
                "Access denied: User %s (roles: %s) "
                "tried to access resource requiring one of: %s",
                user.get("username"),
                user_roles,
                required_roles,
            )
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Access denied. Required roles: {', '.join(required_roles)}"
            )

        return user

    return check_roles
|
|
|
|
|
|
def is_admin(request: Request) -> bool:
    """
    Tell whether the session user is an administrator

    Args:
        request: FastAPI request object

    Returns:
        True if the user's roles include Roles.ADMIN, False otherwise
    """
    admin_role = Roles.ADMIN
    return has_role(request, admin_role)
|