Files
sriphat-dataplatform/03-apiservice-v0.1/app/api/v1/routes.py
2026-02-24 22:33:37 +07:00

128 lines
4.0 KiB
Python

from __future__ import annotations
import logging
from typing import Annotated
from zoneinfo import ZoneInfo
from fastapi import APIRouter, Depends
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import Session
from app.api.v1.schemas import FeedCheckpointIn
from app.core.config import settings
from app.db.models import RawOpdCheckpoint
from app.security.dependencies import get_db, get_supabase_db, require_permission
from app.utils.supabase_client import SupabaseAPIError, upsert_to_supabase_sync
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/v1")
PERM_FEED_CHECKPOINT_WRITE = "/api/v1/feed/checkpoint:write"
PERM_FEED_CHECKPOINT_WRITE_LEGACY = "feed.checkpoint:write"
def _to_tz(dt):
if dt is None:
return None
if dt.tzinfo is None:
return dt.replace(tzinfo=ZoneInfo(settings.TIMEZONE))
return dt.astimezone(ZoneInfo(settings.TIMEZONE))
def _to_iso(dt):
"""Convert datetime to ISO 8601 string for Supabase API."""
if dt is None:
return None
return dt.isoformat()
@router.post("/feed/checkpoint")
def upsert_feed_checkpoint(
payload: list[FeedCheckpointIn],
db: Annotated[Session, Depends(get_db)],
):
rows = []
supabase_rows = []
#clean_data = payload.model_dump(exclude_none=True)
for item in payload:
# Prepare data for local database 'default' if item.id is None else
row = {
"id": item.id,
"hn": item.hn,
"vn": item.vn,
"location": item.location,
"type": item.type,
"timestamp_in": _to_tz(item.timestamp_in),
"timestamp_out": _to_tz(item.timestamp_out),
"waiting_time": item.waiting_time,
"bu": item.bu,
}
if item.id is None:
del(row["id"])
rows.append(row)
# Prepare data for Supabase API (convert datetime to ISO string) 'default' if item.id is None else
supabase_row = {
"id": item.id,
"hn": item.hn,
"vn": item.vn,
"location": item.location,
"type": item.type,
"timestamp_in": _to_iso(_to_tz(item.timestamp_in)),
"timestamp_out": _to_iso(_to_tz(item.timestamp_out)),
"waiting_time": item.waiting_time,
"bu": item.bu,
}
if item.id is None:
del(supabase_row["id"])
supabase_rows.append(supabase_row)
# Insert/update to local database
stmt = insert(RawOpdCheckpoint).values(rows)
update_cols = {
"id": stmt.excluded.id,
"type": stmt.excluded.type,
"timestamp_in": stmt.excluded.timestamp_in,
"timestamp_out": stmt.excluded.timestamp_out,
"waiting_time": stmt.excluded.waiting_time,
"bu": stmt.excluded.bu,
}
stmt = stmt.on_conflict_do_update(
index_elements=[RawOpdCheckpoint.hn, RawOpdCheckpoint.vn, RawOpdCheckpoint.location, RawOpdCheckpoint.timestamp_in],
set_=update_cols,
)
result = db.execute(stmt)
db.commit()
# Send data to Supabase via API call
supabase_result = None
supabase_error = None
try:
logger.info(f"Sending {len(supabase_rows)} records to Supabase API")
supabase_result = upsert_to_supabase_sync(
table="raw_opd_checkpoint",
data=supabase_rows,
on_conflict="hn,vn,location,timestamp_in",
)
logger.info(f"Successfully sent data to Supabase: {supabase_result.get('status_code')}")
except SupabaseAPIError as e:
logger.error(f"Failed to send data to Supabase: {str(e)}")
supabase_error = str(e)
except Exception as e:
logger.error(f"Unexpected error sending data to Supabase: {str(e)}")
supabase_error = f"Unexpected error: {str(e)}"
return {
"upserted": len(rows),
"rowcount": result.rowcount,
"supabase": {
"success": supabase_result is not None,
"result": supabase_result,
"error": supabase_error,
},
}