""" Keycloak authentication for web pages Note: This is ONLY for web UI authentication, NOT for API endpoints API endpoints use API Key authentication (see app/security/dependencies.py) """ from typing import Optional from fastapi import HTTPException, Request, status from keycloak import KeycloakOpenID from app.core.config import settings import logging logger = logging.getLogger(__name__) # Initialize Keycloak OpenID client def get_keycloak_client() -> KeycloakOpenID: """Get Keycloak OpenID client instance""" try: if settings.DEBUG_AUTH: logger.info("=" * 80) logger.info("KEYCLOAK CLIENT INITIALIZATION") logger.info(f"Server URL: {settings.KEYCLOAK_SERVER_URL}") logger.info(f"Realm: {settings.KEYCLOAK_REALM}") logger.info(f"Client ID: {settings.KEYCLOAK_CLIENT_ID}") logger.info(f"Client Secret: {'*' * len(settings.KEYCLOAK_CLIENT_SECRET) if settings.KEYCLOAK_CLIENT_SECRET else 'NOT SET'}") logger.info(f"Redirect URI: {settings.KEYCLOAK_REDIRECT_URI}") logger.info("=" * 80) return KeycloakOpenID( server_url=settings.KEYCLOAK_SERVER_URL, client_id=settings.KEYCLOAK_CLIENT_ID, realm_name=settings.KEYCLOAK_REALM, client_secret_key=settings.KEYCLOAK_CLIENT_SECRET, verify=True ) except Exception as e: logger.error(f"Failed to initialize Keycloak client: {e}") if settings.DEBUG_AUTH: import traceback logger.error(f"Traceback: {traceback.format_exc()}") raise def get_current_user(request: Request) -> Optional[dict]: """ Get current authenticated user from session Returns None if not authenticated """ return request.session.get("user") def require_user(request: Request) -> dict: """ Dependency to require authenticated user Raises 401 if not authenticated """ user = get_current_user(request) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Authentication required" ) return user def get_login_url(redirect_to: str = "/") -> str: """ Generate Keycloak login URL Args: redirect_to: Path to redirect after successful login Returns: Keycloak authorization URL """ try: if settings.DEBUG_AUTH: logger.info("=" * 80) logger.info("GENERATING LOGIN URL") logger.info(f"Redirect to after login: {redirect_to}") logger.info(f"Keycloak redirect URI: {settings.KEYCLOAK_REDIRECT_URI}") keycloak_client = get_keycloak_client() auth_url = keycloak_client.auth_url( redirect_uri=settings.KEYCLOAK_REDIRECT_URI, state=redirect_to ) if settings.DEBUG_AUTH: logger.info(f"Generated auth URL: {auth_url}") logger.info("=" * 80) return auth_url except Exception as e: logger.error(f"Failed to generate login URL: {e}") if settings.DEBUG_AUTH: import traceback logger.error(f"Traceback: {traceback.format_exc()}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Authentication service unavailable" ) async def verify_token(token: str) -> dict: """ Verify and decode Keycloak access token Args: token: Access token from Keycloak Returns: User information from token """ try: keycloak_client = get_keycloak_client() userinfo = keycloak_client.userinfo(token) return userinfo except Exception as e: logger.error(f"Token verification failed: {e}") raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or expired token" ) def get_logout_url(redirect_uri: str = None) -> str: """ Generate Keycloak logout URL Args: redirect_uri: Where to redirect after logout Returns: Keycloak logout URL """ try: keycloak_client = get_keycloak_client() if redirect_uri is None: redirect_uri = f"{settings.ROOT_PATH}/" if settings.ROOT_PATH else "/" logout_url = keycloak_client.logout_url(redirect_uri=redirect_uri) return logout_url except Exception as e: logger.error(f"Failed to generate logout URL: {e}") # Return simple redirect if Keycloak logout fails return redirect_uri or "/"