from __future__ import annotations from fastapi import HTTPException, Request, status from sqladmin import Admin, ModelView from sqladmin.authentication import AuthenticationBackend from starlette.responses import RedirectResponse from sqlalchemy.orm import sessionmaker from wtforms import StringField from wtforms.validators import Optional from app.core.config import settings from app.db.engine import engine from app.db.models import ApiClient, ApiKey from app.security.api_key import generate_api_key, get_prefix, hash_api_key class AdminAuth(AuthenticationBackend): async def login(self, request: Request) -> bool: form = await request.form() username = form.get("username") password = form.get("password") if username == settings.ADMIN_USERNAME and password == settings.ADMIN_PASSWORD: request.session.update({"admin": True}) return True return False async def logout(self, request: Request) -> bool: request.session.clear() return True async def authenticate(self, request: Request) -> bool: return bool(request.session.get("admin")) class ApiClientAdmin(ModelView, model=ApiClient): column_list = [ApiClient.id, ApiClient.name, ApiClient.is_active] class ApiKeyAdmin(ModelView, model=ApiKey): form_excluded_columns = [ApiKey.key_hash, ApiKey.key_prefix, ApiKey.created_at] form_extra_fields = { "plain_key": StringField("Plain Key", validators=[Optional()]), "permissions_csv": StringField("Permissions (comma)", validators=[Optional()]), } async def on_model_change(self, data: dict, model: ApiKey, is_created: bool, request: Request) -> None: plain_key = data.get("plain_key") if plain_key: model.key_prefix = get_prefix(plain_key) model.key_hash = hash_api_key(plain_key) permissions_csv = data.get("permissions_csv") if permissions_csv is not None: perms = [p.strip() for p in permissions_csv.split(",") if p.strip()] model.permissions = perms def mount_admin(app): auth_backend = AdminAuth(secret_key=settings.ADMIN_SECRET_KEY) admin = Admin(app=app, engine=engine, authentication_backend=auth_backend) SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False) admin.add_view(ApiClientAdmin) admin.add_view(ApiKeyAdmin) @app.get("/admin") async def _admin_redirect(request: Request): root_path = request.scope.get("root_path") or "" return RedirectResponse(url=f"{root_path}/admin/") @app.post("/admin/api-keys/generate") async def _admin_generate_api_key( request: Request, client_id: int, permissions: str = "", name: str | None = None, ): if not request.session.get("admin"): raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authenticated") perms = [p.strip() for p in permissions.split(",") if p.strip()] plain_key = generate_api_key() db = SessionLocal() try: client = db.get(ApiClient, client_id) if not client: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Client not found") api_key = ApiKey( client_id=client_id, name=name, key_prefix=get_prefix(plain_key), key_hash=hash_api_key(plain_key), permissions=perms, is_active=True, ) db.add(api_key) db.commit() db.refresh(api_key) return {"key_id": api_key.id, "api_key": plain_key, "permissions": perms} finally: db.close()