Files
sriphat-dataplatform/03-apiservice/app/api/v1/routes.py

68 lines
2.0 KiB
Python

from __future__ import annotations
from typing import Annotated
from zoneinfo import ZoneInfo
from fastapi import APIRouter, Depends
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import Session
from app.api.v1.schemas import FeedCheckpointIn
from app.core.config import settings
from app.db.models import RawOpdCheckpoint
from app.security.dependencies import get_db, require_permission
router = APIRouter(prefix="/api/v1")
PERM_FEED_CHECKPOINT_WRITE = "feed.checkpoint:write"
def _to_tz(dt):
if dt is None:
return None
if dt.tzinfo is None:
return dt.replace(tzinfo=ZoneInfo(settings.TIMEZONE))
return dt.astimezone(ZoneInfo(settings.TIMEZONE))
@router.post("/feed/checkpoint")
def upsert_feed_checkpoint(
payload: list[FeedCheckpointIn],
_: Annotated[object, Depends(require_permission(PERM_FEED_CHECKPOINT_WRITE))],
db: Annotated[Session, Depends(get_db)],
):
rows = []
for item in payload:
rows.append(
{
"id": item.id,
"hn": item.hn,
"vn": item.vn,
"location": item.location,
"type": item.type,
"timestamp_in": _to_tz(item.timestamp_in),
"timestamp_out": _to_tz(item.timestamp_out),
"waiting_time": item.waiting_time,
"bu": item.bu,
}
)
stmt = insert(RawOpdCheckpoint).values(rows)
update_cols = {
"hn": stmt.excluded.hn,
"vn": stmt.excluded.vn,
"location": stmt.excluded.location,
"type": stmt.excluded.type,
"timestamp_in": stmt.excluded.timestamp_in,
"timestamp_out": stmt.excluded.timestamp_out,
"waiting_time": stmt.excluded.waiting_time,
"bu": stmt.excluded.bu,
}
stmt = stmt.on_conflict_do_update(index_elements=[RawOpdCheckpoint.id], set_=update_cols)
result = db.execute(stmt)
db.commit()
return {"upserted": len(rows), "rowcount": result.rowcount}