Files
sriphat-dataplatform/03-apiservice/app/admin.py
2026-02-24 23:29:20 +07:00

205 lines
7.7 KiB
Python

from __future__ import annotations
from fastapi import HTTPException, Request, status
from sqladmin import Admin, ModelView
from sqladmin.authentication import AuthenticationBackend
from starlette.responses import RedirectResponse, JSONResponse
from sqlalchemy.orm import sessionmaker
from wtforms import StringField, TextAreaField
from wtforms.validators import Optional
from app.core.config import settings
from app.db.engine import engine
from app.db.models import ApiClient, ApiKey
from app.security.api_key import generate_api_key, get_prefix, hash_api_key
class AdminAuth(AuthenticationBackend):
    """Session-backed authentication for the admin UI.

    ``login`` checks the submitted form against ``settings.ADMIN_USERNAME`` and
    ``settings.ADMIN_PASSWORD`` and, on success, sets an ``admin`` flag in the
    session. ``authenticate`` tests that flag and ``logout`` clears the session.
    """

    @staticmethod
    def _credential_matches(supplied: object, expected: object) -> bool:
        """Return True when ``supplied`` equals ``expected``, compared in constant time.

        Non-string values never match, so a missing form field (``None``) or an
        uploaded file cannot compare equal to an unset/non-string setting.
        """
        import hmac

        if not isinstance(supplied, str) or not isinstance(expected, str):
            return False
        # Compare bytes: compare_digest rejects str values containing
        # non-ASCII characters, which a password may legitimately contain.
        return hmac.compare_digest(supplied.encode("utf-8"), expected.encode("utf-8"))

    async def login(self, request: Request) -> bool:
        """Validate the posted ``username``/``password`` form fields.

        Returns True and marks the session as admin when both match the
        configured credentials; returns False otherwise.
        """
        form = await request.form()
        # Evaluate both checks unconditionally so the response time does not
        # reveal which of the two fields was wrong.
        username_ok = self._credential_matches(form.get("username"), settings.ADMIN_USERNAME)
        password_ok = self._credential_matches(form.get("password"), settings.ADMIN_PASSWORD)
        if not (username_ok and password_ok):
            return False
        request.session.update({"admin": True})
        return True

    async def logout(self, request: Request) -> bool:
        """Drop everything stored in the session and report success."""
        request.session.clear()
        return True

    async def authenticate(self, request: Request) -> bool:
        """Return True when the session carries a truthy ``admin`` flag."""
        return bool(request.session.get("admin"))
class ApiClientAdmin(ModelView, model=ApiClient):
    """Admin view for ``ApiClient`` rows."""

    # Columns shown on the list page.
    column_list = [
        ApiClient.id,
        ApiClient.name,
        ApiClient.is_active,
    ]
class ApiKeyAdmin(ModelView, model=ApiKey):
    """Admin view for ``ApiKey`` rows.

    The form replaces the ``permissions`` column with a free-text JSON textarea
    and hides the hash, prefix and creation timestamp. ``on_model_change``
    parses the textarea and generates the key material for new records.
    """

    # Key material and the creation timestamp are never edited through the form.
    form_excluded_columns = [ApiKey.key_hash, ApiKey.key_prefix, ApiKey.created_at]
    column_list = [
        ApiKey.id,
        ApiKey.client_id,
        ApiKey.name,
        ApiKey.key_prefix,
        ApiKey.is_active,
        ApiKey.created_at,
    ]
    column_details_list = [
        ApiKey.id,
        ApiKey.client_id,
        ApiKey.name,
        ApiKey.key_prefix,
        ApiKey.permissions,
        ApiKey.is_active,
        ApiKey.created_at,
    ]
    details_template = "apikey_details.html"
    form_extra_fields = {
        "permissions": TextAreaField(
            "Permissions (JSON Array)",
            validators=[Optional()],
            description='Example: ["feed.waiting-time:write", "feed.opd-checkpoint:write"]',
            render_kw={"placeholder": '["feed.waiting-time:write"]', "rows": 3},
        ),
    }

    async def on_model_change(self, data: dict, model: ApiKey, is_created: bool, request: Request) -> None:
        """Parse the permissions textarea and create key material for new keys.

        Args:
            data: Submitted form values; ``data["permissions"]`` is replaced by
                the parsed list when the textarea was filled in.
            model: The ``ApiKey`` instance being created or edited.
            is_created: True when the record is new.
            request: Current request; its session receives the plaintext key of
                a newly generated API key.

        Raises:
            HTTPException: 400 when the textarea is not valid JSON or does not
                decode to a JSON array.
        """
        import json

        # Handle permissions from textarea (JSON format)
        permissions_str = data.get("permissions")
        if permissions_str:
            try:
                perms = json.loads(permissions_str)
                if not isinstance(perms, list):
                    raise ValueError("Permissions must be a JSON array")
            except ValueError as e:  # json.JSONDecodeError is a ValueError subclass
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail=f"Invalid permissions format: {str(e)}. Use JSON array like: [\"feed.waiting-time:write\"]",
                ) from e
            model.permissions = perms
            # NOTE(review): sqladmin appears to copy `data` onto the model after
            # this hook returns, which would overwrite the parsed list with the
            # raw textarea string. Writing the list back into `data` keeps both
            # in agreement; confirm against the pinned sqladmin version.
            data["permissions"] = perms

        # Auto-generate key for new records if not provided
        if is_created and not model.key_hash:
            plain_key = generate_api_key()
            model.key_prefix = get_prefix(plain_key)
            model.key_hash = hash_api_key(plain_key)
            # Store in session for display after creation.
            # NOTE(review): the slot is keyed by client, not by key, so two new
            # keys for one client share it. `model.client_id` may also still be
            # unset here if form data is applied only after this hook — verify.
            request.session[f"new_api_key_{model.client_id}"] = plain_key
def mount_admin(app):
    """Attach the sqladmin UI and the custom API-key endpoints to ``app``.

    Registers, in this order:

    * ``GET /admin`` - redirect to ``/admin/`` honouring ``root_path``.
    * ``POST /admin/api-keys/generate`` - create a key for a client.
    * ``POST /admin/api-keys/{key_id}/regenerate`` - replace a key's secret.
    * ``GET /admin/api-keys/{key_id}/view`` - one-time read of a key stored in
      the session by the admin create form.
    * the sqladmin application itself, mounted by ``Admin(app=...)``.

    The custom routes are registered BEFORE ``Admin`` is constructed. Routes
    are matched in registration order, so a sub-application mounted at
    ``/admin`` first would receive every ``/admin/...`` request and the
    handlers below would never run.

    NOTE(review): the handlers read ``request.session``. Confirm a session
    middleware covers these app-level routes, not only the mounted admin app.
    """
    import os

    SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False)

    def _require_admin(request: Request) -> None:
        """Raise 401 unless the session carries the ``admin`` flag."""
        if not request.session.get("admin"):
            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authenticated")

    @app.get("/admin")
    async def _admin_redirect(request: Request):
        """Redirect the slash-less admin URL to ``<root_path>/admin/``."""
        root_path = request.scope.get("root_path") or ""
        return RedirectResponse(url=f"{root_path}/admin/")

    @app.post("/admin/api-keys/generate")
    async def _admin_generate_api_key(
        request: Request,
        client_id: int,
        permissions: str = "",
        name: str | None = None,
    ):
        """Create an active API key for ``client_id`` and return its plaintext.

        ``permissions`` is a comma-separated list; blank entries are dropped.
        Responds 401 without an admin session and 404 for an unknown client.
        """
        _require_admin(request)
        perms = [p.strip() for p in permissions.split(",") if p.strip()]
        plain_key = generate_api_key()
        # The context manager closes the session on every exit path.
        with SessionLocal() as db:
            client = db.get(ApiClient, client_id)
            if not client:
                raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Client not found")
            api_key = ApiKey(
                client_id=client_id,
                name=name,
                key_prefix=get_prefix(plain_key),
                key_hash=hash_api_key(plain_key),
                permissions=perms,
                is_active=True,
            )
            db.add(api_key)
            db.commit()
            db.refresh(api_key)
            return {"key_id": api_key.id, "api_key": plain_key, "permissions": perms}

    @app.post("/admin/api-keys/{key_id}/regenerate")
    async def _admin_regenerate_api_key(request: Request, key_id: int):
        """Regenerate API key - creates new key while preserving permissions and other settings.

        Only the prefix and hash are replaced. The new plaintext key is
        returned in this response and is not stored in the session.
        """
        _require_admin(request)
        with SessionLocal() as db:
            api_key = db.get(ApiKey, key_id)
            if not api_key:
                raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="API Key not found")
            # Generate new key
            plain_key = generate_api_key()
            api_key.key_prefix = get_prefix(plain_key)
            api_key.key_hash = hash_api_key(plain_key)
            db.commit()
            db.refresh(api_key)
            return JSONResponse({
                "success": True,
                "key_id": api_key.id,
                "api_key": plain_key,
                "key_prefix": api_key.key_prefix,
                "permissions": api_key.permissions,
                "message": "API key regenerated successfully. Please save this key - it won't be shown again!"
            })

    @app.get("/admin/api-keys/{key_id}/view")
    async def _admin_view_api_key(request: Request, key_id: int):
        """Return, once, the plaintext key held in the session for this key's client.

        The value is read from the ``new_api_key_<client_id>`` session entry and
        removed after it has been returned. Responds 400 when no such entry
        exists, 401 without an admin session and 404 for an unknown key.
        """
        _require_admin(request)
        with SessionLocal() as db:
            api_key = db.get(ApiKey, key_id)
            if not api_key:
                raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="API Key not found")
            # Check if there's a stored key in session
            session_key = f"new_api_key_{api_key.client_id}"
            plain_key = request.session.get(session_key)
            if not plain_key:
                return JSONResponse({
                    "success": False,
                    "message": "API key cannot be retrieved. Keys can only be viewed once after creation or regeneration."
                }, status_code=400)
            # Clear from session after viewing
            request.session.pop(session_key, None)
            return JSONResponse({
                "success": True,
                "api_key": plain_key,
                "key_prefix": api_key.key_prefix,
                "message": "This is the only time you can view this key!"
            })

    # Mount sqladmin last so it does not shadow the routes registered above.
    auth_backend = AdminAuth(secret_key=settings.ADMIN_SECRET_KEY)
    # Configure templates directory
    templates_dir = os.path.join(os.path.dirname(__file__), "templates")
    admin = Admin(
        app=app,
        engine=engine,
        authentication_backend=auth_backend,
        templates_dir=templates_dir,
    )
    admin.add_view(ApiClientAdmin)
    admin.add_view(ApiKeyAdmin)