Files
sriphat-dataplatform/03-apiservice/app/routes/pages.py
jigoong ee473aca8f fix: finance upload filepath bug and add extra_hosts for keycloak auth flow
- fix NameError: filepath undefined in trigger_airflow call (use filepath_stored)
- add extra_hosts ai.sriphat.com:192.168.100.8 for container DNS resolution
  (required for KEYCLOAK_SERVER_URL=http://ai.sriphat.com/keycloak/ to work
   inside Docker — host nginx on .8:80 routes /keycloak/ to Keycloak container)
2026-05-27 01:28:57 +07:00

315 lines
10 KiB
Python

"""
Web page routes for the application
"""
import asyncio
import logging
import os
import uuid
from datetime import datetime
from pathlib import Path
from typing import List

from fastapi import APIRouter, Depends, File, Form, HTTPException, Request, UploadFile
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel, ConfigDict
from sqlalchemy.orm import Session

from app.core.config import settings
from app.security.permissions import require_role, Roles
from app.db.session import get_db
from app.models.upload import UploadHistory
from app.services.airflow_client import airflow_client
from app.services import minio_client
logger = logging.getLogger(__name__)
router = APIRouter()

# Jinja2 templates live in <app>/templates, one directory above this routes package.
templates_dir = Path(__file__).parent.parent / "templates"
templates = Jinja2Templates(directory=str(templates_dir))

# Local upload directory. None of the routes in this module write to it (finance
# uploads go straight to MinIO); it is kept for any code that imports UPLOAD_DIR.
UPLOAD_DIR = Path("/data/uploads")
try:
    UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
except OSError as exc:
    # Creating an optional directory at import time must not take the whole API
    # down (e.g. read-only filesystem or a non-root container user).
    logger.warning("Could not create local upload directory %s: %s", UPLOAD_DIR, exc)
class UploadRecordSchema(BaseModel):
    """Pydantic representation of an upload record.

    ``from_attributes=True`` allows building it directly from an ORM object
    (e.g. ``UploadRecordSchema.model_validate(upload_history_row)``).

    NOTE(review): the JSON routes in this module build their dicts by hand and
    expose the string ``upload_id`` as "id", whereas ``id`` here is an int —
    confirm which shape consumers of this schema expect.
    """

    # Pydantic v2 configuration; replaces the deprecated nested ``class Config``.
    model_config = ConfigDict(from_attributes=True)

    id: int
    upload_id: str
    filename: str
    filepath: str
    description: str | None = None
    status: str
    job_id: str | None = None
    logs: str | None = None
    uploaded_by: str | None = None
    uploaded_at: datetime
    updated_at: datetime | None = None
@router.get("/", response_class=HTMLResponse)
async def index(request: Request):
    """Render the landing page (index.html) with the navigation menu.

    The session's "user" entry (None when absent) is handed to the template
    together with the configured ROOT_PATH.
    """
    session_user = request.session.get("user")
    page_context = dict(
        request=request,
        root_path=settings.ROOT_PATH,
        user=session_user,
    )
    return templates.TemplateResponse("index.html", page_context)
@router.get("/data-management/finance", response_class=HTMLResponse)
async def finance_page(request: Request):
    """Render the finance Excel upload page (data_management_finance.html).

    The session's "user" entry (None when absent) and the configured ROOT_PATH
    are passed to the template.

    NOTE(review): this page is meant for operation/admin users, but this handler
    performs no role check (admin_users_page, by contrast, depends on
    require_role). If access control is not enforced elsewhere (middleware or
    template), add a require_role dependency here.
    """
    user = request.session.get("user")
    return templates.TemplateResponse(
        "data_management_finance.html",
        {
            "request": request,
            "root_path": settings.ROOT_PATH,
            "user": user,
        },
    )
@router.get("/admin/users", response_class=HTMLResponse)
async def admin_users_page(
    request: Request,
    current_user: dict = Depends(require_role(Roles.ADMIN))
):
    """Render the user-management page (admin_users.html).

    Access is gated by the ``require_role(Roles.ADMIN)`` dependency; the user
    object it resolves is exposed to the template as ``user``.
    """
    page_context = dict(
        request=request,
        root_path=settings.ROOT_PATH,
        user=current_user,
    )
    return templates.TemplateResponse("admin_users.html", page_context)
# Accepted spreadsheet extensions -> MIME type stored with the MinIO object.
_EXCEL_CONTENT_TYPES = {
    ".xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
    ".xls": "application/vnd.ms-excel",
}
# Airflow trigger retry policy for finance uploads.
_AIRFLOW_MAX_RETRIES = 3
_AIRFLOW_RETRY_DELAY_SECONDS = 10


def _excel_extension(filename: str | None) -> str | None:
    """Return the lower-cased extension of *filename* if it is .xlsx/.xls, else None."""
    suffix = os.path.splitext(filename or "")[1].lower()
    return suffix if suffix in _EXCEL_CONTENT_TYPES else None


def _safe_object_name(filename: str) -> str:
    """Drop any client-supplied directory components and replace spaces with underscores."""
    base_name = filename.replace("\\", "/").rsplit("/", 1)[-1]
    return base_name.replace(" ", "_")


async def _trigger_finance_dag_with_retry(
    *,
    upload_id: str,
    filepath: str,
    filename: str,
    uploaded_by: str,
    description: str | None,
    max_retries: int = _AIRFLOW_MAX_RETRIES,
    retry_delay: float = _AIRFLOW_RETRY_DELAY_SECONDS,
) -> tuple[dict | None, str | None]:
    """Trigger the Airflow finance DAG, retrying failed attempts.

    Only the Airflow call is retried; persisting the outcome is left to the
    caller so that a database error after a successful trigger cannot cause a
    second DAG run.

    Returns:
        ({"dag_run_id": ..., "state": ...}, None) on success, or
        (None, <last error message>) once every attempt has failed.
    """
    last_error: str | None = None
    for attempt in range(1, max_retries + 1):
        logger.info("Triggering Airflow DAG (attempt %d/%d)", attempt, max_retries)
        try:
            response = await airflow_client.trigger_finance_dag(
                upload_id=upload_id,
                filepath=filepath,
                filename=filename,
                uploaded_by=uploaded_by,
                description=description,
            )
            outcome = {
                "dag_run_id": response.get("dag_run_id"),
                "state": response.get("state", "queued"),
            }
        except Exception as exc:  # client failure modes are not visible here; treat all as retryable
            last_error = str(exc)
            logger.error(
                "Failed to trigger Airflow (attempt %d/%d): %s", attempt, max_retries, last_error
            )
            if attempt < max_retries:
                logger.info("Retrying in %s seconds...", retry_delay)
                await asyncio.sleep(retry_delay)
        else:
            logger.info("Airflow DAG triggered successfully: %s", outcome["dag_run_id"])
            return outcome, None
    logger.error("All %d attempts failed to trigger Airflow", max_retries)
    return None, last_error


@router.post("/data-management/finance/upload")
async def upload_finance_file(
    request: Request,
    file: UploadFile = File(...),
    description: str = Form(None),
    db: Session = Depends(get_db)
):
    """
    Handle a finance Excel (.xlsx / .xls) upload.

    - Validates the file extension (case-insensitive)
    - Uploads the content to the MinIO finance bucket as ``finance/<unique name>``
    - Stores an UploadHistory record with status "pending"
    - Triggers the Airflow finance DAG (up to 3 attempts, 10 s apart) and records
      either the DAG run ("processing") or the failure ("error")
    - Returns a summary of the upload and the Airflow trigger outcome

    Raises:
        HTTPException(400): the file is not .xlsx / .xls.
        HTTPException(500): reading the upload or storing it in MinIO failed.
    """
    extension = _excel_extension(file.filename)
    if extension is None:
        raise HTTPException(
            status_code=400,
            detail="Invalid file type. Only .xlsx and .xls files are allowed."
        )

    # A second-resolution timestamp alone collides for concurrent uploads; the
    # random suffix keeps both the MinIO object key and upload_id unique.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    unique_suffix = uuid.uuid4().hex[:8]
    unique_filename = f"{timestamp}_{unique_suffix}_{_safe_object_name(file.filename)}"
    upload_id = f"upload_{timestamp}_{unique_suffix}"

    try:
        content = await file.read()
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to read file: {str(e)}") from e

    object_key = f"finance/{unique_filename}"
    try:
        minio_client.upload_file(
            bucket=settings.MINIO_BUCKET_FINANCE,
            object_name=object_key,
            data=content,
            content_type=_EXCEL_CONTENT_TYPES[extension],
        )
    except Exception as e:
        logger.exception("MinIO upload failed: %s", e)
        raise HTTPException(
            status_code=500, detail=f"Failed to upload file to storage: {str(e)}"
        ) from e
    filepath_stored = object_key  # the DB stores the MinIO object key, not a local path

    user = request.session.get("user")
    username = user.get("username") if user else "anonymous"

    upload_record = UploadHistory(
        upload_id=upload_id,
        filename=file.filename,
        filepath=filepath_stored,
        description=description,
        status="pending",
        uploaded_by=username
    )
    db.add(upload_record)
    db.commit()
    db.refresh(upload_record)

    result, error_msg = await _trigger_finance_dag_with_retry(
        upload_id=upload_id,
        filepath=filepath_stored,
        filename=file.filename,
        uploaded_by=username,
        description=description,
    )
    airflow_triggered = result is not None
    dag_run_id = result["dag_run_id"] if airflow_triggered else None
    if airflow_triggered:
        upload_record.airflow_dag_run_id = dag_run_id
        upload_record.airflow_state = result["state"]
        upload_record.status = "processing"
    else:
        upload_record.status = "error"
        upload_record.error_message = (
            f"Failed to trigger Airflow after {_AIRFLOW_MAX_RETRIES} attempts: {error_msg}"
        )
    db.commit()

    return {
        "success": True,
        "message": f"File '{file.filename}' uploaded successfully",
        "upload_id": upload_id,
        "filename": unique_filename,
        "airflow_triggered": airflow_triggered,
        "dag_run_id": dag_run_id,
        "error": error_msg if not airflow_triggered else None
    }
def _upload_to_dict(upload: UploadHistory) -> dict:
    """Serialize an UploadHistory row into the JSON shape of the finance upload endpoints.

    Note that "id" carries the string ``upload_id``, not the integer primary key.
    Datetime fields are ISO-8601 strings, or None when unset.
    """
    def _iso(value):
        return value.isoformat() if value else None

    return {
        "id": upload.upload_id,
        "filename": upload.filename,
        "filepath": upload.filepath,
        "uploaded_at": _iso(upload.uploaded_at),
        "description": upload.description,
        "status": upload.status,
        "job_id": upload.job_id,
        "logs": upload.logs,
        "uploaded_by": upload.uploaded_by,
        "airflow_dag_run_id": upload.airflow_dag_run_id,
        "airflow_state": upload.airflow_state,
        "processing_started_at": _iso(upload.processing_started_at),
        "processing_completed_at": _iso(upload.processing_completed_at),
        "error_message": upload.error_message,
    }


@router.get("/data-management/finance/uploads")
async def get_uploads(db: Session = Depends(get_db)):
    """Return every upload record, newest first, with its processing status."""
    uploads = db.query(UploadHistory).order_by(UploadHistory.uploaded_at.desc()).all()
    return [_upload_to_dict(upload) for upload in uploads]


@router.get("/data-management/finance/uploads/{upload_id}")
async def get_upload_status(upload_id: str, db: Session = Depends(get_db)):
    """Return the status of a single upload.

    Raises:
        HTTPException(404): no record has the given upload_id.
    """
    upload = db.query(UploadHistory).filter(UploadHistory.upload_id == upload_id).first()
    if not upload:
        raise HTTPException(status_code=404, detail="Upload not found")
    return _upload_to_dict(upload)
# Legacy placeholder, superseded by airflow_client.trigger_finance_dag.
async def trigger_airflow_job(filepath: str, upload_id: str) -> str:
    """
    Legacy placeholder for triggering the Airflow processing DAG; not implemented.

    The finance upload route in this module triggers Airflow through
    ``airflow_client.trigger_finance_dag`` and does not call this function.
    NOTE(review): presumably kept only for external importers; delete it if
    nothing references it.

    Args:
        filepath: Storage path / object key of the uploaded file (unused).
        upload_id: Unique upload identifier (unused).

    Returns:
        Never returns.

    Raises:
        NotImplementedError: always.
    """
    raise NotImplementedError("Airflow integration pending DAG ID and endpoint")