Files
sriphat-dataplatform/03-apiservice-v0.1/app/admin.py
Gamegame101 9abd1f272c Update API service to use raw_waiting_time table
- Change RawOpdCheckpoint model to RawWaitingTime
- Update schema from FeedCheckpointIn to FeedWaitingTimeIn
- Switch to rawdata.raw_waiting_time table
- Keep existing /feed/checkpoint endpoint
- Add new fields: vn, txn, name, doctor_code, doctor_name, location_code, location_name, step_name, time
- Update permission to feed.waiting-time:write
2026-02-24 16:34:34 +07:00

108 lines
3.7 KiB
Python

from __future__ import annotations

import hmac

from fastapi import HTTPException, Request, status
from sqladmin import Admin, ModelView
from sqladmin.authentication import AuthenticationBackend
from sqlalchemy.orm import sessionmaker
from starlette.responses import RedirectResponse
from wtforms import StringField
from wtforms.validators import Optional

from app.core.config import settings
from app.db.engine import engine
from app.db.models import ApiClient, ApiKey
from app.security.api_key import generate_api_key, get_prefix, hash_api_key
class AdminAuth(AuthenticationBackend):
    """Session-based authentication backend for the admin UI.

    Submitted credentials are checked against ``settings.ADMIN_USERNAME`` and
    ``settings.ADMIN_PASSWORD``. A successful login is recorded in the session
    as ``{"admin": True}``, which ``authenticate`` later reads back.
    """

    @staticmethod
    def _credential_matches(supplied: object, expected: object) -> bool:
        """Return True only when both values are strings with equal content.

        Non-string values never match: a missing form field (``None``), an
        uploaded file, or an unset setting. This prevents ``None == None``
        from authenticating a request when the admin credentials are not
        configured. The comparison runs on UTF-8 bytes through
        ``hmac.compare_digest`` so that its duration does not depend on how
        many leading characters are correct (the ``str`` form of
        ``compare_digest`` only accepts ASCII).
        """
        if not isinstance(supplied, str) or not isinstance(expected, str):
            return False
        return hmac.compare_digest(
            supplied.encode("utf-8"),
            expected.encode("utf-8"),
        )

    async def login(self, request: Request) -> bool:
        """Validate the posted login form and mark the session as admin.

        Returns:
            True when both username and password match the configured
            values, False otherwise.
        """
        form = await request.form()
        # Evaluate both checks unconditionally so a wrong username and a
        # wrong password take the same code path.
        username_ok = self._credential_matches(
            form.get("username"), settings.ADMIN_USERNAME
        )
        password_ok = self._credential_matches(
            form.get("password"), settings.ADMIN_PASSWORD
        )
        if not (username_ok and password_ok):
            return False
        request.session.update({"admin": True})
        return True

    async def logout(self, request: Request) -> bool:
        """Drop every key from the session and report success."""
        request.session.clear()
        return True

    async def authenticate(self, request: Request) -> bool:
        """Return whether the current session carries a truthy ``admin`` flag."""
        return bool(request.session.get("admin"))
class ApiClientAdmin(ModelView, model=ApiClient):
    """Admin view for ``ApiClient`` records."""

    # Columns configured for this view: id, name and the active flag.
    column_list = [
        ApiClient.id,
        ApiClient.name,
        ApiClient.is_active,
    ]
class ApiKeyAdmin(ModelView, model=ApiKey):
    """Admin view for ``ApiKey`` records.

    The stored hash, prefix and creation timestamp are excluded from the
    form. Two extra free-text fields are offered instead: ``plain_key`` (a
    raw key to be hashed) and ``permissions_csv`` (comma-separated
    permission names).
    """

    form_excluded_columns = [
        ApiKey.key_hash,
        ApiKey.key_prefix,
        ApiKey.created_at,
    ]
    form_extra_fields = dict(
        plain_key=StringField("Plain Key", validators=[Optional()]),
        permissions_csv=StringField("Permissions (comma)", validators=[Optional()]),
    )

    async def on_model_change(self, data: dict, model: ApiKey, is_created: bool, request: Request) -> None:
        """Translate the extra form fields into model attributes.

        Args:
            data: Submitted form values; read here for ``plain_key`` and
                ``permissions_csv``.
            model: The ``ApiKey`` instance being created or edited.
            is_created: Unused here.
            request: Unused here.
        """
        raw_key = data.get("plain_key")
        if raw_key:
            # Only derived values are written to the model, never the raw key.
            model.key_prefix = get_prefix(raw_key)
            model.key_hash = hash_api_key(raw_key)

        csv_value = data.get("permissions_csv")
        if csv_value is None:
            return
        # NOTE(review): an empty string is not None, so a blank field sets
        # permissions to []. Confirm that is intended when editing a key.
        stripped = (piece.strip() for piece in csv_value.split(","))
        model.permissions = [entry for entry in stripped if entry]
def mount_admin(app) -> None:
    """Attach the admin UI and its helper routes to ``app``.

    Registers, in this order:

    * ``GET /admin``: redirects to ``{root_path}/admin/``.
    * ``POST /admin/api-keys/generate``: creates an ``ApiKey`` for an
      existing ``ApiClient`` and returns the plain key once.
    * the sqladmin ``Admin`` application with the ``ApiClientAdmin`` and
      ``ApiKeyAdmin`` views.

    The two app-level routes are registered before ``Admin`` is constructed.
    Starlette matches routes in registration order, so a sub-application
    mounted at ``/admin`` would otherwise capture every ``/admin/...`` path
    and the generate endpoint would never be reached.

    Args:
        app: The FastAPI application to extend.
    """
    auth_backend = AdminAuth(secret_key=settings.ADMIN_SECRET_KEY)
    SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False)

    @app.get("/admin")
    async def _admin_redirect(request: Request):
        # Honour a reverse-proxy prefix when building the redirect target.
        root_path = request.scope.get("root_path") or ""
        return RedirectResponse(url=f"{root_path}/admin/")

    @app.post("/admin/api-keys/generate")
    async def _admin_generate_api_key(
        request: Request,
        client_id: int,
        permissions: str = "",
        name: str | None = None,
    ):
        # Read the session from the ASGI scope rather than ``request.session``:
        # the property asserts when no session middleware populated the scope,
        # which would surface as a 500 instead of a clean 401.
        # NOTE(review): this route lives on the main app; confirm a session
        # middleware sharing the admin secret is installed there, otherwise
        # the admin login session will not be visible here.
        session = request.scope.get("session") or {}
        if not session.get("admin"):
            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authenticated")

        perms = [p.strip() for p in permissions.split(",") if p.strip()]
        plain_key = generate_api_key()

        # The context manager closes the session on every exit path,
        # including the 404 raised below.
        with SessionLocal() as db:
            client = db.get(ApiClient, client_id)
            if not client:
                raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Client not found")

            api_key = ApiKey(
                client_id=client_id,
                name=name,
                key_prefix=get_prefix(plain_key),
                key_hash=hash_api_key(plain_key),
                permissions=perms,
                is_active=True,
            )
            db.add(api_key)
            db.commit()
            db.refresh(api_key)
            # The plain key is returned only here; the model stores just the
            # prefix and hash.
            return {"key_id": api_key.id, "api_key": plain_key, "permissions": perms}

    # Construct the admin app last so it cannot shadow the routes above.
    admin = Admin(app=app, engine=engine, authentication_backend=auth_backend)
    admin.add_view(ApiClientAdmin)
    admin.add_view(ApiKeyAdmin)