"""
Authentication routes for Keycloak web login.

Note: These routes are ONLY for web UI authentication.
API endpoints use API Key authentication separately.
"""
import logging

from fastapi import APIRouter, HTTPException, Query, Request
from fastapi.responses import RedirectResponse

from app.core.config import settings
from app.security.keycloak_auth import (
    get_current_user,
    get_keycloak_client,
    get_login_url,
    get_logout_url,
)

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/auth", tags=["authentication"])

# Application roles that are kept in the session; any other role found in the
# token is dropped.
_APP_ROLES = ("admin", "operation")

# Request headers whose values are replaced before being written to the log.
_SENSITIVE_HEADERS = frozenset(
    {"authorization", "cookie", "proxy-authorization", "x-api-key"}
)


def _safe_local_path(target):
    """Return ``target`` if it is a same-site absolute path, else ``"/"``.

    ``redirect_to`` / ``state`` arrive as query parameters, so they are
    caller-controlled. Only values that start with a single ``/`` are accepted;
    absolute URLs, scheme-relative ``//host`` forms, backslashes and control
    characters are rejected to avoid redirecting to another site.
    """
    if not target or not target.startswith("/"):
        return "/"
    if target.startswith("//") or "\\" in target:
        return "/"
    if any(ch < " " or ch == "\x7f" for ch in target):
        return "/"
    return target


def _with_root_path(path):
    """Prefix ``path`` with ``settings.ROOT_PATH`` unless it is already under it.

    The comparison is made on a path-segment boundary, so with a root of
    ``/app`` the path ``/application`` is still prefixed.
    """
    root = (settings.ROOT_PATH or "").rstrip("/")
    if not root:
        return path
    # Ignore query string / fragment when testing for the prefix.
    bare = path.partition("?")[0].partition("#")[0]
    if bare == root or bare.startswith(f"{root}/"):
        return path
    return f"{root}{path}"


def _loggable_headers(headers):
    """Return the request headers as a dict with credential values redacted."""
    return {
        name: "<redacted>" if name.lower() in _SENSITIVE_HEADERS else value
        for name, value in headers.items()
    }


def _extract_app_roles(claims):
    """Collect roles from the decoded token claims.

    Returns:
        A ``(all_roles, app_roles)`` tuple: every realm/client role found in
        the claims, and the de-duplicated subset listed in ``_APP_ROLES``.
    """
    # 1. Realm-level roles.
    realm_roles = (claims.get("realm_access") or {}).get("roles") or []
    # 2. Roles specific to this application's Keycloak client.
    client_access = (claims.get("resource_access") or {}).get(
        settings.KEYCLOAK_CLIENT_ID
    ) or {}
    client_roles = client_access.get("roles") or []

    all_roles = [*realm_roles, *client_roles]
    app_roles = list(dict.fromkeys(r for r in all_roles if r in _APP_ROLES))
    return all_roles, app_roles


@router.get("/login")
async def login(request: Request, redirect_to: str = Query(default="/")):
    """
    Redirect to Keycloak login page

    Args:
        redirect_to: Path to redirect after successful login. Values that are
            not same-site absolute paths are replaced with "/".
    """
    target = _safe_local_path(redirect_to)

    # Already logged in: go straight to the destination.
    if get_current_user(request):
        return RedirectResponse(url=_with_root_path(target))

    # Otherwise hand over to Keycloak.
    return RedirectResponse(url=get_login_url(target))


@router.get("/callback")
async def auth_callback(
    request: Request,
    code: str = Query(...),
    state: str = Query(default="/"),
):
    """
    Handle Keycloak callback after login

    Exchanges the authorization code for tokens, stores the user's identity,
    application roles and tokens in ``request.session["user"]`` and redirects
    to the original destination.

    Args:
        code: Authorization code from Keycloak
        state: Original redirect path

    Raises:
        HTTPException: 400 when no access token is returned or any step of the
            exchange fails.
    """
    # NOTE(review): `state` is used only as the return path and is not compared
    # with a per-session value in this handler — confirm whether CSRF
    # protection for the login flow is provided elsewhere.
    debug = settings.DEBUG_AUTH
    try:
        if debug:
            logger.info("=" * 80)
            logger.info("AUTHENTICATION CALLBACK RECEIVED")
            # Only a short prefix of the code and the URL path are logged; the
            # full URL carries the authorization code in its query string.
            logger.info("Authorization code: %s... (%d chars)", code[:8], len(code))
            logger.info("State (redirect_to): %s", state)
            logger.info("Request path: %s", request.url.path)
            logger.info("Request headers: %s", _loggable_headers(request.headers))

        keycloak_client = get_keycloak_client()

        if debug:
            logger.info("-" * 80)
            logger.info("EXCHANGING CODE FOR TOKENS")
            logger.info("Grant type: authorization_code")
            logger.info("Code: %s...", code[:8])
            logger.info("Redirect URI: %s", settings.KEYCLOAK_REDIRECT_URI)

        # Exchange authorization code for tokens
        token_response = keycloak_client.token(
            grant_type="authorization_code",
            code=code,
            redirect_uri=settings.KEYCLOAK_REDIRECT_URI,
        )

        access_token = token_response.get("access_token")

        if debug:
            logger.info("TOKEN RESPONSE RECEIVED")
            logger.info("Access token: %s", "Present" if access_token else "NONE")
            logger.info(
                "Refresh token: %s",
                "Present" if token_response.get("refresh_token") else "None",
            )
            logger.info("Token type: %s", token_response.get("token_type", "N/A"))
            logger.info(
                "Expires in: %s seconds", token_response.get("expires_in", "N/A")
            )

        if not access_token:
            logger.error("No access token in response!")
            if debug:
                # Keys only: the values may include other tokens.
                logger.error("Token response keys: %s", list(token_response.keys()))
            raise HTTPException(
                status_code=400,
                detail="No access token received from Keycloak",
            )

        if debug:
            logger.info("-" * 80)
            logger.info("FETCHING USER INFO")

        from jose import jwt

        # User details and roles are read from the access-token claims (the
        # id_token and the userinfo endpoint are not used). Signature
        # verification is disabled because the token comes from the
        # token-endpoint response above, not from the browser.
        # NOTE(review): this relies on the connection to Keycloak being
        # trusted (TLS) — confirm.
        userinfo = jwt.decode(
            access_token,
            key="",
            options={"verify_signature": False, "verify_aud": False},
        )
        username = userinfo.get("preferred_username")
        logger.info("Decoded access_token: %s", username)

        # Extract roles from token, keeping only our application roles
        roles, user_roles = _extract_app_roles(userinfo)

        if debug:
            logger.info("USER INFO RECEIVED")
            logger.info("Username: %s", username)
            logger.info("Email: %s", userinfo.get("email"))
            logger.info("Name: %s", userinfo.get("name"))
            logger.info("Sub (User ID): %s", userinfo.get("sub"))
            logger.info("All roles from token: %s", roles)
            logger.info("Filtered user roles: %s", user_roles)
            logger.info("Full userinfo keys: %s", list(userinfo.keys()))

        # Store user info, roles, and tokens in session
        user_session_data = {
            "username": username,
            "email": userinfo.get("email"),
            "name": userinfo.get("name", username),
            "sub": userinfo.get("sub"),  # User ID
            "roles": user_roles,  # User roles
            "access_token": access_token,
            "refresh_token": token_response.get("refresh_token"),
        }
        request.session["user"] = user_session_data

        if debug:
            logger.info("-" * 80)
            logger.info("SESSION UPDATED")
            logger.info(
                "Session user data: %s",
                {
                    k: v
                    for k, v in user_session_data.items()
                    if k not in ("access_token", "refresh_token")
                },
            )

        logger.info("User %s logged in successfully", username)

        # Redirect to the original destination: local paths only, placed
        # under ROOT_PATH when one is configured.
        redirect_url = _with_root_path(_safe_local_path(state))

        if debug:
            logger.info("Redirecting to: %s", redirect_url)
            logger.info("=" * 80)

        return RedirectResponse(url=redirect_url, status_code=302)

    except HTTPException:
        # Raised deliberately above; pass it through with its own detail
        # instead of re-wrapping it in the generic handler below.
        raise
    except Exception as e:
        logger.error("Authentication callback failed: %s", e)
        if debug:
            logger.error("FULL TRACEBACK:", exc_info=True)
            logger.error("=" * 80)
        raise HTTPException(
            status_code=400,
            detail=f"Authentication failed: {str(e)}",
        ) from e


@router.get("/logout")
async def logout(request: Request):
    """
    Logout user and clear session

    Redirects to Keycloak logout page
    """
    # Read the user before the session is wiped so the logout can be logged.
    user = get_current_user(request)

    # Clear session
    request.session.clear()

    if user:
        logger.info("User %s logged out", user.get("username"))

    # Get Keycloak logout URL
    redirect_uri = f"{settings.ROOT_PATH}/" if settings.ROOT_PATH else "/"
    return RedirectResponse(url=get_logout_url(redirect_uri))


@router.get("/user")
async def get_user_info(request: Request):
    """
    Get current authenticated user information

    Returns 401 if not authenticated
    """
    user = get_current_user(request)
    if not user:
        raise HTTPException(status_code=401, detail="Not authenticated")

    # Return user info without sensitive tokens
    return {
        "username": user.get("username"),
        "email": user.get("email"),
        "name": user.get("name"),
        "sub": user.get("sub"),
    }


@router.get("/status")
async def auth_status(request: Request):
    """
    Check authentication status

    Returns whether user is logged in
    """
    user = get_current_user(request)
    summary = (
        {"username": user.get("username"), "name": user.get("name")}
        if user
        else None
    )
    return {"authenticated": user is not None, "user": summary}