add previous fix bug forgotting commit-push
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from typing import Annotated
|
||||
from datetime import datetime
|
||||
from zoneinfo import ZoneInfo
|
||||
@@ -8,15 +9,19 @@ from fastapi import APIRouter, Depends
|
||||
from sqlalchemy.dialects.postgresql import insert
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from app.api.v1.schemas import FeedWaitingTimeIn
|
||||
from app.api.v1.schemas import FeedCheckpointIn, FeedWaitingTimeIn
|
||||
from app.core.config import settings
|
||||
from app.db.models import RawWaitingTime
|
||||
from app.db.models import RawOpdCheckpoint, RawWaitingTime
|
||||
from app.security.dependencies import get_db, require_permission
|
||||
from app.utils.supabase_client import SupabaseAPIError, upsert_to_supabase_sync
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
router = APIRouter(prefix="/api/v1")
|
||||
|
||||
PERM_FEED_WAITING_TIME_WRITE = "feed.waiting-time:write"
|
||||
PERM_FEED_OPD_CHECKPOINT_WRITE = "/api/v1/feed/opd-checkpoint:write"
|
||||
PERM_FEED_OPD_CHECKPOINT_WRITE_LEGACY = "feed.opd-checkpoint:write"
|
||||
|
||||
|
||||
def _to_tz(dt):
|
||||
@@ -27,6 +32,13 @@ def _to_tz(dt):
|
||||
return dt.astimezone(ZoneInfo(settings.TIMEZONE))
|
||||
|
||||
|
||||
def _to_iso(dt):
|
||||
"""Convert datetime to ISO 8601 string for Supabase API."""
|
||||
if dt is None:
|
||||
return None
|
||||
return dt.isoformat()
|
||||
|
||||
|
||||
@router.post("/feed/checkpoint")
|
||||
def upsert_feed_checkpoint(
|
||||
payload: list[FeedWaitingTimeIn],
|
||||
@@ -72,3 +84,92 @@ def upsert_feed_checkpoint(
|
||||
db.commit()
|
||||
|
||||
return {"upserted": len(rows), "rowcount": result.rowcount}
|
||||
|
||||
|
||||
@router.post("/feed/old-checkpoint")
|
||||
def upsert_opd_checkpoint(
|
||||
payload: list[FeedCheckpointIn],
|
||||
db: Annotated[Session, Depends(get_db)],
|
||||
):
|
||||
rows = []
|
||||
supabase_rows = []
|
||||
|
||||
for item in payload:
|
||||
# Prepare data for local database
|
||||
row = {
|
||||
"id": item.id,
|
||||
"hn": item.hn,
|
||||
"vn": item.vn,
|
||||
"location": item.location,
|
||||
"type": item.type,
|
||||
"timestamp_in": _to_tz(item.timestamp_in),
|
||||
"timestamp_out": _to_tz(item.timestamp_out),
|
||||
"waiting_time": item.waiting_time,
|
||||
"bu": item.bu,
|
||||
}
|
||||
if item.id is None:
|
||||
del row["id"]
|
||||
rows.append(row)
|
||||
|
||||
# Prepare data for Supabase API (convert datetime to ISO string)
|
||||
supabase_row = {
|
||||
"id": item.id,
|
||||
"hn": item.hn,
|
||||
"vn": item.vn,
|
||||
"location": item.location,
|
||||
"type": item.type,
|
||||
"timestamp_in": _to_iso(_to_tz(item.timestamp_in)),
|
||||
"timestamp_out": _to_iso(_to_tz(item.timestamp_out)),
|
||||
"waiting_time": item.waiting_time,
|
||||
"bu": item.bu,
|
||||
}
|
||||
if item.id is None:
|
||||
del supabase_row["id"]
|
||||
supabase_rows.append(supabase_row)
|
||||
|
||||
# Insert/update to local database
|
||||
stmt = insert(RawOpdCheckpoint).values(rows)
|
||||
update_cols = {
|
||||
"id": stmt.excluded.id,
|
||||
"type": stmt.excluded.type,
|
||||
"timestamp_in": stmt.excluded.timestamp_in,
|
||||
"timestamp_out": stmt.excluded.timestamp_out,
|
||||
"waiting_time": stmt.excluded.waiting_time,
|
||||
"bu": stmt.excluded.bu,
|
||||
}
|
||||
|
||||
stmt = stmt.on_conflict_do_update(
|
||||
index_elements=[RawOpdCheckpoint.hn, RawOpdCheckpoint.vn, RawOpdCheckpoint.location],
|
||||
set_=update_cols,
|
||||
)
|
||||
result = db.execute(stmt)
|
||||
db.commit()
|
||||
|
||||
# Send data to Supabase via API call
|
||||
supabase_result = None
|
||||
supabase_error = None
|
||||
|
||||
try:
|
||||
logger.info(f"Sending {len(supabase_rows)} records to Supabase API")
|
||||
supabase_result = upsert_to_supabase_sync(
|
||||
table="raw_opd_checkpoint",
|
||||
data=supabase_rows,
|
||||
on_conflict="hn,vn,location",
|
||||
)
|
||||
logger.info(f"Successfully sent data to Supabase: {supabase_result.get('status_code')}")
|
||||
except SupabaseAPIError as e:
|
||||
logger.error(f"Failed to send data to Supabase: {str(e)}")
|
||||
supabase_error = str(e)
|
||||
except Exception as e:
|
||||
logger.error(f"Unexpected error sending data to Supabase: {str(e)}")
|
||||
supabase_error = f"Unexpected error: {str(e)}"
|
||||
|
||||
return {
|
||||
"upserted": len(rows),
|
||||
"rowcount": result.rowcount,
|
||||
"supabase": {
|
||||
"success": supabase_result is not None,
|
||||
"result": supabase_result,
|
||||
"error": supabase_error,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -15,3 +15,15 @@ class FeedWaitingTimeIn(BaseModel):
|
||||
location_name: str | None = None
|
||||
step_name: str | None = None
|
||||
time: datetime
|
||||
|
||||
|
||||
class FeedCheckpointIn(BaseModel):
|
||||
id: int | None = None
|
||||
hn: int
|
||||
vn: int
|
||||
location: str
|
||||
type: str
|
||||
timestamp_in: datetime
|
||||
timestamp_out: datetime | None = None
|
||||
waiting_time: int | None = None
|
||||
bu: str | None = None
|
||||
|
||||
@@ -13,6 +13,16 @@ class Settings(BaseSettings):
|
||||
DB_NAME: str
|
||||
DB_SSLMODE: str = "prefer"
|
||||
|
||||
SUPABASE_DB_HOST: str
|
||||
SUPABASE_DB_PORT: int = 5432
|
||||
SUPABASE_DB_USER: str
|
||||
SUPABASE_DB_PASSWORD: str
|
||||
SUPABASE_DB_NAME: str
|
||||
SUPABASE_DB_SSLMODE: str = "disable"
|
||||
|
||||
SUPABASE_API_URL: str
|
||||
SUPABASE_API_KEY: str
|
||||
|
||||
ROOT_PATH: str = ""
|
||||
|
||||
TIMEZONE: str = "Asia/Bangkok"
|
||||
|
||||
@@ -18,4 +18,18 @@ def build_db_url() -> str:
|
||||
)
|
||||
|
||||
|
||||
def build_supabase_db_url() -> str:
|
||||
user = quote_plus(settings.SUPABASE_DB_USER)
|
||||
password = quote_plus(settings.SUPABASE_DB_PASSWORD)
|
||||
host = settings.SUPABASE_DB_HOST
|
||||
port = settings.SUPABASE_DB_PORT
|
||||
db = quote_plus(settings.SUPABASE_DB_NAME)
|
||||
|
||||
return (
|
||||
f"postgresql+psycopg://{user}:{password}@{host}:{port}/{db}"
|
||||
f"?sslmode={quote_plus(settings.SUPABASE_DB_SSLMODE)}"
|
||||
)
|
||||
|
||||
|
||||
engine = create_engine(build_db_url(), pool_pre_ping=True)
|
||||
supabase_engine = create_engine(build_supabase_db_url(), pool_pre_ping=True)
|
||||
|
||||
@@ -2,13 +2,31 @@ from __future__ import annotations
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
from sqlalchemy import BigInteger, Boolean, DateTime, ForeignKey, Integer, String, Text, func
|
||||
from sqlalchemy import BigInteger, Boolean, DateTime, ForeignKey, Integer, String, Text, UniqueConstraint, func
|
||||
from sqlalchemy.dialects.postgresql import JSONB
|
||||
from sqlalchemy.orm import Mapped, mapped_column, relationship
|
||||
|
||||
from app.db.base import Base
|
||||
|
||||
|
||||
class RawOpdCheckpoint(Base):
|
||||
__tablename__ = "raw_opd_checkpoint"
|
||||
__table_args__ = (
|
||||
UniqueConstraint("hn", "vn", "location", name="uq_raw_opd_checkpoint_hn_vn_location"),
|
||||
{"schema": "rawdata"},
|
||||
)
|
||||
|
||||
id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
|
||||
hn: Mapped[int] = mapped_column(BigInteger, nullable=False)
|
||||
vn: Mapped[int] = mapped_column(BigInteger, nullable=False)
|
||||
location: Mapped[str] = mapped_column(Text, nullable=False)
|
||||
type: Mapped[str] = mapped_column(String(64), nullable=False)
|
||||
timestamp_in: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
|
||||
timestamp_out: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
|
||||
waiting_time: Mapped[int | None] = mapped_column(Integer, nullable=True)
|
||||
bu: Mapped[str | None] = mapped_column(String(128), nullable=True)
|
||||
|
||||
|
||||
class RawWaitingTime(Base):
|
||||
__tablename__ = "raw_waiting_time"
|
||||
__table_args__ = {"schema": "rawdata"}
|
||||
@@ -42,6 +60,15 @@ class ApiClient(Base):
|
||||
passive_deletes=True,
|
||||
)
|
||||
|
||||
def __str__(self) -> str:
|
||||
client_id = getattr(self, "id", None)
|
||||
if client_id is None:
|
||||
return self.name
|
||||
return f"{self.name} ({client_id})"
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return str(self)
|
||||
|
||||
|
||||
class ApiKey(Base):
|
||||
__tablename__ = "api_key"
|
||||
|
||||
@@ -1,7 +1,13 @@
|
||||
from contextlib import asynccontextmanager
|
||||
import os
|
||||
|
||||
from fastapi import FastAPI
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from fastapi.staticfiles import StaticFiles
|
||||
from starlette.datastructures import Headers
|
||||
from starlette.middleware.base import BaseHTTPMiddleware
|
||||
from starlette.middleware.sessions import SessionMiddleware
|
||||
import sqladmin
|
||||
|
||||
from app.admin import mount_admin
|
||||
from app.api.v1.routes import router as v1_router
|
||||
@@ -9,13 +15,59 @@ from app.core.config import settings
|
||||
from app.db.init_db import init_db
|
||||
|
||||
|
||||
class ForceHTTPSMiddleware(BaseHTTPMiddleware):
|
||||
async def dispatch(self, request, call_next):
|
||||
request.scope["scheme"] = "https"
|
||||
response = await call_next(request)
|
||||
return response
|
||||
|
||||
|
||||
class ForwardedProtoMiddleware:
|
||||
def __init__(self, app):
|
||||
self.app = app
|
||||
|
||||
async def __call__(self, scope, receive, send):
|
||||
if scope["type"] in {"http", "websocket"}:
|
||||
headers = Headers(scope=scope)
|
||||
forwarded_proto = headers.get("x-forwarded-proto")
|
||||
if forwarded_proto:
|
||||
proto = forwarded_proto.split(",", 1)[0].strip()
|
||||
if proto:
|
||||
new_scope = dict(scope)
|
||||
new_scope["scheme"] = proto
|
||||
return await self.app(new_scope, receive, send)
|
||||
|
||||
return await self.app(scope, receive, send)
|
||||
|
||||
|
||||
origins = [
|
||||
"http://localhost:8040",
|
||||
"https://ai.sriphat.com",
|
||||
"http://ai.sriphat.com",
|
||||
]
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifespan(_: FastAPI):
|
||||
init_db()
|
||||
yield
|
||||
|
||||
|
||||
sqladmin_dir = os.path.dirname(sqladmin.__file__)
|
||||
statics_path = os.path.join(sqladmin_dir, "statics")
|
||||
|
||||
app = FastAPI(title=settings.APP_NAME, root_path=settings.ROOT_PATH, lifespan=lifespan)
|
||||
app.add_middleware(ForceHTTPSMiddleware)
|
||||
app.add_middleware(SessionMiddleware, secret_key=settings.ADMIN_SECRET_KEY)
|
||||
app.add_middleware(ForwardedProtoMiddleware)
|
||||
app.add_middleware(
|
||||
CORSMiddleware,
|
||||
allow_origins=origins,
|
||||
allow_credentials=True,
|
||||
allow_methods=["*"],
|
||||
allow_headers=["*"],
|
||||
)
|
||||
app.include_router(v1_router)
|
||||
app.mount("/admin/statics", StaticFiles(directory=statics_path), name="admin_statics")
|
||||
app.mount("/apiservice/admin/statics", StaticFiles(directory=statics_path), name="proxy_admin_statics")
|
||||
mount_admin(app)
|
||||
|
||||
@@ -1,15 +1,17 @@
|
||||
from typing import Annotated
|
||||
from collections.abc import Sequence
|
||||
|
||||
from fastapi import Depends, HTTPException, Request, status
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.orm import Session, sessionmaker
|
||||
|
||||
from app.db.engine import engine
|
||||
from app.db.engine import engine, supabase_engine
|
||||
from app.db.models import ApiKey
|
||||
from app.security.api_key import get_prefix, verify_api_key
|
||||
|
||||
|
||||
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False)
|
||||
SupabaseSessionLocal = sessionmaker(bind=supabase_engine, autoflush=False, autocommit=False)
|
||||
|
||||
|
||||
def get_db():
|
||||
@@ -20,6 +22,14 @@ def get_db():
|
||||
db.close()
|
||||
|
||||
|
||||
def get_supabase_db():
|
||||
db = SupabaseSessionLocal()
|
||||
try:
|
||||
yield db
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
|
||||
def get_bearer_token(request: Request) -> str:
|
||||
auth = request.headers.get("authorization")
|
||||
if not auth:
|
||||
@@ -32,7 +42,7 @@ def get_bearer_token(request: Request) -> str:
|
||||
return parts[1].strip()
|
||||
|
||||
|
||||
def require_permission(permission: str):
|
||||
def require_permission(permission: str | Sequence[str]):
|
||||
def _dep(
|
||||
token: Annotated[str, Depends(get_bearer_token)],
|
||||
db: Annotated[Session, Depends(get_db)],
|
||||
@@ -46,7 +56,9 @@ def require_permission(permission: str):
|
||||
if not verify_api_key(token, api_key.key_hash):
|
||||
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid API key")
|
||||
|
||||
if permission not in (api_key.permissions or []):
|
||||
allowed = set(api_key.permissions or [])
|
||||
required = [permission] if isinstance(permission, str) else list(permission)
|
||||
if not any(p in allowed for p in required):
|
||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Permission denied")
|
||||
|
||||
return api_key
|
||||
|
||||
1
03-apiservice/app/utils/__init__.py
Normal file
1
03-apiservice/app/utils/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Utility functions and helpers."""
|
||||
122
03-apiservice/app/utils/supabase_client.py
Normal file
122
03-apiservice/app/utils/supabase_client.py
Normal file
@@ -0,0 +1,122 @@
|
||||
"""Supabase API client utility for making REST API calls."""
|
||||
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
import httpx
|
||||
|
||||
from app.core.config import settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SupabaseAPIError(Exception):
|
||||
"""Exception raised when Supabase API call fails."""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
async def upsert_to_supabase(
|
||||
table: str,
|
||||
data: list[dict[str, Any]],
|
||||
on_conflict: str | None = None,
|
||||
) -> dict[str, Any]:
|
||||
"""
|
||||
Upsert data to Supabase table via REST API.
|
||||
|
||||
Args:
|
||||
table: Table name (e.g., "raw_opd_checkpoint")
|
||||
data: List of records to upsert
|
||||
on_conflict: Columns to use for conflict resolution (e.g., "hn,vn,location")
|
||||
|
||||
Returns:
|
||||
Response from Supabase API
|
||||
|
||||
Raises:
|
||||
SupabaseAPIError: If the API call fails
|
||||
"""
|
||||
url = f"{settings.SUPABASE_API_URL}/rest/v1/{table}"
|
||||
headers = {
|
||||
"apikey": settings.SUPABASE_API_KEY,
|
||||
"Authorization": f"Bearer {settings.SUPABASE_API_KEY}",
|
||||
"Content-Type": "application/json",
|
||||
"Prefer": "resolution=merge-duplicates",
|
||||
}
|
||||
|
||||
if on_conflict:
|
||||
headers["Prefer"] += f",on_conflict={on_conflict}"
|
||||
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=30.0) as client:
|
||||
response = await client.post(url, json=data, headers=headers)
|
||||
response.raise_for_status()
|
||||
return {
|
||||
"status": "success",
|
||||
"status_code": response.status_code,
|
||||
"data": response.json() if response.text else None,
|
||||
}
|
||||
except httpx.HTTPStatusError as e:
|
||||
error_msg = f"Supabase API error: {e.response.status_code} - {e.response.text}"
|
||||
logger.error(error_msg)
|
||||
raise SupabaseAPIError(error_msg) from e
|
||||
except httpx.RequestError as e:
|
||||
error_msg = f"Supabase API request failed: {str(e)}"
|
||||
logger.error(error_msg)
|
||||
raise SupabaseAPIError(error_msg) from e
|
||||
except Exception as e:
|
||||
error_msg = f"Unexpected error calling Supabase API: {str(e)}"
|
||||
logger.error(error_msg)
|
||||
raise SupabaseAPIError(error_msg) from e
|
||||
|
||||
|
||||
def upsert_to_supabase_sync(
|
||||
table: str,
|
||||
data: list[dict[str, Any]],
|
||||
on_conflict: str | None = None,
|
||||
) -> dict[str, Any]:
|
||||
"""
|
||||
Synchronous version of upsert_to_supabase.
|
||||
|
||||
Args:
|
||||
table: Table name (e.g., "raw_opd_checkpoint")
|
||||
data: List of records to upsert
|
||||
on_conflict: Columns to use for conflict resolution (e.g., "hn,vn,location")
|
||||
|
||||
Returns:
|
||||
Response from Supabase API
|
||||
|
||||
Raises:
|
||||
SupabaseAPIError: If the API call fails
|
||||
"""
|
||||
url = f"{settings.SUPABASE_API_URL}/rest/v1/{table}"
|
||||
headers = {
|
||||
"apikey": settings.SUPABASE_API_KEY,
|
||||
"Authorization": f"Bearer {settings.SUPABASE_API_KEY}",
|
||||
"Content-Type": "application/json",
|
||||
"Prefer": "resolution=merge-duplicates",
|
||||
}
|
||||
|
||||
if on_conflict:
|
||||
headers["Prefer"] += f",on_conflict={on_conflict}"
|
||||
|
||||
try:
|
||||
with httpx.Client(timeout=30.0) as client:
|
||||
response = client.post(url, json=data, headers=headers)
|
||||
response.raise_for_status()
|
||||
return {
|
||||
"status": "success",
|
||||
"status_code": response.status_code,
|
||||
"data": response.json() if response.text else None,
|
||||
}
|
||||
except httpx.HTTPStatusError as e:
|
||||
error_msg = f"Supabase API error: {e.response.status_code} - {e.response.text}"
|
||||
logger.error(error_msg)
|
||||
raise SupabaseAPIError(error_msg) from e
|
||||
except httpx.RequestError as e:
|
||||
error_msg = f"Supabase API request failed: {str(e)}"
|
||||
logger.error(error_msg)
|
||||
raise SupabaseAPIError(error_msg) from e
|
||||
except Exception as e:
|
||||
error_msg = f"Unexpected error calling Supabase API: {str(e)}"
|
||||
logger.error(error_msg)
|
||||
raise SupabaseAPIError(error_msg) from e
|
||||
Reference in New Issue
Block a user