add init new files
This commit is contained in:
107
app/admin.py
Normal file
107
app/admin.py
Normal file
@@ -0,0 +1,107 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from fastapi import HTTPException, Request, status
|
||||
from sqladmin import Admin, ModelView
|
||||
from sqladmin.authentication import AuthenticationBackend
|
||||
from starlette.responses import RedirectResponse
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
from wtforms import StringField
|
||||
from wtforms.validators import Optional
|
||||
|
||||
from app.core.config import settings
|
||||
from app.db.engine import engine
|
||||
from app.db.models import ApiClient, ApiKey
|
||||
from app.security.api_key import generate_api_key, get_prefix, hash_api_key
|
||||
|
||||
|
||||
class AdminAuth(AuthenticationBackend):
|
||||
async def login(self, request: Request) -> bool:
|
||||
form = await request.form()
|
||||
username = form.get("username")
|
||||
password = form.get("password")
|
||||
|
||||
if username == settings.ADMIN_USERNAME and password == settings.ADMIN_PASSWORD:
|
||||
request.session.update({"admin": True})
|
||||
return True
|
||||
return False
|
||||
|
||||
async def logout(self, request: Request) -> bool:
|
||||
request.session.clear()
|
||||
return True
|
||||
|
||||
async def authenticate(self, request: Request) -> bool:
|
||||
return bool(request.session.get("admin"))
|
||||
|
||||
|
||||
class ApiClientAdmin(ModelView, model=ApiClient):
|
||||
column_list = [ApiClient.id, ApiClient.name, ApiClient.is_active]
|
||||
|
||||
|
||||
class ApiKeyAdmin(ModelView, model=ApiKey):
|
||||
form_excluded_columns = [ApiKey.key_hash, ApiKey.key_prefix, ApiKey.created_at]
|
||||
|
||||
form_extra_fields = {
|
||||
"plain_key": StringField("Plain Key", validators=[Optional()]),
|
||||
"permissions_csv": StringField("Permissions (comma)", validators=[Optional()]),
|
||||
}
|
||||
|
||||
async def on_model_change(self, data: dict, model: ApiKey, is_created: bool, request: Request) -> None:
|
||||
plain_key = data.get("plain_key")
|
||||
if plain_key:
|
||||
model.key_prefix = get_prefix(plain_key)
|
||||
model.key_hash = hash_api_key(plain_key)
|
||||
|
||||
permissions_csv = data.get("permissions_csv")
|
||||
if permissions_csv is not None:
|
||||
perms = [p.strip() for p in permissions_csv.split(",") if p.strip()]
|
||||
model.permissions = perms
|
||||
|
||||
|
||||
def mount_admin(app):
|
||||
auth_backend = AdminAuth(secret_key=settings.ADMIN_SECRET_KEY)
|
||||
admin = Admin(app=app, engine=engine, authentication_backend=auth_backend)
|
||||
|
||||
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False)
|
||||
|
||||
admin.add_view(ApiClientAdmin)
|
||||
admin.add_view(ApiKeyAdmin)
|
||||
|
||||
@app.get("/admin")
|
||||
async def _admin_redirect(request: Request):
|
||||
root_path = request.scope.get("root_path") or ""
|
||||
return RedirectResponse(url=f"{root_path}/admin/")
|
||||
|
||||
@app.post("/admin/api-keys/generate")
|
||||
async def _admin_generate_api_key(
|
||||
request: Request,
|
||||
client_id: int,
|
||||
permissions: str = "",
|
||||
name: str | None = None,
|
||||
):
|
||||
if not request.session.get("admin"):
|
||||
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authenticated")
|
||||
|
||||
perms = [p.strip() for p in permissions.split(",") if p.strip()]
|
||||
plain_key = generate_api_key()
|
||||
|
||||
db = SessionLocal()
|
||||
try:
|
||||
client = db.get(ApiClient, client_id)
|
||||
if not client:
|
||||
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Client not found")
|
||||
|
||||
api_key = ApiKey(
|
||||
client_id=client_id,
|
||||
name=name,
|
||||
key_prefix=get_prefix(plain_key),
|
||||
key_hash=hash_api_key(plain_key),
|
||||
permissions=perms,
|
||||
is_active=True,
|
||||
)
|
||||
db.add(api_key)
|
||||
db.commit()
|
||||
db.refresh(api_key)
|
||||
|
||||
return {"key_id": api_key.id, "api_key": plain_key, "permissions": perms}
|
||||
finally:
|
||||
db.close()
|
||||
Reference in New Issue
Block a user