from __future__ import annotations from typing import Annotated from zoneinfo import ZoneInfo from fastapi import APIRouter, Depends from sqlalchemy.dialects.postgresql import insert from sqlalchemy.orm import Session from app.api.v1.schemas import FeedCheckpointIn from app.core.config import settings from app.db.models import RawOpdCheckpoint from app.security.dependencies import get_db, require_permission router = APIRouter(prefix="/api/v1") PERM_FEED_CHECKPOINT_WRITE = "feed.checkpoint:write" def _to_tz(dt): if dt is None: return None if dt.tzinfo is None: return dt.replace(tzinfo=ZoneInfo(settings.TIMEZONE)) return dt.astimezone(ZoneInfo(settings.TIMEZONE)) @router.post("/feed/checkpoint") def upsert_feed_checkpoint( payload: list[FeedCheckpointIn], _: Annotated[object, Depends(require_permission(PERM_FEED_CHECKPOINT_WRITE))], db: Annotated[Session, Depends(get_db)], ): rows = [] for item in payload: rows.append( { "id": item.id, "hn": item.hn, "vn": item.vn, "location": item.location, "type": item.type, "timestamp_in": _to_tz(item.timestamp_in), "timestamp_out": _to_tz(item.timestamp_out), "waiting_time": item.waiting_time, "bu": item.bu, } ) stmt = insert(RawOpdCheckpoint).values(rows) update_cols = { "hn": stmt.excluded.hn, "vn": stmt.excluded.vn, "location": stmt.excluded.location, "type": stmt.excluded.type, "timestamp_in": stmt.excluded.timestamp_in, "timestamp_out": stmt.excluded.timestamp_out, "waiting_time": stmt.excluded.waiting_time, "bu": stmt.excluded.bu, } stmt = stmt.on_conflict_do_update(index_elements=[RawOpdCheckpoint.id], set_=update_cols) result = db.execute(stmt) db.commit() return {"upserted": len(rows), "rowcount": result.rowcount}