from __future__ import annotations from fastapi import HTTPException, Request, status from fastapi.staticfiles import StaticFiles from sqladmin import Admin, ModelView from sqladmin.authentication import AuthenticationBackend from starlette.responses import HTMLResponse, RedirectResponse from starlette.datastructures import URL from sqlalchemy.orm import sessionmaker from wtforms import BooleanField, SelectField, StringField from wtforms.validators import Optional from app.core.config import settings from app.db.engine import engine from app.db.models import ApiClient, ApiKey from app.security.api_key import generate_api_key, get_prefix, hash_api_key class AdminAuth(AuthenticationBackend): async def login(self, request: Request) -> bool: form = await request.form() username = form.get("username") password = form.get("password") if username == settings.ADMIN_USERNAME and password == settings.ADMIN_PASSWORD: request.session.update({"admin": True}) return True return False async def logout(self, request: Request) -> bool: request.session.clear() return True async def authenticate(self, request: Request) -> bool: return bool(request.session.get("admin")) class ApiClientAdmin(ModelView, model=ApiClient): column_list = [ApiClient.id, ApiClient.name, ApiClient.is_active] async def insert_model(self, request: Request, data: dict) -> ApiClient: obj: ApiClient = await super().insert_model(request, data) plain_key = generate_api_key() db = sessionmaker(bind=engine, autoflush=False, autocommit=False)() try: api_key = ApiKey( client_id=obj.id, name="auto", key_prefix=get_prefix(plain_key), key_hash=hash_api_key(plain_key), permissions=[], is_active=True, ) db.add(api_key) db.commit() db.refresh(api_key) request.session["generated_api_key"] = { "client_id": obj.id, "client_name": obj.name, "key_id": api_key.id, "api_key": plain_key, } finally: db.close() return obj class ApiKeyAdmin(ModelView, model=ApiKey): column_list = [ApiKey.id, ApiKey.client_id, ApiKey.name, ApiKey.is_active, ApiKey.permissions] form_excluded_columns = [ApiKey.key_hash, ApiKey.key_prefix, ApiKey.created_at] form_extra_fields = { "plain_key": StringField("Plain Key", validators=[Optional()]), "permissions_csv": StringField("Permissions (comma)", validators=[Optional()]), "endpoint_path": SelectField("Endpoint", choices=[], validators=[Optional()]), "perm_read": BooleanField("Read (GET)"), "perm_write": BooleanField("Write (POST/PATCH)"), "perm_delete": BooleanField("Delete (DELETE)"), } async def on_model_change(self, data: dict, model: ApiKey, is_created: bool, request: Request) -> None: plain_key = data.get("plain_key") if not plain_key and is_created: plain_key = generate_api_key() if plain_key: model.key_prefix = get_prefix(plain_key) model.key_hash = hash_api_key(plain_key) if is_created: request.state.generated_api_key_plain = plain_key permissions: list[str] = [] endpoint_path = data.get("endpoint_path") if endpoint_path: if data.get("perm_read"): permissions.append(f"{endpoint_path}:read") if data.get("perm_write"): permissions.append(f"{endpoint_path}:write") if data.get("perm_delete"): permissions.append(f"{endpoint_path}:delete") permissions_csv = data.get("permissions_csv") if permissions_csv is not None: perms = [p.strip() for p in permissions_csv.split(",") if p.strip()] permissions.extend(perms) if permissions: seen: set[str] = set() deduped: list[str] = [] for p in permissions: if p not in seen: seen.add(p) deduped.append(p) model.permissions = deduped async def after_model_change(self, data: dict, model: ApiKey, is_created: bool, request: Request) -> None: if not is_created: return plain_key = getattr(request.state, "generated_api_key_plain", None) if not plain_key: return request.session["generated_api_key"] = { "client_id": model.client_id, "client_name": str(getattr(model, "client", "")) if getattr(model, "client", None) else "", "key_id": model.id, "api_key": plain_key, } def mount_admin(app): auth_backend = AdminAuth(secret_key=settings.ADMIN_SECRET_KEY) class CustomAdmin(Admin): def get_save_redirect_url( self, request: Request, form, model_view: ModelView, obj ): if ( getattr(model_view, "model", None) in (ApiClient, ApiKey) and request.session.get("generated_api_key") ): root_path = request.scope.get("root_path") or "" return URL(f"{root_path}/admin/generated-api-key") return super().get_save_redirect_url( request=request, form=form, model_view=model_view, obj=obj, ) admin = CustomAdmin( app=app, engine=engine, authentication_backend=auth_backend, title="My Service Management", base_url="/admin", ) openapi = app.openapi() paths = openapi.get("paths") or {} endpoint_choices: list[tuple[str, str]] = [] for path in sorted(paths.keys()): if not path.startswith("/api/"): continue methods = paths.get(path) or {} available = sorted([m.upper() for m in methods.keys()]) label = f"{path} [{' '.join(available)}]" if available else path endpoint_choices.append((path, label)) ApiKeyAdmin.form_extra_fields["endpoint_path"].kwargs["choices"] = endpoint_choices SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False) admin.add_view(ApiClientAdmin) admin.add_view(ApiKeyAdmin) @app.get("/admin/generated-api-key") async def _admin_generated_api_key(request: Request): if not request.session.get("admin"): raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authenticated") key_info = request.session.pop("generated_api_key", None) root_path = request.scope.get("root_path") or "" clients_url = f"{root_path}/admin/{ApiClientAdmin.identity}/list" if not key_info: return HTMLResponse( f"
The API key was already shown or expired.
", status_code=200, ) client_name = key_info.get("client_name", "") client_id = key_info.get("client_id", "") key_id = key_info.get("key_id", "") api_key = key_info.get("api_key", "") return HTMLResponse( ( "Copy this API key now. You won't be able to view it again.
" f"Client: {client_name} (ID: {client_id})
" f"Key ID: {key_id}
" f"{api_key}"
f""
),
status_code=200,
)
@app.get("/admin/clients/{client_id}/generate-api-key")
async def _admin_generate_api_key_get(
request: Request,
client_id: int,
permissions: str = "",
name: str | None = None,
):
if not request.session.get("admin"):
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authenticated")
perms = [p.strip() for p in permissions.split(",") if p.strip()]
plain_key = generate_api_key()
db = SessionLocal()
try:
client = db.get(ApiClient, client_id)
if not client:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Client not found")
api_key = ApiKey(
client_id=client_id,
name=name,
key_prefix=get_prefix(plain_key),
key_hash=hash_api_key(plain_key),
permissions=perms,
is_active=True,
)
db.add(api_key)
db.commit()
db.refresh(api_key)
return {"key_id": api_key.id, "api_key": plain_key, "permissions": perms}
finally:
db.close()
@app.post("/admin/api-keys/generate")
async def _admin_generate_api_key(
request: Request,
client_id: int,
permissions: str = "",
name: str | None = None,
):
if not request.session.get("admin"):
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authenticated")
perms = [p.strip() for p in permissions.split(",") if p.strip()]
plain_key = generate_api_key()
db = SessionLocal()
try:
client = db.get(ApiClient, client_id)
if not client:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Client not found")
api_key = ApiKey(
client_id=client_id,
name=name,
key_prefix=get_prefix(plain_key),
key_hash=hash_api_key(plain_key),
permissions=perms,
is_active=True,
)
db.add(api_key)
db.commit()
db.refresh(api_key)
return {"key_id": api_key.id, "api_key": plain_key, "permissions": perms}
finally:
db.close()