Files
sriphat-dataplatform/03-apiservice/app/utils/supabase_client.py
2026-02-27 17:24:20 +07:00

125 lines
3.9 KiB
Python

"""Supabase API client utility for making REST API calls."""
import logging
from typing import Any
import httpx
from app.core.config import settings
logger = logging.getLogger(__name__)
class SupabaseAPIError(Exception):
"""Exception raised when Supabase API call fails."""
pass
async def upsert_to_supabase(
table: str,
data: list[dict[str, Any]],
on_conflict: str | None = None,
) -> dict[str, Any]:
"""
Upsert data to Supabase table via REST API.
Args:
table: Table name (e.g., "raw_opd_checkpoint")
data: List of records to upsert
on_conflict: Columns to use for conflict resolution (e.g., "hn,vn,location")
Returns:
Response from Supabase API
Raises:
SupabaseAPIError: If the API call fails
"""
url = f"{settings.SUPABASE_API_URL}/rest/v1/{table}"
headers = {
"apikey": settings.SUPABASE_API_KEY,
"Authorization": f"Bearer {settings.SUPABASE_API_KEY}",
"Content-Type": "application/json",
"Content-Profile": "rawdata",
"Prefer": "resolution=merge-duplicates",
}
if on_conflict:
headers["Prefer"] += f",on_conflict={on_conflict}"
try:
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.post(url, json=data, headers=headers)
response.raise_for_status()
return {
"status": "success",
"status_code": response.status_code,
"data": response.json() if response.text else None,
}
except httpx.HTTPStatusError as e:
error_msg = f"Supabase API error: {e.response.status_code} - {e.response.text}"
logger.error(error_msg)
raise SupabaseAPIError(error_msg) from e
except httpx.RequestError as e:
error_msg = f"Supabase API request failed: {str(e)}"
logger.error(error_msg)
raise SupabaseAPIError(error_msg) from e
except Exception as e:
error_msg = f"Unexpected error calling Supabase API: {str(e)}"
logger.error(error_msg)
raise SupabaseAPIError(error_msg) from e
def upsert_to_supabase_sync(
table: str,
data: list[dict[str, Any]],
on_conflict: str | None = None,
) -> dict[str, Any]:
"""
Synchronous version of upsert_to_supabase.
Args:
table: Table name (e.g., "raw_opd_checkpoint")
data: List of records to upsert
on_conflict: Columns to use for conflict resolution (e.g., "hn,vn,location")
Returns:
Response from Supabase API
Raises:
SupabaseAPIError: If the API call fails
"""
url = f"{settings.SUPABASE_API_URL}/rest/v1/{table}"
headers = {
"apikey": settings.SUPABASE_API_KEY,
"Authorization": f"Bearer {settings.SUPABASE_API_KEY}",
"Content-Type": "application/json",
"Content-Profile": "rawdata",
"Prefer": "resolution=merge-duplicates",
}
if on_conflict:
headers["Prefer"] += f",on_conflict={on_conflict}"
try:
with httpx.Client(timeout=30.0) as client:
response = client.post(url, json=data, headers=headers)
response.raise_for_status()
return {
"status": "success",
"status_code": response.status_code,
"data": response.json() if response.text else None,
}
except httpx.HTTPStatusError as e:
error_msg = f"Supabase API error: {e.response.status_code} - {e.response.text}"
logger.error(error_msg)
raise SupabaseAPIError(error_msg) from e
except httpx.RequestError as e:
error_msg = f"Supabase API request failed: {str(e)}"
logger.error(error_msg)
raise SupabaseAPIError(error_msg) from e
except Exception as e:
error_msg = f"Unexpected error calling Supabase API: {str(e)}"
logger.error(error_msg)
raise SupabaseAPIError(error_msg) from e