- Disable SQLAdmin basic auth (comment out mount_admin, statics, redirect) - Add /api-management page (Keycloak admin role required) - Add admin_api_keys.py: REST endpoints for list/create clients and keys - Add api_management.html: manage API clients, keys, permissions with copy-once key display - Update index.html: API Management link -> /api-management - Update auth middleware: add /api-management and /admin/users to PROTECTED_PATHS - Add CHANGES-2026-06-04.md dev notes
331 lines
11 KiB
Python
331 lines
11 KiB
Python
"""
Web page routes for the application
"""
|
|
import os
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import List
|
|
|
|
from fastapi import APIRouter, Request, UploadFile, File, Form, HTTPException, Depends
|
|
from fastapi.responses import HTMLResponse
|
|
from fastapi.templating import Jinja2Templates
|
|
from pydantic import BaseModel
|
|
from sqlalchemy.orm import Session
|
|
import asyncio
|
|
import logging
|
|
|
|
from app.core.config import settings
|
|
from app.security.permissions import require_role, Roles
|
|
from app.db.session import get_db
|
|
from app.models.upload import UploadHistory
|
|
from app.services.airflow_client import airflow_client
|
|
from app.services import minio_client
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Router on which every route in this module is registered.
router = APIRouter()

# Templates live in the "templates" directory that sits beside the directory
# containing this file.
templates_dir = Path(__file__).parent.parent.joinpath("templates")
templates = Jinja2Templates(directory=str(templates_dir))

# Local fallback directory (used only if MinIO is not configured).
# NOTE(review): created at import time; nothing in the visible code reads it.
UPLOAD_DIR = Path("/data/uploads")
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
|
class UploadRecordSchema(BaseModel):
    """Pydantic schema for a single upload-history record.

    ``from_attributes`` is enabled so the schema can be validated directly
    from attribute-bearing objects rather than only from dicts.
    """

    # Pydantic v2 configuration. A plain dict is accepted here and replaces
    # the nested ``class Config``, which v2 still honours but reports as
    # deprecated.
    model_config = {"from_attributes": True}

    id: int
    upload_id: str
    filename: str
    # The upload route in this module stores the MinIO object key here.
    filepath: str
    description: str | None = None
    status: str
    job_id: str | None = None
    logs: str | None = None
    uploaded_by: str | None = None
    uploaded_at: datetime
    updated_at: datetime | None = None
|
|
|
|
|
|
@router.get("/", response_class=HTMLResponse)
async def index(request: Request):
    """Render the landing page with the navigation menu.

    The ``user`` entry of the session (``None`` when absent) is handed to the
    template together with the configured root path.
    """
    session_user = request.session.get("user")
    context = {
        "request": request,
        "root_path": settings.ROOT_PATH,
        "user": session_user,
    }
    return templates.TemplateResponse("index.html", context)
|
|
|
|
|
|
@router.get("/data-management/finance", response_class=HTMLResponse)
async def finance_page(request: Request):
    """Render the finance Excel upload page.

    NOTE(review): the page is meant for the operation/admin roles, but this
    handler declares no role dependency itself - presumably access is
    enforced elsewhere (e.g. middleware); confirm.
    """
    session_user = request.session.get("user")
    context = {
        "request": request,
        "root_path": settings.ROOT_PATH,
        "user": session_user,
    }
    return templates.TemplateResponse("data_management_finance.html", context)
|
|
|
|
|
|
@router.get("/admin/users", response_class=HTMLResponse)
async def admin_users_page(
    request: Request,
    current_user: dict = Depends(require_role(Roles.ADMIN))
):
    """Render the user management page (admin role required).

    ``current_user`` is whatever the ``require_role(Roles.ADMIN)`` dependency
    yields; it is passed to the template as ``user``.
    """
    context = {
        "request": request,
        "root_path": settings.ROOT_PATH,
        "user": current_user,
    }
    return templates.TemplateResponse("admin_users.html", context)
|
|
|
|
|
|
@router.get("/api-management", response_class=HTMLResponse)
async def api_management_page(
    request: Request,
    current_user: dict = Depends(require_role(Roles.ADMIN))
):
    """Render the API key management page (admin role required).

    ``current_user`` is whatever the ``require_role(Roles.ADMIN)`` dependency
    yields; it is passed to the template as ``user``.
    """
    context = {
        "request": request,
        "root_path": settings.ROOT_PATH,
        "user": current_user,
    }
    return templates.TemplateResponse("api_management.html", context)
|
|
|
|
|
|
# Content type recorded with the stored object, keyed by accepted extension
# (compared against the lower-cased filename).
_EXCEL_CONTENT_TYPES = {
    ".xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
    ".xls": "application/vnd.ms-excel",
}


def _excel_content_type(filename: str) -> str | None:
    """Return the content type for an accepted Excel filename, else ``None``."""
    lowered = filename.lower()
    for extension, content_type in _EXCEL_CONTENT_TYPES.items():
        if lowered.endswith(extension):
            return content_type
    return None


def _sanitize_upload_filename(raw_name: str) -> str:
    """Drop directory components and spaces so the name is safe in an object key."""
    # Treat both "/" and "\" as separators: the name comes from the client.
    base_name = raw_name.replace("\\", "/").rsplit("/", 1)[-1]
    return base_name.replace(" ", "_")


async def _trigger_finance_dag_with_retry(
    *,
    upload_id: str,
    filepath: str,
    filename: str,
    uploaded_by: str,
    description: str | None,
    max_retries: int,
    retry_delay: float,
) -> tuple[dict | None, str | None]:
    """Trigger the finance DAG, retrying on any error.

    Returns ``(info, None)`` on success, where ``info`` holds ``dag_run_id``
    and ``state``, or ``(None, last_error_message)`` once every attempt has
    failed. Only the Airflow call is retried; no database work happens here,
    so a database failure can never cause the DAG to be triggered twice.
    """
    error_msg = None
    for attempt in range(1, max_retries + 1):
        try:
            logger.info(f"Triggering Airflow DAG (attempt {attempt}/{max_retries})")
            result = await airflow_client.trigger_finance_dag(
                upload_id=upload_id,
                filepath=filepath,
                filename=filename,
                uploaded_by=uploaded_by,
                description=description
            )
            info = {
                "dag_run_id": result.get("dag_run_id"),
                "state": result.get("state", "queued"),
            }
            logger.info(f"Airflow DAG triggered successfully: {info['dag_run_id']}")
            return info, None
        except Exception as e:
            error_msg = str(e)
            logger.error(f"Failed to trigger Airflow (attempt {attempt}/{max_retries}): {error_msg}")
            if attempt < max_retries:
                logger.info(f"Retrying in {retry_delay} seconds...")
                await asyncio.sleep(retry_delay)

    logger.error(f"All {max_retries} attempts failed to trigger Airflow")
    return None, error_msg


@router.post("/data-management/finance/upload")
async def upload_finance_file(
    request: Request,
    file: UploadFile = File(...),
    description: str = Form(None),
    db: Session = Depends(get_db)
):
    """
    Handle finance Excel file upload

    - Validates the extension (.xlsx / .xls, case-insensitive)
    - Uploads the file to the MinIO finance bucket under ``finance/<name>``
    - Stores an upload record in the database (status ``pending``)
    - Triggers the Airflow finance DAG (up to 3 attempts, 10 s apart)
    - Returns a summary dict; ``success`` reflects the upload itself, while
      ``airflow_triggered`` / ``error`` report the outcome of the DAG trigger

    Raises:
        HTTPException(400): missing filename or unsupported extension.
        HTTPException(500): the file could not be read or stored.
    """
    # Validate file type. The filename may be absent on a malformed request.
    original_name = file.filename or ""
    content_type = _excel_content_type(original_name)
    if content_type is None:
        raise HTTPException(
            status_code=400,
            detail="Invalid file type. Only .xlsx and .xls files are allowed."
        )

    # Generate unique filename
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    safe_filename = _sanitize_upload_filename(original_name)
    unique_filename = f"{timestamp}_{safe_filename}"

    # Read file content
    try:
        content = await file.read()
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to read file: {str(e)}") from e

    # Upload to MinIO finance bucket
    object_key = f"finance/{unique_filename}"
    try:
        minio_client.upload_file(
            bucket=settings.MINIO_BUCKET_FINANCE,
            object_name=object_key,
            data=content,
            content_type=content_type,
        )
    except Exception as e:
        logger.exception(f"MinIO upload failed: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to upload file to storage: {str(e)}") from e
    filepath_stored = object_key  # store MinIO key in DB

    # Get username from session
    user = request.session.get("user")
    username = user.get("username") if user else "anonymous"

    # Create upload record in database
    # NOTE(review): the id has one-second resolution, so two uploads in the
    # same second get the same upload_id - confirm whether the column is
    # unique and whether a random suffix is acceptable.
    upload_id = f"upload_{timestamp}"
    upload_record = UploadHistory(
        upload_id=upload_id,
        filename=original_name,
        filepath=filepath_stored,  # MinIO object key: finance/<filename>
        description=description,
        status="pending",
        uploaded_by=username
    )
    db.add(upload_record)
    db.commit()
    db.refresh(upload_record)

    # Trigger Airflow DAG with retry logic
    max_retries = 3
    retry_delay = 10  # seconds
    trigger_info, error_msg = await _trigger_finance_dag_with_retry(
        upload_id=upload_id,
        filepath=str(filepath_stored),
        filename=original_name,
        uploaded_by=username,
        description=description,
        max_retries=max_retries,
        retry_delay=retry_delay,
    )

    airflow_triggered = trigger_info is not None
    dag_run_id = None
    if airflow_triggered:
        # Update upload record with Airflow info
        dag_run_id = trigger_info["dag_run_id"]
        upload_record.airflow_dag_run_id = dag_run_id
        upload_record.airflow_state = trigger_info["state"]
        upload_record.status = "processing"
    else:
        upload_record.status = "error"
        upload_record.error_message = f"Failed to trigger Airflow after {max_retries} attempts: {error_msg}"

    # Persist the outcome once, outside the retry loop.
    try:
        db.commit()
    except Exception:
        db.rollback()
        logger.exception(f"Failed to persist Airflow status for {upload_id}")
        raise

    return {
        "success": True,
        "message": f"File '{original_name}' uploaded successfully",
        "upload_id": upload_id,
        "filename": unique_filename,
        "airflow_triggered": airflow_triggered,
        "dag_run_id": dag_run_id,
        "error": error_msg
    }
|
|
|
|
|
|
def _isoformat_or_none(value):
    """Return ``value.isoformat()``, or ``None`` when the timestamp is unset."""
    return value.isoformat() if value is not None else None


def _serialize_upload(upload) -> dict:
    """Convert an ``UploadHistory`` row into the JSON-ready dict both endpoints return."""
    return {
        # The public "id" is the upload_id string, not the numeric row id.
        "id": upload.upload_id,
        "filename": upload.filename,
        "filepath": upload.filepath,
        "uploaded_at": _isoformat_or_none(upload.uploaded_at),
        "description": upload.description,
        "status": upload.status,
        "job_id": upload.job_id,
        "logs": upload.logs,
        "uploaded_by": upload.uploaded_by,
        "airflow_dag_run_id": upload.airflow_dag_run_id,
        "airflow_state": upload.airflow_state,
        "processing_started_at": _isoformat_or_none(upload.processing_started_at),
        "processing_completed_at": _isoformat_or_none(upload.processing_completed_at),
        "error_message": upload.error_message
    }


@router.get("/data-management/finance/uploads")
async def get_uploads(db: Session = Depends(get_db)):
    """Get list of all uploads with their status, newest first."""
    uploads = db.query(UploadHistory).order_by(UploadHistory.uploaded_at.desc()).all()

    # Convert to dict for JSON response
    return [_serialize_upload(upload) for upload in uploads]


@router.get("/data-management/finance/uploads/{upload_id}")
async def get_upload_status(upload_id: str, db: Session = Depends(get_db)):
    """Get status of a specific upload.

    Raises:
        HTTPException(404): no record has the given ``upload_id``.
    """
    upload = db.query(UploadHistory).filter(UploadHistory.upload_id == upload_id).first()
    if not upload:
        raise HTTPException(status_code=404, detail="Upload not found")

    return _serialize_upload(upload)
|
|
|
|
|
|
# Placeholder for Airflow integration
async def trigger_airflow_job(filepath: str, upload_id: str) -> str:
    """
    Trigger an Airflow DAG run for an uploaded file (not implemented).

    Args:
        filepath: Path to the uploaded file
        upload_id: Unique upload identifier

    Returns:
        job_id: Airflow job/run ID

    Raises:
        NotImplementedError: always, until the DAG ID and the Airflow API
            endpoint are provided.

    The upload route in this module triggers its DAG through
    ``airflow_client.trigger_finance_dag`` instead of this function.
    """
    # TODO: Implement Airflow API call.
    # Intended shape: POST to the Airflow REST endpoint
    # /api/v1/dags/{dag_id}/dagRuns with a JSON body whose "conf" carries
    # "filepath" and "upload_id", then return the response's "dag_run_id".
    # Credentials must come from configuration, never from literals in code.
    raise NotImplementedError("Airflow integration pending DAG ID and endpoint")
|