284 lines
10 KiB
Python
284 lines
10 KiB
Python
from __future__ import annotations
|
|
|
|
from fastapi import HTTPException, Request, status
|
|
from fastapi.staticfiles import StaticFiles
|
|
from sqladmin import Admin, ModelView
|
|
from sqladmin.authentication import AuthenticationBackend
|
|
from starlette.responses import HTMLResponse, RedirectResponse
|
|
from starlette.datastructures import URL
|
|
from sqlalchemy.orm import sessionmaker
|
|
from wtforms import BooleanField, SelectField, StringField
|
|
from wtforms.validators import Optional
|
|
|
|
from app.core.config import settings
|
|
from app.db.engine import engine
|
|
from app.db.models import ApiClient, ApiKey
|
|
from app.security.api_key import generate_api_key, get_prefix, hash_api_key
|
|
|
|
|
|
class AdminAuth(AuthenticationBackend):
    """Session-based authentication backend for the admin UI.

    Credentials are checked against ``settings.ADMIN_USERNAME`` and
    ``settings.ADMIN_PASSWORD``. A successful login sets the ``"admin"`` flag
    in ``request.session``; the other methods only read or clear that session.
    """

    async def login(self, request: Request) -> bool:
        """Validate the submitted login form and flag the session as admin.

        Args:
            request: Incoming login request; its form is expected to carry
                ``username`` and ``password`` fields.

        Returns:
            ``True`` when both fields match the configured credentials,
            ``False`` otherwise (including missing or non-text fields).
        """
        # Local import keeps this block self-contained.
        import hmac

        form = await request.form()
        username = form.get("username")
        password = form.get("password")
        expected_username = settings.ADMIN_USERNAME
        expected_password = settings.ADMIN_PASSWORD

        # A missing form field must never match an unset setting
        # (``None == None``), and file uploads are not credentials.
        candidates = (username, password, expected_username, expected_password)
        if not all(isinstance(value, str) for value in candidates):
            return False

        # Constant-time comparison; both checks always run so the response
        # time does not reveal which field was wrong. Encoding to bytes lets
        # compare_digest accept non-ASCII text.
        username_ok = hmac.compare_digest(
            username.encode("utf-8"), expected_username.encode("utf-8")
        )
        password_ok = hmac.compare_digest(
            password.encode("utf-8"), expected_password.encode("utf-8")
        )
        if not (username_ok and password_ok):
            return False

        request.session.update({"admin": True})
        return True

    async def logout(self, request: Request) -> bool:
        """Clear the whole session and report success."""
        request.session.clear()
        return True

    async def authenticate(self, request: Request) -> bool:
        """Return whether the session carries a truthy ``"admin"`` flag."""
        return bool(request.session.get("admin"))
|
|
|
|
|
|
class ApiClientAdmin(ModelView, model=ApiClient):
    """Admin view for ``ApiClient`` that issues a first API key on creation."""

    column_list = [ApiClient.id, ApiClient.name, ApiClient.is_active]

    async def insert_model(self, request: Request, data: dict) -> ApiClient:
        """Create the client, then store an initial ``ApiKey`` row for it.

        The plaintext key is placed in ``request.session["generated_api_key"]``
        together with the client id/name and the new key id. Only the prefix
        and hash of the key are written to the database.

        Args:
            request: The admin request; its session receives the key details.
            data: Form data forwarded unchanged to the parent implementation.

        Returns:
            The client object returned by the parent ``insert_model``.
        """
        client: ApiClient = await super().insert_model(request, data)
        secret = generate_api_key()

        session_factory = sessionmaker(bind=engine, autoflush=False, autocommit=False)
        # The session closes itself when the block exits, on success or error.
        with session_factory() as session:
            key_row = ApiKey(
                client_id=client.id,
                name="auto",
                key_prefix=get_prefix(secret),
                key_hash=hash_api_key(secret),
                permissions=[],
                is_active=True,
            )
            session.add(key_row)
            session.commit()
            session.refresh(key_row)

            request.session["generated_api_key"] = {
                "client_id": client.id,
                "client_name": client.name,
                "key_id": key_row.id,
                "api_key": secret,
            }

        return client
|
|
|
|
|
|
class ApiKeyAdmin(ModelView, model=ApiKey):
    """Admin view for ``ApiKey`` with helper form fields for key and permissions."""

    column_list = [ApiKey.id, ApiKey.client_id, ApiKey.name, ApiKey.is_active, ApiKey.permissions]
    form_excluded_columns = [ApiKey.key_hash, ApiKey.key_prefix, ApiKey.created_at]

    # Extra, non-column form inputs consumed by on_model_change below.
    form_extra_fields = {
        "plain_key": StringField("Plain Key", validators=[Optional()]),
        "permissions_csv": StringField("Permissions (comma)", validators=[Optional()]),
        "endpoint_path": SelectField("Endpoint", choices=[], validators=[Optional()]),
        "perm_read": BooleanField("Read (GET)"),
        "perm_write": BooleanField("Write (POST/PATCH)"),
        "perm_delete": BooleanField("Delete (DELETE)"),
    }

    async def on_model_change(self, data: dict, model: ApiKey, is_created: bool, request: Request) -> None:
        """Derive key prefix/hash and permissions from the extra form fields.

        A key is generated when a new record is submitted without
        ``plain_key``. On creation the plaintext is remembered on
        ``request.state`` for ``after_model_change``. ``model.permissions`` is
        only assigned when at least one permission was collected.
        """
        secret = data.get("plain_key")
        if is_created and not secret:
            secret = generate_api_key()

        if secret:
            model.key_prefix = get_prefix(secret)
            model.key_hash = hash_api_key(secret)
            if is_created:
                request.state.generated_api_key_plain = secret

        collected: list[str] = []

        # Checkbox permissions only apply when an endpoint was selected.
        endpoint = data.get("endpoint_path")
        if endpoint:
            flag_actions = (
                ("perm_read", "read"),
                ("perm_write", "write"),
                ("perm_delete", "delete"),
            )
            collected = [
                f"{endpoint}:{action}" for flag, action in flag_actions if data.get(flag)
            ]

        raw_csv = data.get("permissions_csv")
        if raw_csv is not None:
            pieces = (piece.strip() for piece in raw_csv.split(","))
            collected.extend(piece for piece in pieces if piece)

        if collected:
            # dict.fromkeys drops duplicates while keeping first-seen order.
            model.permissions = list(dict.fromkeys(collected))

    async def after_model_change(self, data: dict, model: ApiKey, is_created: bool, request: Request) -> None:
        """After a create, stash the plaintext key in the session for display.

        Does nothing for updates or when no plaintext key was recorded on
        ``request.state`` during ``on_model_change``.
        """
        if is_created:
            secret = getattr(request.state, "generated_api_key_plain", None)
            if secret:
                request.session["generated_api_key"] = {
                    "client_id": model.client_id,
                    "client_name": str(owner) if (owner := getattr(model, "client", None)) else "",
                    "key_id": model.id,
                    "api_key": secret,
                }
|
|
|
|
|
|
def mount_admin(app):
    """Attach the SQLAdmin UI and the API-key helper routes to ``app``.

    Sets up the admin with ``AdminAuth``, fills the ``endpoint_path`` choices
    of ``ApiKeyAdmin`` from the app's OpenAPI paths that start with ``/api/``,
    registers the ``ApiClient`` / ``ApiKey`` views and adds three routes:

    * ``GET /admin/generated-api-key`` - one-time display of a new key.
    * ``GET /admin/clients/{client_id}/generate-api-key`` - create a key.
    * ``POST /admin/api-keys/generate`` - create a key.

    Args:
        app: The application object; it must provide ``openapi()``, ``get``
            and ``post`` as used below.
    """
    # Local import keeps this block self-contained.
    from html import escape

    auth_backend = AdminAuth(secret_key=settings.ADMIN_SECRET_KEY)

    class CustomAdmin(Admin):
        """Admin that redirects to the key display page after a key was issued."""

        def get_save_redirect_url(
            self, request: Request, form, model_view: ModelView, obj
        ):
            """Redirect to the generated-key page when a key awaits display.

            Applies only to the ``ApiClient`` and ``ApiKey`` views; every
            other case defers to the parent implementation.
            """
            if (
                getattr(model_view, "model", None) in (ApiClient, ApiKey)
                and request.session.get("generated_api_key")
            ):
                root_path = request.scope.get("root_path") or ""
                return URL(f"{root_path}/admin/generated-api-key")

            return super().get_save_redirect_url(
                request=request,
                form=form,
                model_view=model_view,
                obj=obj,
            )

    admin = CustomAdmin(
        app=app,
        engine=engine,
        authentication_backend=auth_backend,
        title="My Service Management",
        base_url="/admin",
    )

    # NOTE(review): FastAPI is understood to cache the result of openapi() on
    # the app, so routes registered after this call may be missing from the
    # generated schema - confirm mount_admin runs after all routers are added.
    openapi = app.openapi()
    paths = openapi.get("paths") or {}
    endpoint_choices: list[tuple[str, str]] = []
    for path in sorted(paths):
        if not path.startswith("/api/"):
            continue
        available = sorted(method.upper() for method in (paths.get(path) or {}))
        label = f"{path} [{' '.join(available)}]" if available else path
        endpoint_choices.append((path, label))
    ApiKeyAdmin.form_extra_fields["endpoint_path"].kwargs["choices"] = endpoint_choices

    SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False)

    admin.add_view(ApiClientAdmin)
    admin.add_view(ApiKeyAdmin)

    def _require_admin(request: Request) -> None:
        """Raise 401 unless the session carries the admin flag."""
        if not request.session.get("admin"):
            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authenticated")

    def _issue_api_key(client_id: int, permissions: str, name):
        """Create an active ``ApiKey`` for ``client_id`` and return its details.

        ``permissions`` is a comma-separated string; blank items are dropped.
        Raises a 404 ``HTTPException`` when the client does not exist.
        """
        perms = [p.strip() for p in permissions.split(",") if p.strip()]
        plain_key = generate_api_key()

        # The session closes itself when the block exits, including on 404.
        with SessionLocal() as db:
            client = db.get(ApiClient, client_id)
            if not client:
                raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Client not found")

            api_key = ApiKey(
                client_id=client_id,
                name=name,
                key_prefix=get_prefix(plain_key),
                key_hash=hash_api_key(plain_key),
                permissions=perms,
                is_active=True,
            )
            db.add(api_key)
            db.commit()
            db.refresh(api_key)

            return {"key_id": api_key.id, "api_key": plain_key, "permissions": perms}

    # NOTE(review): the routes below live under "/admin", the same prefix
    # passed as base_url to the admin above, and they read request.session -
    # confirm they are reachable and covered by session middleware.
    @app.get("/admin/generated-api-key")
    async def _admin_generated_api_key(request: Request):
        """Show the freshly generated key once, then forget it."""
        _require_admin(request)

        # pop() makes the plaintext key a one-time display.
        key_info = request.session.pop("generated_api_key", None)
        root_path = request.scope.get("root_path") or ""
        clients_url = f"{root_path}/admin/{ApiClientAdmin.identity}/list"
        back_link = f"<p><a href=\"{escape(clients_url, quote=True)}\">Back to clients</a></p>"

        if not key_info:
            return HTMLResponse(
                "<h2>No API key to display</h2>"
                "<p>The API key was already shown or expired.</p>"
                f"{back_link}",
                status_code=200,
            )

        # Every interpolated value is HTML-escaped: the client name is
        # admin-entered text and must not be rendered as markup.
        client_name = escape(str(key_info.get("client_name", "")))
        client_id = escape(str(key_info.get("client_id", "")))
        key_id = escape(str(key_info.get("key_id", "")))
        api_key = escape(str(key_info.get("api_key", "")))

        return HTMLResponse(
            (
                "<h2>API key generated</h2>"
                "<p>Copy this API key now. You won't be able to view it again.</p>"
                f"<p><b>Client</b>: {client_name} (ID: {client_id})</p>"
                f"<p><b>Key ID</b>: {key_id}</p>"
                f"<pre style=\"padding:12px;border:1px solid #ddd;background:#f7f7f7;\">{api_key}</pre>"
                f"{back_link}"
            ),
            status_code=200,
        )

    # NOTE(review): this GET route creates a key (a state change), so merely
    # following a link while logged in issues one - consider POST only.
    @app.get("/admin/clients/{client_id}/generate-api-key")
    async def _admin_generate_api_key_get(
        request: Request,
        client_id: int,
        permissions: str = "",
        name: str | None = None,
    ):
        """Create an API key for ``client_id`` and return it as JSON."""
        _require_admin(request)
        return _issue_api_key(client_id, permissions, name)

    @app.post("/admin/api-keys/generate")
    async def _admin_generate_api_key(
        request: Request,
        client_id: int,
        permissions: str = "",
        name: str | None = None,
    ):
        """Create an API key for ``client_id`` and return it as JSON."""
        _require_admin(request)
        return _issue_api_key(client_id, permissions, name)
|