""" User and Role management endpoints (Admin only) Note: This manages local user records synced from Keycloak API endpoints (/api/v1/*) are NOT affected and continue using API Key authentication """ from typing import List from fastapi import APIRouter, Depends, HTTPException, Request from sqlalchemy.orm import Session from pydantic import BaseModel from datetime import datetime from app.db.session import get_db from app.models.user import User, Role from app.security.permissions import require_role, Roles import logging logger = logging.getLogger(__name__) router = APIRouter(prefix="/admin/users", tags=["admin-users"]) # Pydantic schemas class RoleSchema(BaseModel): id: int name: str description: str | None = None class Config: from_attributes = True class UserSchema(BaseModel): id: int keycloak_id: str username: str email: str | None = None full_name: str | None = None is_active: bool roles: List[RoleSchema] = [] last_login: datetime | None = None created_at: datetime class Config: from_attributes = True class UserCreateSchema(BaseModel): keycloak_id: str username: str email: str | None = None full_name: str | None = None class UserUpdateSchema(BaseModel): email: str | None = None full_name: str | None = None is_active: bool | None = None role_ids: List[int] | None = None @router.get("/", response_model=List[UserSchema]) async def list_users( db: Session = Depends(get_db), current_user: dict = Depends(require_role(Roles.ADMIN)) ): """List all users (Admin only)""" users = db.query(User).all() return users @router.get("/{user_id}", response_model=UserSchema) async def get_user( user_id: int, db: Session = Depends(get_db), current_user: dict = Depends(require_role(Roles.ADMIN)) ): """Get user by ID (Admin only)""" user = db.query(User).filter(User.id == user_id).first() if not user: raise HTTPException(status_code=404, detail="User not found") return user @router.post("/", response_model=UserSchema) async def create_user( user_data: UserCreateSchema, db: Session = Depends(get_db), current_user: dict = Depends(require_role(Roles.ADMIN)) ): """Create new user record (Admin only)""" # Check if user already exists existing = db.query(User).filter(User.keycloak_id == user_data.keycloak_id).first() if existing: raise HTTPException(status_code=400, detail="User already exists") user = User(**user_data.dict()) db.add(user) db.commit() db.refresh(user) return user @router.put("/{user_id}", response_model=UserSchema) async def update_user( user_id: int, user_data: UserUpdateSchema, db: Session = Depends(get_db), current_user: dict = Depends(require_role(Roles.ADMIN)) ): """Update user (Admin only)""" user = db.query(User).filter(User.id == user_id).first() if not user: raise HTTPException(status_code=404, detail="User not found") # Update fields if user_data.email is not None: user.email = user_data.email if user_data.full_name is not None: user.full_name = user_data.full_name if user_data.is_active is not None: user.is_active = user_data.is_active # Update roles if user_data.role_ids is not None: roles = db.query(Role).filter(Role.id.in_(user_data.role_ids)).all() user.roles = roles db.commit() db.refresh(user) return user @router.delete("/{user_id}") async def delete_user( user_id: int, db: Session = Depends(get_db), current_user: dict = Depends(require_role(Roles.ADMIN)) ): """Delete user (Admin only)""" user = db.query(User).filter(User.id == user_id).first() if not user: raise HTTPException(status_code=404, detail="User not found") db.delete(user) db.commit() return {"message": "User deleted successfully"} @router.get("/roles/", response_model=List[RoleSchema]) async def list_roles( db: Session = Depends(get_db), current_user: dict = Depends(require_role(Roles.ADMIN)) ): """List all roles (Admin only)""" roles = db.query(Role).all() return roles