diff --git a/01-infra/docker-compose.yml b/01-infra/docker-compose.yml index 5d81e00..97b1ad5 100644 --- a/01-infra/docker-compose.yml +++ b/01-infra/docker-compose.yml @@ -57,6 +57,8 @@ services: networks: - shared_data_network restart: unless-stopped + ports: + - "0.0.0.0:5435:5432" healthcheck: test: ["CMD-SHELL", "pg_isready -U ${DB_USER}"] interval: 10s diff --git a/03-apiservice-v0.1/README.md b/03-apiservice-v0.1/README.md index 8ce0ee0..8e303ff 100644 --- a/03-apiservice-v0.1/README.md +++ b/03-apiservice-v0.1/README.md @@ -11,3 +11,34 @@ Internal only - access via Nginx Proxy Manager at `/apiservice` ## Admin UI - Login: http:///apiservice/admin/ - Generate API Key: POST /apiservice/admin/api-keys/generate + +## env +env that important for provision +``` +## supabase +SUPABASE_DB_HOST=sdp-db +SUPABASE_DB_PORT=5432 +SUPABASE_DB_USER=postgres.1 +SUPABASE_DB_PASSWORD= +SUPABASE_DB_NAME=postgres +SUPABASE_DB_SSLMODE=disable + +## pgsql +DB_HOST=postgres +DB_PORT=5432 +DB_USER=postgres +DB_PASSWORD= +DB_NAME=postgres +DB_SSLMODE=disable +AIRBYTE_DB_NAME=airbyte +KEYCLOAK_DB_NAME=keycloack +SUPERSET_DB_NAME=superset +#TEMPORAL_DB_NAME=temporal + +## api +ROOT_PATH=/apiservice +APP_NAME=APIsService +ADMIN_SECRET_KEY= +ADMIN_USERNAME=admin +ADMIN_PASSWORD= +``` diff --git a/03-apiservice-v0.1/app/admin.py b/03-apiservice-v0.1/app/admin.py index 77128fc..1e89fdc 100644 --- a/03-apiservice-v0.1/app/admin.py +++ b/03-apiservice-v0.1/app/admin.py @@ -1,11 +1,13 @@ from __future__ import annotations from fastapi import HTTPException, Request, status +from fastapi.staticfiles import StaticFiles from sqladmin import Admin, ModelView from sqladmin.authentication import AuthenticationBackend -from starlette.responses import RedirectResponse +from starlette.responses import HTMLResponse, RedirectResponse +from starlette.datastructures import URL from sqlalchemy.orm import sessionmaker -from wtforms import StringField +from wtforms import BooleanField, SelectField, StringField from wtforms.validators import Optional from app.core.config import settings @@ -36,40 +38,214 @@ class AdminAuth(AuthenticationBackend): class ApiClientAdmin(ModelView, model=ApiClient): column_list = [ApiClient.id, ApiClient.name, ApiClient.is_active] + async def insert_model(self, request: Request, data: dict) -> ApiClient: + obj: ApiClient = await super().insert_model(request, data) + + plain_key = generate_api_key() + + db = sessionmaker(bind=engine, autoflush=False, autocommit=False)() + try: + api_key = ApiKey( + client_id=obj.id, + name="auto", + key_prefix=get_prefix(plain_key), + key_hash=hash_api_key(plain_key), + permissions=[], + is_active=True, + ) + db.add(api_key) + db.commit() + db.refresh(api_key) + + request.session["generated_api_key"] = { + "client_id": obj.id, + "client_name": obj.name, + "key_id": api_key.id, + "api_key": plain_key, + } + finally: + db.close() + + return obj + class ApiKeyAdmin(ModelView, model=ApiKey): + column_list = [ApiKey.id, ApiKey.client_id, ApiKey.name, ApiKey.is_active, ApiKey.permissions] form_excluded_columns = [ApiKey.key_hash, ApiKey.key_prefix, ApiKey.created_at] form_extra_fields = { "plain_key": StringField("Plain Key", validators=[Optional()]), "permissions_csv": StringField("Permissions (comma)", validators=[Optional()]), + "endpoint_path": SelectField("Endpoint", choices=[], validators=[Optional()]), + "perm_read": BooleanField("Read (GET)"), + "perm_write": BooleanField("Write (POST/PATCH)"), + "perm_delete": BooleanField("Delete (DELETE)"), } async def on_model_change(self, data: dict, model: ApiKey, is_created: bool, request: Request) -> None: plain_key = data.get("plain_key") + if not plain_key and is_created: + plain_key = generate_api_key() + if plain_key: model.key_prefix = get_prefix(plain_key) model.key_hash = hash_api_key(plain_key) + if is_created: + request.state.generated_api_key_plain = plain_key + + permissions: list[str] = [] + endpoint_path = data.get("endpoint_path") + if endpoint_path: + if data.get("perm_read"): + permissions.append(f"{endpoint_path}:read") + if data.get("perm_write"): + permissions.append(f"{endpoint_path}:write") + if data.get("perm_delete"): + permissions.append(f"{endpoint_path}:delete") + permissions_csv = data.get("permissions_csv") if permissions_csv is not None: perms = [p.strip() for p in permissions_csv.split(",") if p.strip()] - model.permissions = perms + permissions.extend(perms) + + if permissions: + seen: set[str] = set() + deduped: list[str] = [] + for p in permissions: + if p not in seen: + seen.add(p) + deduped.append(p) + model.permissions = deduped + + async def after_model_change(self, data: dict, model: ApiKey, is_created: bool, request: Request) -> None: + if not is_created: + return + + plain_key = getattr(request.state, "generated_api_key_plain", None) + if not plain_key: + return + + request.session["generated_api_key"] = { + "client_id": model.client_id, + "client_name": str(getattr(model, "client", "")) if getattr(model, "client", None) else "", + "key_id": model.id, + "api_key": plain_key, + } def mount_admin(app): auth_backend = AdminAuth(secret_key=settings.ADMIN_SECRET_KEY) - admin = Admin(app=app, engine=engine, authentication_backend=auth_backend) + + class CustomAdmin(Admin): + def get_save_redirect_url( + self, request: Request, form, model_view: ModelView, obj + ): + if ( + getattr(model_view, "model", None) in (ApiClient, ApiKey) + and request.session.get("generated_api_key") + ): + root_path = request.scope.get("root_path") or "" + return URL(f"{root_path}/admin/generated-api-key") + + return super().get_save_redirect_url( + request=request, + form=form, + model_view=model_view, + obj=obj, + ) + + admin = CustomAdmin( + app=app, + engine=engine, + authentication_backend=auth_backend, + title="My Service Management", + base_url="/admin", + ) + + openapi = app.openapi() + paths = openapi.get("paths") or {} + endpoint_choices: list[tuple[str, str]] = [] + for path in sorted(paths.keys()): + if not path.startswith("/api/"): + continue + methods = paths.get(path) or {} + available = sorted([m.upper() for m in methods.keys()]) + label = f"{path} [{' '.join(available)}]" if available else path + endpoint_choices.append((path, label)) + ApiKeyAdmin.form_extra_fields["endpoint_path"].kwargs["choices"] = endpoint_choices SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False) admin.add_view(ApiClientAdmin) admin.add_view(ApiKeyAdmin) - @app.get("/admin") - async def _admin_redirect(request: Request): + @app.get("/admin/generated-api-key") + async def _admin_generated_api_key(request: Request): + if not request.session.get("admin"): + raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authenticated") + + key_info = request.session.pop("generated_api_key", None) root_path = request.scope.get("root_path") or "" - return RedirectResponse(url=f"{root_path}/admin/") + clients_url = f"{root_path}/admin/{ApiClientAdmin.identity}/list" + + if not key_info: + return HTMLResponse( + f"

No API key to display

The API key was already shown or expired.

Back to clients

", + status_code=200, + ) + + client_name = key_info.get("client_name", "") + client_id = key_info.get("client_id", "") + key_id = key_info.get("key_id", "") + api_key = key_info.get("api_key", "") + + return HTMLResponse( + ( + "

API key generated

" + "

Copy this API key now. You won't be able to view it again.

" + f"

Client: {client_name} (ID: {client_id})

" + f"

Key ID: {key_id}

" + f"
{api_key}
" + f"

Back to clients

" + ), + status_code=200, + ) + + @app.get("/admin/clients/{client_id}/generate-api-key") + async def _admin_generate_api_key_get( + request: Request, + client_id: int, + permissions: str = "", + name: str | None = None, + ): + if not request.session.get("admin"): + raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authenticated") + + perms = [p.strip() for p in permissions.split(",") if p.strip()] + plain_key = generate_api_key() + + db = SessionLocal() + try: + client = db.get(ApiClient, client_id) + if not client: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Client not found") + + api_key = ApiKey( + client_id=client_id, + name=name, + key_prefix=get_prefix(plain_key), + key_hash=hash_api_key(plain_key), + permissions=perms, + is_active=True, + ) + db.add(api_key) + db.commit() + db.refresh(api_key) + + return {"key_id": api_key.id, "api_key": plain_key, "permissions": perms} + finally: + db.close() @app.post("/admin/api-keys/generate") async def _admin_generate_api_key( diff --git a/03-apiservice-v0.1/app/api/v1/routes.py b/03-apiservice-v0.1/app/api/v1/routes.py index d944984..2274075 100644 --- a/03-apiservice-v0.1/app/api/v1/routes.py +++ b/03-apiservice-v0.1/app/api/v1/routes.py @@ -1,5 +1,6 @@ from __future__ import annotations +import logging from typing import Annotated from zoneinfo import ZoneInfo @@ -10,12 +11,15 @@ from sqlalchemy.orm import Session from app.api.v1.schemas import FeedCheckpointIn from app.core.config import settings from app.db.models import RawOpdCheckpoint -from app.security.dependencies import get_db, require_permission +from app.security.dependencies import get_db, get_supabase_db, require_permission +from app.utils.supabase_client import SupabaseAPIError, upsert_to_supabase_sync +logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/v1") -PERM_FEED_CHECKPOINT_WRITE = "feed.checkpoint:write" +PERM_FEED_CHECKPOINT_WRITE = "/api/v1/feed/checkpoint:write" +PERM_FEED_CHECKPOINT_WRITE_LEGACY = "feed.checkpoint:write" def _to_tz(dt): @@ -26,33 +30,59 @@ def _to_tz(dt): return dt.astimezone(ZoneInfo(settings.TIMEZONE)) +def _to_iso(dt): + """Convert datetime to ISO 8601 string for Supabase API.""" + if dt is None: + return None + return dt.isoformat() + + @router.post("/feed/checkpoint") def upsert_feed_checkpoint( payload: list[FeedCheckpointIn], - _: Annotated[object, Depends(require_permission(PERM_FEED_CHECKPOINT_WRITE))], db: Annotated[Session, Depends(get_db)], ): rows = [] + supabase_rows = [] + + #clean_data = payload.model_dump(exclude_none=True) for item in payload: - rows.append( - { - "id": item.id, - "hn": item.hn, - "vn": item.vn, - "location": item.location, - "type": item.type, - "timestamp_in": _to_tz(item.timestamp_in), - "timestamp_out": _to_tz(item.timestamp_out), - "waiting_time": item.waiting_time, - "bu": item.bu, - } - ) + # Prepare data for local database 'default' if item.id is None else + row = { + "id": item.id, + "hn": item.hn, + "vn": item.vn, + "location": item.location, + "type": item.type, + "timestamp_in": _to_tz(item.timestamp_in), + "timestamp_out": _to_tz(item.timestamp_out), + "waiting_time": item.waiting_time, + "bu": item.bu, + } + if item.id is None: + del(row["id"]) + rows.append(row) + + # Prepare data for Supabase API (convert datetime to ISO string) 'default' if item.id is None else + supabase_row = { + "id": item.id, + "hn": item.hn, + "vn": item.vn, + "location": item.location, + "type": item.type, + "timestamp_in": _to_iso(_to_tz(item.timestamp_in)), + "timestamp_out": _to_iso(_to_tz(item.timestamp_out)), + "waiting_time": item.waiting_time, + "bu": item.bu, + } + if item.id is None: + del(supabase_row["id"]) + supabase_rows.append(supabase_row) + # Insert/update to local database stmt = insert(RawOpdCheckpoint).values(rows) update_cols = { - "hn": stmt.excluded.hn, - "vn": stmt.excluded.vn, - "location": stmt.excluded.location, + "id": stmt.excluded.id, "type": stmt.excluded.type, "timestamp_in": stmt.excluded.timestamp_in, "timestamp_out": stmt.excluded.timestamp_out, @@ -60,8 +90,38 @@ def upsert_feed_checkpoint( "bu": stmt.excluded.bu, } - stmt = stmt.on_conflict_do_update(index_elements=[RawOpdCheckpoint.id], set_=update_cols) + stmt = stmt.on_conflict_do_update( + index_elements=[RawOpdCheckpoint.hn, RawOpdCheckpoint.vn, RawOpdCheckpoint.location, RawOpdCheckpoint.timestamp_in], + set_=update_cols, + ) result = db.execute(stmt) db.commit() - return {"upserted": len(rows), "rowcount": result.rowcount} + # Send data to Supabase via API call + supabase_result = None + supabase_error = None + + try: + logger.info(f"Sending {len(supabase_rows)} records to Supabase API") + supabase_result = upsert_to_supabase_sync( + table="raw_opd_checkpoint", + data=supabase_rows, + on_conflict="hn,vn,location,timestamp_in", + ) + logger.info(f"Successfully sent data to Supabase: {supabase_result.get('status_code')}") + except SupabaseAPIError as e: + logger.error(f"Failed to send data to Supabase: {str(e)}") + supabase_error = str(e) + except Exception as e: + logger.error(f"Unexpected error sending data to Supabase: {str(e)}") + supabase_error = f"Unexpected error: {str(e)}" + + return { + "upserted": len(rows), + "rowcount": result.rowcount, + "supabase": { + "success": supabase_result is not None, + "result": supabase_result, + "error": supabase_error, + }, + } diff --git a/03-apiservice-v0.1/app/api/v1/schemas.py b/03-apiservice-v0.1/app/api/v1/schemas.py index cb67e1d..524ae1f 100644 --- a/03-apiservice-v0.1/app/api/v1/schemas.py +++ b/03-apiservice-v0.1/app/api/v1/schemas.py @@ -4,7 +4,7 @@ from pydantic import BaseModel class FeedCheckpointIn(BaseModel): - id: int + id: int | None = None hn: int vn: int location: str diff --git a/03-apiservice-v0.1/app/core/config.py b/03-apiservice-v0.1/app/core/config.py index 1e503a0..4f50d3f 100644 --- a/03-apiservice-v0.1/app/core/config.py +++ b/03-apiservice-v0.1/app/core/config.py @@ -11,7 +11,17 @@ class Settings(BaseSettings): DB_USER: str DB_PASSWORD: str DB_NAME: str - DB_SSLMODE: str = "prefer" + DB_SSLMODE: str = "disable" + + SUPABASE_DB_HOST: str + SUPABASE_DB_PORT: int = 5432 + SUPABASE_DB_USER: str + SUPABASE_DB_PASSWORD: str + SUPABASE_DB_NAME: str + SUPABASE_DB_SSLMODE: str = "disable" + + SUPABASE_API_URL: str + SUPABASE_API_KEY: str ROOT_PATH: str = "" diff --git a/03-apiservice-v0.1/app/db/engine.py b/03-apiservice-v0.1/app/db/engine.py index 62d5465..a5fde95 100644 --- a/03-apiservice-v0.1/app/db/engine.py +++ b/03-apiservice-v0.1/app/db/engine.py @@ -18,4 +18,18 @@ def build_db_url() -> str: ) +def build_supabase_db_url() -> str: + user = quote_plus(settings.SUPABASE_DB_USER) + password = quote_plus(settings.SUPABASE_DB_PASSWORD) + host = settings.SUPABASE_DB_HOST + port = settings.SUPABASE_DB_PORT + db = quote_plus(settings.SUPABASE_DB_NAME) + + return ( + f"postgresql+psycopg://{user}:{password}@{host}:{port}/{db}" + f"?sslmode={quote_plus(settings.SUPABASE_DB_SSLMODE)}" + ) + + engine = create_engine(build_db_url(), pool_pre_ping=True) +supabase_engine = create_engine(build_supabase_db_url(), pool_pre_ping=True) diff --git a/03-apiservice-v0.1/app/db/init_db.py b/03-apiservice-v0.1/app/db/init_db.py index 3b2887d..7f83020 100644 --- a/03-apiservice-v0.1/app/db/init_db.py +++ b/03-apiservice-v0.1/app/db/init_db.py @@ -5,8 +5,9 @@ from app.db.engine import engine def init_db() -> None: - with engine.begin() as conn: - conn.execute(text("CREATE SCHEMA IF NOT EXISTS fastapi")) - conn.execute(text("CREATE SCHEMA IF NOT EXISTS operationbi")) + # with engine.begin() as conn: + # conn.execute(text("CREATE SCHEMA IF NOT EXISTS fastapi")) + # conn.execute(text("CREATE SCHEMA IF NOT EXISTS operationbi")) - Base.metadata.create_all(bind=conn) + # Base.metadata.create_all(bind=conn) + pass \ No newline at end of file diff --git a/03-apiservice-v0.1/app/db/models.py b/03-apiservice-v0.1/app/db/models.py index ddbf497..f5a43f9 100644 --- a/03-apiservice-v0.1/app/db/models.py +++ b/03-apiservice-v0.1/app/db/models.py @@ -2,7 +2,7 @@ from __future__ import annotations from datetime import datetime -from sqlalchemy import BigInteger, Boolean, DateTime, ForeignKey, Integer, String, Text, func +from sqlalchemy import BigInteger, Boolean, DateTime, ForeignKey, Integer, String, Text, UniqueConstraint, func from sqlalchemy.dialects.postgresql import JSONB from sqlalchemy.orm import Mapped, mapped_column, relationship @@ -11,7 +11,10 @@ from app.db.base import Base class RawOpdCheckpoint(Base): __tablename__ = "raw_opd_checkpoint" - __table_args__ = {"schema": "operationbi"} + __table_args__ = ( + UniqueConstraint("hn", "vn", "location", name="uq_raw_opd_checkpoint_hn_vn_location"), + {"schema": "rawdata"}, + ) id: Mapped[int] = mapped_column(BigInteger, primary_key=True) hn: Mapped[int] = mapped_column(BigInteger, nullable=False) @@ -38,6 +41,15 @@ class ApiClient(Base): passive_deletes=True, ) + def __str__(self) -> str: + client_id = getattr(self, "id", None) + if client_id is None: + return self.name + return f"{self.name} ({client_id})" + + def __repr__(self) -> str: + return str(self) + class ApiKey(Base): __tablename__ = "api_key" diff --git a/03-apiservice-v0.1/app/main.py b/03-apiservice-v0.1/app/main.py index a52d0f2..88b7d3b 100644 --- a/03-apiservice-v0.1/app/main.py +++ b/03-apiservice-v0.1/app/main.py @@ -1,21 +1,99 @@ from contextlib import asynccontextmanager from fastapi import FastAPI +from starlette.datastructures import Headers from starlette.middleware.sessions import SessionMiddleware +from fastapi.middleware.cors import CORSMiddleware +from starlette.middleware.base import BaseHTTPMiddleware + +class ForceHTTPSMiddleware(BaseHTTPMiddleware): + async def dispatch(self, request, call_next): + # บังคับให้ FastAPI มองว่า Request ที่เข้ามาเป็น HTTPS เสมอ + # เพื่อให้ url_for() เจนลิงก์ CSS/JS เป็น https:// + request.scope["scheme"] = "https" + response = await call_next(request) + return response + + +class ForwardedProtoMiddleware: + def __init__(self, app): + self.app = app + + async def __call__(self, scope, receive, send): + if scope["type"] in {"http", "websocket"}: + headers = Headers(scope=scope) + forwarded_proto = headers.get("x-forwarded-proto") + if forwarded_proto: + proto = forwarded_proto.split(",", 1)[0].strip() + if proto: + new_scope = dict(scope) + new_scope["scheme"] = proto + return await self.app(new_scope, receive, send) + + return await self.app(scope, receive, send) + + +# class RootPathStripMiddleware: +# def __init__(self, app, prefix: str): +# self.app = app +# self.prefix = (prefix or "").rstrip("/") + +# async def __call__(self, scope, receive, send): +# if scope["type"] in {"http", "websocket"} and self.prefix: +# path = scope.get("path") or "" +# new_scope = dict(scope) +# new_scope["root_path"] = self.prefix + +# if path == self.prefix or path.startswith(self.prefix + "/"): +# new_path = path[len(self.prefix) :] +# new_scope["path"] = new_path if new_path else "/" + +# return await self.app(new_scope, receive, send) + +# return await self.app(scope, receive, send) from app.admin import mount_admin from app.api.v1.routes import router as v1_router from app.core.config import settings from app.db.init_db import init_db +from fastapi.staticfiles import StaticFiles +from sqladmin import Admin +import os +import sqladmin +# รายชื่อ Origins ที่อนุญาตให้ยิง API มาหาเราได้ +origins = [ + "http://localhost:80400", # สำหรับตอนพัฒนา Frontend + "https://ai.sriphat.com", # Domain หลักของคุณ + "http://ai.sriphat.com", +] @asynccontextmanager async def lifespan(_: FastAPI): init_db() yield +print(settings.ROOT_PATH, flush=True) + +sqladmin_dir = os.path.dirname(sqladmin.__file__) +statics_path = os.path.join(sqladmin_dir, "statics") app = FastAPI(title=settings.APP_NAME, root_path=settings.ROOT_PATH, lifespan=lifespan) +#if settings.ROOT_PATH: +# app.add_middleware(RootPathStripMiddleware, prefix=settings.ROOT_PATH) +app.add_middleware(ForceHTTPSMiddleware) app.add_middleware(SessionMiddleware, secret_key=settings.ADMIN_SECRET_KEY) +app.add_middleware(ForwardedProtoMiddleware) app.include_router(v1_router) +app.mount("/admin/statics", StaticFiles(directory=statics_path), name="admin_statics") +app.mount("/apiservice/admin/statics", StaticFiles(directory=statics_path), name="proxy_admin_statics") +app.add_middleware( + CORSMiddleware, + allow_origins=origins, # หรือ ["*"] ถ้าต้องการอนุญาตทั้งหมด (ไม่แนะนำใน production) + allow_credentials=True, # สำคัญมาก! ต้องเป็น True ถ้าหน้า Admin/API มีการใช้ Cookies/Sessions + allow_methods=["*"], # อนุญาตทุก HTTP Method (GET, POST, PUT, DELETE, etc.) + allow_headers=["*"], # อนุญาตทุก Headers +) + mount_admin(app) + diff --git a/03-apiservice-v0.1/app/security/dependencies.py b/03-apiservice-v0.1/app/security/dependencies.py index b7bc910..1efe2fd 100644 --- a/03-apiservice-v0.1/app/security/dependencies.py +++ b/03-apiservice-v0.1/app/security/dependencies.py @@ -1,15 +1,17 @@ from typing import Annotated +from collections.abc import Sequence from fastapi import Depends, HTTPException, Request, status from sqlalchemy import select from sqlalchemy.orm import Session, sessionmaker -from app.db.engine import engine +from app.db.engine import engine, supabase_engine from app.db.models import ApiKey from app.security.api_key import get_prefix, verify_api_key SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False) +SupabaseSessionLocal = sessionmaker(bind=supabase_engine, autoflush=False, autocommit=False) def get_db(): @@ -20,6 +22,14 @@ def get_db(): db.close() +def get_supabase_db(): + db = SupabaseSessionLocal() + try: + yield db + finally: + db.close() + + def get_bearer_token(request: Request) -> str: auth = request.headers.get("authorization") if not auth: @@ -32,7 +42,7 @@ def get_bearer_token(request: Request) -> str: return parts[1].strip() -def require_permission(permission: str): +def require_permission(permission: str | Sequence[str]): def _dep( token: Annotated[str, Depends(get_bearer_token)], db: Annotated[Session, Depends(get_db)], @@ -46,7 +56,9 @@ def require_permission(permission: str): if not verify_api_key(token, api_key.key_hash): raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid API key") - if permission not in (api_key.permissions or []): + allowed = set(api_key.permissions or []) + required = [permission] if isinstance(permission, str) else list(permission) + if not any(p in allowed for p in required): raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Permission denied") return api_key diff --git a/03-apiservice-v0.1/docker-compose.yml b/03-apiservice-v0.1/docker-compose.yml index 66f77d9..b52b08c 100644 --- a/03-apiservice-v0.1/docker-compose.yml +++ b/03-apiservice-v0.1/docker-compose.yml @@ -20,8 +20,13 @@ services: networks: - shared_data_network restart: unless-stopped + volumes: + - ./app:/app/app + - .env:/app/.env + ports: + - 0.0.0.0:8040:8040 healthcheck: - test: ["CMD", "curl", "-f", "http://localhost:8040/apiservice/docs"] + test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8040/apiservice/docs', timeout=5).read()"] interval: 30s timeout: 10s retries: 3 diff --git a/03-apiservice-v0.1/requirements.txt b/03-apiservice-v0.1/requirements.txt index 49a79e1..f04f728 100644 --- a/03-apiservice-v0.1/requirements.txt +++ b/03-apiservice-v0.1/requirements.txt @@ -5,10 +5,12 @@ SQLAlchemy==2.0.38 psycopg==3.2.5 pydantic==2.10.6 pydantic-settings==2.7.1 +psycopg[binary] sqladmin==0.20.1 itsdangerous==2.2.0 bcrypt==4.3.0 python-multipart==0.0.20 +httpx==0.28.1 WTForms #==3.2.1 diff --git a/03-apiservice/app/api/v1/routes.py b/03-apiservice/app/api/v1/routes.py index d0e0ca7..e05b407 100644 --- a/03-apiservice/app/api/v1/routes.py +++ b/03-apiservice/app/api/v1/routes.py @@ -1,5 +1,6 @@ from __future__ import annotations +import logging from typing import Annotated from datetime import datetime from zoneinfo import ZoneInfo @@ -8,15 +9,19 @@ from fastapi import APIRouter, Depends from sqlalchemy.dialects.postgresql import insert from sqlalchemy.orm import Session -from app.api.v1.schemas import FeedWaitingTimeIn +from app.api.v1.schemas import FeedCheckpointIn, FeedWaitingTimeIn from app.core.config import settings -from app.db.models import RawWaitingTime +from app.db.models import RawOpdCheckpoint, RawWaitingTime from app.security.dependencies import get_db, require_permission +from app.utils.supabase_client import SupabaseAPIError, upsert_to_supabase_sync +logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/v1") PERM_FEED_WAITING_TIME_WRITE = "feed.waiting-time:write" +PERM_FEED_OPD_CHECKPOINT_WRITE = "/api/v1/feed/opd-checkpoint:write" +PERM_FEED_OPD_CHECKPOINT_WRITE_LEGACY = "feed.opd-checkpoint:write" def _to_tz(dt): @@ -27,6 +32,13 @@ def _to_tz(dt): return dt.astimezone(ZoneInfo(settings.TIMEZONE)) +def _to_iso(dt): + """Convert datetime to ISO 8601 string for Supabase API.""" + if dt is None: + return None + return dt.isoformat() + + @router.post("/feed/checkpoint") def upsert_feed_checkpoint( payload: list[FeedWaitingTimeIn], @@ -72,3 +84,92 @@ def upsert_feed_checkpoint( db.commit() return {"upserted": len(rows), "rowcount": result.rowcount} + + +@router.post("/feed/old-checkpoint") +def upsert_opd_checkpoint( + payload: list[FeedCheckpointIn], + db: Annotated[Session, Depends(get_db)], +): + rows = [] + supabase_rows = [] + + for item in payload: + # Prepare data for local database + row = { + "id": item.id, + "hn": item.hn, + "vn": item.vn, + "location": item.location, + "type": item.type, + "timestamp_in": _to_tz(item.timestamp_in), + "timestamp_out": _to_tz(item.timestamp_out), + "waiting_time": item.waiting_time, + "bu": item.bu, + } + if item.id is None: + del row["id"] + rows.append(row) + + # Prepare data for Supabase API (convert datetime to ISO string) + supabase_row = { + "id": item.id, + "hn": item.hn, + "vn": item.vn, + "location": item.location, + "type": item.type, + "timestamp_in": _to_iso(_to_tz(item.timestamp_in)), + "timestamp_out": _to_iso(_to_tz(item.timestamp_out)), + "waiting_time": item.waiting_time, + "bu": item.bu, + } + if item.id is None: + del supabase_row["id"] + supabase_rows.append(supabase_row) + + # Insert/update to local database + stmt = insert(RawOpdCheckpoint).values(rows) + update_cols = { + "id": stmt.excluded.id, + "type": stmt.excluded.type, + "timestamp_in": stmt.excluded.timestamp_in, + "timestamp_out": stmt.excluded.timestamp_out, + "waiting_time": stmt.excluded.waiting_time, + "bu": stmt.excluded.bu, + } + + stmt = stmt.on_conflict_do_update( + index_elements=[RawOpdCheckpoint.hn, RawOpdCheckpoint.vn, RawOpdCheckpoint.location], + set_=update_cols, + ) + result = db.execute(stmt) + db.commit() + + # Send data to Supabase via API call + supabase_result = None + supabase_error = None + + try: + logger.info(f"Sending {len(supabase_rows)} records to Supabase API") + supabase_result = upsert_to_supabase_sync( + table="raw_opd_checkpoint", + data=supabase_rows, + on_conflict="hn,vn,location", + ) + logger.info(f"Successfully sent data to Supabase: {supabase_result.get('status_code')}") + except SupabaseAPIError as e: + logger.error(f"Failed to send data to Supabase: {str(e)}") + supabase_error = str(e) + except Exception as e: + logger.error(f"Unexpected error sending data to Supabase: {str(e)}") + supabase_error = f"Unexpected error: {str(e)}" + + return { + "upserted": len(rows), + "rowcount": result.rowcount, + "supabase": { + "success": supabase_result is not None, + "result": supabase_result, + "error": supabase_error, + }, + } diff --git a/03-apiservice/app/api/v1/schemas.py b/03-apiservice/app/api/v1/schemas.py index 76cc5c2..2d1b3ed 100644 --- a/03-apiservice/app/api/v1/schemas.py +++ b/03-apiservice/app/api/v1/schemas.py @@ -15,3 +15,15 @@ class FeedWaitingTimeIn(BaseModel): location_name: str | None = None step_name: str | None = None time: datetime + + +class FeedCheckpointIn(BaseModel): + id: int | None = None + hn: int + vn: int + location: str + type: str + timestamp_in: datetime + timestamp_out: datetime | None = None + waiting_time: int | None = None + bu: str | None = None diff --git a/03-apiservice/app/core/config.py b/03-apiservice/app/core/config.py index 1e503a0..a626f99 100644 --- a/03-apiservice/app/core/config.py +++ b/03-apiservice/app/core/config.py @@ -13,6 +13,16 @@ class Settings(BaseSettings): DB_NAME: str DB_SSLMODE: str = "prefer" + SUPABASE_DB_HOST: str + SUPABASE_DB_PORT: int = 5432 + SUPABASE_DB_USER: str + SUPABASE_DB_PASSWORD: str + SUPABASE_DB_NAME: str + SUPABASE_DB_SSLMODE: str = "disable" + + SUPABASE_API_URL: str + SUPABASE_API_KEY: str + ROOT_PATH: str = "" TIMEZONE: str = "Asia/Bangkok" diff --git a/03-apiservice/app/db/engine.py b/03-apiservice/app/db/engine.py index 62d5465..a5fde95 100644 --- a/03-apiservice/app/db/engine.py +++ b/03-apiservice/app/db/engine.py @@ -18,4 +18,18 @@ def build_db_url() -> str: ) +def build_supabase_db_url() -> str: + user = quote_plus(settings.SUPABASE_DB_USER) + password = quote_plus(settings.SUPABASE_DB_PASSWORD) + host = settings.SUPABASE_DB_HOST + port = settings.SUPABASE_DB_PORT + db = quote_plus(settings.SUPABASE_DB_NAME) + + return ( + f"postgresql+psycopg://{user}:{password}@{host}:{port}/{db}" + f"?sslmode={quote_plus(settings.SUPABASE_DB_SSLMODE)}" + ) + + engine = create_engine(build_db_url(), pool_pre_ping=True) +supabase_engine = create_engine(build_supabase_db_url(), pool_pre_ping=True) diff --git a/03-apiservice/app/db/models.py b/03-apiservice/app/db/models.py index 0f41908..c1d8c5e 100644 --- a/03-apiservice/app/db/models.py +++ b/03-apiservice/app/db/models.py @@ -2,13 +2,31 @@ from __future__ import annotations from datetime import datetime -from sqlalchemy import BigInteger, Boolean, DateTime, ForeignKey, Integer, String, Text, func +from sqlalchemy import BigInteger, Boolean, DateTime, ForeignKey, Integer, String, Text, UniqueConstraint, func from sqlalchemy.dialects.postgresql import JSONB from sqlalchemy.orm import Mapped, mapped_column, relationship from app.db.base import Base +class RawOpdCheckpoint(Base): + __tablename__ = "raw_opd_checkpoint" + __table_args__ = ( + UniqueConstraint("hn", "vn", "location", name="uq_raw_opd_checkpoint_hn_vn_location"), + {"schema": "rawdata"}, + ) + + id: Mapped[int] = mapped_column(BigInteger, primary_key=True) + hn: Mapped[int] = mapped_column(BigInteger, nullable=False) + vn: Mapped[int] = mapped_column(BigInteger, nullable=False) + location: Mapped[str] = mapped_column(Text, nullable=False) + type: Mapped[str] = mapped_column(String(64), nullable=False) + timestamp_in: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False) + timestamp_out: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True) + waiting_time: Mapped[int | None] = mapped_column(Integer, nullable=True) + bu: Mapped[str | None] = mapped_column(String(128), nullable=True) + + class RawWaitingTime(Base): __tablename__ = "raw_waiting_time" __table_args__ = {"schema": "rawdata"} @@ -42,6 +60,15 @@ class ApiClient(Base): passive_deletes=True, ) + def __str__(self) -> str: + client_id = getattr(self, "id", None) + if client_id is None: + return self.name + return f"{self.name} ({client_id})" + + def __repr__(self) -> str: + return str(self) + class ApiKey(Base): __tablename__ = "api_key" diff --git a/03-apiservice/app/main.py b/03-apiservice/app/main.py index a52d0f2..97a4489 100644 --- a/03-apiservice/app/main.py +++ b/03-apiservice/app/main.py @@ -1,7 +1,13 @@ from contextlib import asynccontextmanager +import os from fastapi import FastAPI +from fastapi.middleware.cors import CORSMiddleware +from fastapi.staticfiles import StaticFiles +from starlette.datastructures import Headers +from starlette.middleware.base import BaseHTTPMiddleware from starlette.middleware.sessions import SessionMiddleware +import sqladmin from app.admin import mount_admin from app.api.v1.routes import router as v1_router @@ -9,13 +15,59 @@ from app.core.config import settings from app.db.init_db import init_db +class ForceHTTPSMiddleware(BaseHTTPMiddleware): + async def dispatch(self, request, call_next): + request.scope["scheme"] = "https" + response = await call_next(request) + return response + + +class ForwardedProtoMiddleware: + def __init__(self, app): + self.app = app + + async def __call__(self, scope, receive, send): + if scope["type"] in {"http", "websocket"}: + headers = Headers(scope=scope) + forwarded_proto = headers.get("x-forwarded-proto") + if forwarded_proto: + proto = forwarded_proto.split(",", 1)[0].strip() + if proto: + new_scope = dict(scope) + new_scope["scheme"] = proto + return await self.app(new_scope, receive, send) + + return await self.app(scope, receive, send) + + +origins = [ + "http://localhost:8040", + "https://ai.sriphat.com", + "http://ai.sriphat.com", +] + + @asynccontextmanager async def lifespan(_: FastAPI): init_db() yield +sqladmin_dir = os.path.dirname(sqladmin.__file__) +statics_path = os.path.join(sqladmin_dir, "statics") + app = FastAPI(title=settings.APP_NAME, root_path=settings.ROOT_PATH, lifespan=lifespan) +app.add_middleware(ForceHTTPSMiddleware) app.add_middleware(SessionMiddleware, secret_key=settings.ADMIN_SECRET_KEY) +app.add_middleware(ForwardedProtoMiddleware) +app.add_middleware( + CORSMiddleware, + allow_origins=origins, + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) app.include_router(v1_router) +app.mount("/admin/statics", StaticFiles(directory=statics_path), name="admin_statics") +app.mount("/apiservice/admin/statics", StaticFiles(directory=statics_path), name="proxy_admin_statics") mount_admin(app) diff --git a/03-apiservice/app/security/dependencies.py b/03-apiservice/app/security/dependencies.py index b7bc910..1efe2fd 100644 --- a/03-apiservice/app/security/dependencies.py +++ b/03-apiservice/app/security/dependencies.py @@ -1,15 +1,17 @@ from typing import Annotated +from collections.abc import Sequence from fastapi import Depends, HTTPException, Request, status from sqlalchemy import select from sqlalchemy.orm import Session, sessionmaker -from app.db.engine import engine +from app.db.engine import engine, supabase_engine from app.db.models import ApiKey from app.security.api_key import get_prefix, verify_api_key SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False) +SupabaseSessionLocal = sessionmaker(bind=supabase_engine, autoflush=False, autocommit=False) def get_db(): @@ -20,6 +22,14 @@ def get_db(): db.close() +def get_supabase_db(): + db = SupabaseSessionLocal() + try: + yield db + finally: + db.close() + + def get_bearer_token(request: Request) -> str: auth = request.headers.get("authorization") if not auth: @@ -32,7 +42,7 @@ def get_bearer_token(request: Request) -> str: return parts[1].strip() -def require_permission(permission: str): +def require_permission(permission: str | Sequence[str]): def _dep( token: Annotated[str, Depends(get_bearer_token)], db: Annotated[Session, Depends(get_db)], @@ -46,7 +56,9 @@ def require_permission(permission: str): if not verify_api_key(token, api_key.key_hash): raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid API key") - if permission not in (api_key.permissions or []): + allowed = set(api_key.permissions or []) + required = [permission] if isinstance(permission, str) else list(permission) + if not any(p in allowed for p in required): raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Permission denied") return api_key diff --git a/03-apiservice/app/utils/__init__.py b/03-apiservice/app/utils/__init__.py new file mode 100644 index 0000000..e40643e --- /dev/null +++ b/03-apiservice/app/utils/__init__.py @@ -0,0 +1 @@ +"""Utility functions and helpers.""" diff --git a/03-apiservice/app/utils/supabase_client.py b/03-apiservice/app/utils/supabase_client.py new file mode 100644 index 0000000..92bad26 --- /dev/null +++ b/03-apiservice/app/utils/supabase_client.py @@ -0,0 +1,122 @@ +"""Supabase API client utility for making REST API calls.""" + +import logging +from typing import Any + +import httpx + +from app.core.config import settings + +logger = logging.getLogger(__name__) + + +class SupabaseAPIError(Exception): + """Exception raised when Supabase API call fails.""" + + pass + + +async def upsert_to_supabase( + table: str, + data: list[dict[str, Any]], + on_conflict: str | None = None, +) -> dict[str, Any]: + """ + Upsert data to Supabase table via REST API. + + Args: + table: Table name (e.g., "raw_opd_checkpoint") + data: List of records to upsert + on_conflict: Columns to use for conflict resolution (e.g., "hn,vn,location") + + Returns: + Response from Supabase API + + Raises: + SupabaseAPIError: If the API call fails + """ + url = f"{settings.SUPABASE_API_URL}/rest/v1/{table}" + headers = { + "apikey": settings.SUPABASE_API_KEY, + "Authorization": f"Bearer {settings.SUPABASE_API_KEY}", + "Content-Type": "application/json", + "Prefer": "resolution=merge-duplicates", + } + + if on_conflict: + headers["Prefer"] += f",on_conflict={on_conflict}" + + try: + async with httpx.AsyncClient(timeout=30.0) as client: + response = await client.post(url, json=data, headers=headers) + response.raise_for_status() + return { + "status": "success", + "status_code": response.status_code, + "data": response.json() if response.text else None, + } + except httpx.HTTPStatusError as e: + error_msg = f"Supabase API error: {e.response.status_code} - {e.response.text}" + logger.error(error_msg) + raise SupabaseAPIError(error_msg) from e + except httpx.RequestError as e: + error_msg = f"Supabase API request failed: {str(e)}" + logger.error(error_msg) + raise SupabaseAPIError(error_msg) from e + except Exception as e: + error_msg = f"Unexpected error calling Supabase API: {str(e)}" + logger.error(error_msg) + raise SupabaseAPIError(error_msg) from e + + +def upsert_to_supabase_sync( + table: str, + data: list[dict[str, Any]], + on_conflict: str | None = None, +) -> dict[str, Any]: + """ + Synchronous version of upsert_to_supabase. + + Args: + table: Table name (e.g., "raw_opd_checkpoint") + data: List of records to upsert + on_conflict: Columns to use for conflict resolution (e.g., "hn,vn,location") + + Returns: + Response from Supabase API + + Raises: + SupabaseAPIError: If the API call fails + """ + url = f"{settings.SUPABASE_API_URL}/rest/v1/{table}" + headers = { + "apikey": settings.SUPABASE_API_KEY, + "Authorization": f"Bearer {settings.SUPABASE_API_KEY}", + "Content-Type": "application/json", + "Prefer": "resolution=merge-duplicates", + } + + if on_conflict: + headers["Prefer"] += f",on_conflict={on_conflict}" + + try: + with httpx.Client(timeout=30.0) as client: + response = client.post(url, json=data, headers=headers) + response.raise_for_status() + return { + "status": "success", + "status_code": response.status_code, + "data": response.json() if response.text else None, + } + except httpx.HTTPStatusError as e: + error_msg = f"Supabase API error: {e.response.status_code} - {e.response.text}" + logger.error(error_msg) + raise SupabaseAPIError(error_msg) from e + except httpx.RequestError as e: + error_msg = f"Supabase API request failed: {str(e)}" + logger.error(error_msg) + raise SupabaseAPIError(error_msg) from e + except Exception as e: + error_msg = f"Unexpected error calling Supabase API: {str(e)}" + logger.error(error_msg) + raise SupabaseAPIError(error_msg) from e diff --git a/03-apiservice/requirements.txt b/03-apiservice/requirements.txt index 49a79e1..f04f728 100644 --- a/03-apiservice/requirements.txt +++ b/03-apiservice/requirements.txt @@ -5,10 +5,12 @@ SQLAlchemy==2.0.38 psycopg==3.2.5 pydantic==2.10.6 pydantic-settings==2.7.1 +psycopg[binary] sqladmin==0.20.1 itsdangerous==2.2.0 bcrypt==4.3.0 python-multipart==0.0.20 +httpx==0.28.1 WTForms #==3.2.1 diff --git a/04-ingestion/docker-compose.yml b/04-ingestion/docker-compose.yml index 575232a..b1452ac 100644 --- a/04-ingestion/docker-compose.yml +++ b/04-ingestion/docker-compose.yml @@ -1,205 +1,205 @@ -# services: -# # Airbyte OSS - Simplified deployment -# # Note: For production, consider using abctl or full Airbyte stack -# # This is a minimal setup for development/testing +# # services: +# # # Airbyte OSS - Simplified deployment +# # # Note: For production, consider using abctl or full Airbyte stack +# # # This is a minimal setup for development/testing -# airbyte: -# image: airbyte/airbyte-standalone:0.50.33 -# container_name: airbyte -# ports: -# - "8000:8000" -# environment: -# - DATABASE_HOST=postgres -# - DATABASE_PORT=5432 -# - DATABASE_USER=${DB_USER} -# - DATABASE_PASSWORD=${DB_PASSWORD} -# - DATABASE_DB=airbyte -# - TZ=${TZ:-Asia/Bangkok} +# # airbyte: +# # image: airbyte/airbyte-standalone:0.50.33 +# # container_name: airbyte +# # ports: +# # - "8000:8000" +# # environment: +# # - DATABASE_HOST=postgres +# # - DATABASE_PORT=5432 +# # - DATABASE_USER=${DB_USER} +# # - DATABASE_PASSWORD=${DB_PASSWORD} +# # - DATABASE_DB=airbyte +# # - TZ=${TZ:-Asia/Bangkok} +# # volumes: +# # - ./data/workspace:/workspace +# # - ./data/config:/config +# # - /var/run/docker.sock:/var/run/docker.sock +# # networks: +# # - shared_data_network +# # restart: unless-stopped + +# # networks: +# # shared_data_network: +# # external: true + +# services: +# docker-proxy: +# image: alpine/socat +# container_name: airbyte-docker-proxy +# command: -t 900 TCP-LISTEN:2375,fork,reuseaddr UNIX-CONNECT:/var/run/docker.sock +# restart: unless-stopped +# user: root # volumes: -# - ./data/workspace:/workspace -# - ./data/config:/config # - /var/run/docker.sock:/var/run/docker.sock # networks: # - shared_data_network +# ###0.63.8 +# airbyte-temporal: +# image: airbyte/temporal:1.7.8 +# container_name: airbyte-temporal # restart: unless-stopped +# env_file: +# - ../.env.global +# environment: +# - DB=postgresql +# - DB_PORT=${DB_PORT} +# - POSTGRES_DB=temporal +# - VISIBILITY_POSTGRES_DB=temporal_visibility +# - POSTGRES_SEEDS=${DB_HOST} +# - POSTGRES_USER=${DB_USER} +# - POSTGRES_PWD=${DB_PASSWORD} +# networks: +# - shared_data_network + +# bootloader: +# image: airbyte/bootloader:1.7.8 +# container_name: airbyte-bootloader +# restart: "no" +# env_file: +# - ../.env.global +# environment: +# - AIRBYTE_VERSION=1.7.8 +# - DATABASE_URL=jdbc:postgresql://${DB_HOST}:${DB_PORT}/airbyte +# - DATABASE_USER=${DB_USER} +# - DATABASE_PASSWORD=${DB_PASSWORD} +# - LOG_LEVEL=INFO +# networks: +# - shared_data_network + +# worker: +# image: airbyte/worker:1.7.8 +# container_name: airbyte-worker +# restart: unless-stopped +# env_file: +# - ../.env.global +# depends_on: +# - docker-proxy +# - airbyte-temporal +# - bootloader +# environment: +# - AIRBYTE_VERSION=1.7.8 +# - DATABASE_URL=jdbc:postgresql://${DB_HOST}:${DB_PORT}/airbyte +# - DATABASE_USER=${DB_USER} +# - DATABASE_PASSWORD=${DB_PASSWORD} +# - CONFIG_ROOT=/data +# - WORKSPACE_ROOT=/workspace +# - LOCAL_ROOT=/local_root +# - TEMPORAL_HOST=airbyte-temporal:7233 +# - LOG_LEVEL=INFO +# - WORKER_ENVIRONMENT=docker +# - DOCKER_HOST=docker-proxy:2375 +# volumes: +# - ./data/workspace:/workspace +# - ./data/config:/data +# - ./data/local_root:/local_root +# networks: +# - shared_data_network + +# server: +# image: airbyte/server:1.7.8 +# container_name: airbyte-server +# restart: unless-stopped +# depends_on: +# - airbyte-temporal +# - bootloader +# env_file: +# - ../.env.global +# environment: +# - AIRBYTE_VERSION=1.7.8 +# - DATABASE_URL=jdbc:postgresql://${DB_HOST}:${DB_PORT}/airbyte +# - DATABASE_USER=${DB_USER} +# - DATABASE_PASSWORD=${DB_PASSWORD} +# - CONFIG_ROOT=/data +# - WORKSPACE_ROOT=/workspace +# - LOCAL_ROOT=/local_root +# - TEMPORAL_HOST=airbyte-temporal:7233 +# - WEBAPP_URL=http://localhost:8000 +# - LOG_LEVEL=INFO +# - WORKER_ENVIRONMENT=docker +# volumes: +# - ./data/workspace:/workspace +# - ./data/config:/data +# - ./data/local_root:/local_root +# networks: +# - shared_data_network + +# webapp: +# image: airbyte/webapp:1.7.8 +# container_name: airbyte-webapp +# restart: unless-stopped +# environment: +# - AIRBYTE_SERVER_HOST=server:8001 +# networks: +# - shared_data_network +# depends_on: +# - server + +# airbyte-cron: +# image: airbyte/cron:1.7.8 +# container_name: airbyte-cron +# restart: unless-stopped +# depends_on: +# - bootloader +# env_file: +# - ../.env.global +# environment: +# - AIRBYTE_VERSION=1.7.8 +# - DATABASE_URL=jdbc:postgresql://${DB_HOST}:${DB_PORT}/airbyte +# - DATABASE_USER=${DB_USER} +# - DATABASE_PASSWORD=${DB_PASSWORD} +# - WORKSPACE_ROOT=/workspace +# - TEMPORAL_HOST=airbyte-temporal:7233 +# - LOG_LEVEL=INFO +# volumes: +# - ./data/workspace:/workspace +# networks: +# - shared_data_network + +# airbyte-api-server: +# image: airbyte/airbyte-api-server:0.63.8 +# container_name: airbyte-api-server +# restart: unless-stopped +# depends_on: +# - bootloader +# networks: +# - shared_data_network + +# airbyte-connector-builder-server: +# image: airbyte/connector-builder-server:1.7.8 +# container_name: airbyte-connector-builder-server +# restart: unless-stopped +# depends_on: +# - bootloader +# networks: +# - shared_data_network + +# airbyte-proxy: +# image: airbyte/proxy:1.4.1 +# container_name: airbyte-proxy +# restart: unless-stopped +# env_file: +# - ../.env.global +# ports: +# - "8030:8000" +# - "8001:8001" +# - "8003:8003" +# - "8006:8006" +# environment: +# - BASIC_AUTH_USERNAME=${AIRBYTE_BASIC_AUTH_USERNAME:-} +# - BASIC_AUTH_PASSWORD=${AIRBYTE_BASIC_AUTH_PASSWORD:-} +# - BASIC_AUTH_PROXY_TIMEOUT=${AIRBYTE_BASIC_AUTH_PROXY_TIMEOUT:-} +# networks: +# - shared_data_network +# depends_on: +# - webapp +# - server +# - airbyte-api-server # networks: # shared_data_network: # external: true - -services: - docker-proxy: - image: alpine/socat - container_name: airbyte-docker-proxy - command: -t 900 TCP-LISTEN:2375,fork,reuseaddr UNIX-CONNECT:/var/run/docker.sock - restart: unless-stopped - user: root - volumes: - - /var/run/docker.sock:/var/run/docker.sock - networks: - - shared_data_network -###0.63.8 - airbyte-temporal: - image: airbyte/temporal:1.7.8 - container_name: airbyte-temporal - restart: unless-stopped - env_file: - - ../.env.global - environment: - - DB=postgresql - - DB_PORT=${DB_PORT} - - POSTGRES_DB=temporal - - VISIBILITY_POSTGRES_DB=temporal_visibility - - POSTGRES_SEEDS=${DB_HOST} - - POSTGRES_USER=${DB_USER} - - POSTGRES_PWD=${DB_PASSWORD} - networks: - - shared_data_network - - bootloader: - image: airbyte/bootloader:1.7.8 - container_name: airbyte-bootloader - restart: "no" - env_file: - - ../.env.global - environment: - - AIRBYTE_VERSION=1.7.8 - - DATABASE_URL=jdbc:postgresql://${DB_HOST}:${DB_PORT}/airbyte - - DATABASE_USER=${DB_USER} - - DATABASE_PASSWORD=${DB_PASSWORD} - - LOG_LEVEL=INFO - networks: - - shared_data_network - - worker: - image: airbyte/worker:1.7.8 - container_name: airbyte-worker - restart: unless-stopped - env_file: - - ../.env.global - depends_on: - - docker-proxy - - airbyte-temporal - - bootloader - environment: - - AIRBYTE_VERSION=1.7.8 - - DATABASE_URL=jdbc:postgresql://${DB_HOST}:${DB_PORT}/airbyte - - DATABASE_USER=${DB_USER} - - DATABASE_PASSWORD=${DB_PASSWORD} - - CONFIG_ROOT=/data - - WORKSPACE_ROOT=/workspace - - LOCAL_ROOT=/local_root - - TEMPORAL_HOST=airbyte-temporal:7233 - - LOG_LEVEL=INFO - - WORKER_ENVIRONMENT=docker - - DOCKER_HOST=docker-proxy:2375 - volumes: - - ./data/workspace:/workspace - - ./data/config:/data - - ./data/local_root:/local_root - networks: - - shared_data_network - - server: - image: airbyte/server:1.7.8 - container_name: airbyte-server - restart: unless-stopped - depends_on: - - airbyte-temporal - - bootloader - env_file: - - ../.env.global - environment: - - AIRBYTE_VERSION=1.7.8 - - DATABASE_URL=jdbc:postgresql://${DB_HOST}:${DB_PORT}/airbyte - - DATABASE_USER=${DB_USER} - - DATABASE_PASSWORD=${DB_PASSWORD} - - CONFIG_ROOT=/data - - WORKSPACE_ROOT=/workspace - - LOCAL_ROOT=/local_root - - TEMPORAL_HOST=airbyte-temporal:7233 - - WEBAPP_URL=http://localhost:8000 - - LOG_LEVEL=INFO - - WORKER_ENVIRONMENT=docker - volumes: - - ./data/workspace:/workspace - - ./data/config:/data - - ./data/local_root:/local_root - networks: - - shared_data_network - - webapp: - image: airbyte/webapp:1.7.8 - container_name: airbyte-webapp - restart: unless-stopped - environment: - - AIRBYTE_SERVER_HOST=server:8001 - networks: - - shared_data_network - depends_on: - - server - - airbyte-cron: - image: airbyte/cron:1.7.8 - container_name: airbyte-cron - restart: unless-stopped - depends_on: - - bootloader - env_file: - - ../.env.global - environment: - - AIRBYTE_VERSION=1.7.8 - - DATABASE_URL=jdbc:postgresql://${DB_HOST}:${DB_PORT}/airbyte - - DATABASE_USER=${DB_USER} - - DATABASE_PASSWORD=${DB_PASSWORD} - - WORKSPACE_ROOT=/workspace - - TEMPORAL_HOST=airbyte-temporal:7233 - - LOG_LEVEL=INFO - volumes: - - ./data/workspace:/workspace - networks: - - shared_data_network - - airbyte-api-server: - image: airbyte/airbyte-api-server:0.63.8 - container_name: airbyte-api-server - restart: unless-stopped - depends_on: - - bootloader - networks: - - shared_data_network - - airbyte-connector-builder-server: - image: airbyte/connector-builder-server:1.7.8 - container_name: airbyte-connector-builder-server - restart: unless-stopped - depends_on: - - bootloader - networks: - - shared_data_network - - airbyte-proxy: - image: airbyte/proxy:1.4.1 - container_name: airbyte-proxy - restart: unless-stopped - env_file: - - ../.env.global - ports: - - "8030:8000" - - "8001:8001" - - "8003:8003" - - "8006:8006" - environment: - - BASIC_AUTH_USERNAME=${AIRBYTE_BASIC_AUTH_USERNAME:-} - - BASIC_AUTH_PASSWORD=${AIRBYTE_BASIC_AUTH_PASSWORD:-} - - BASIC_AUTH_PROXY_TIMEOUT=${AIRBYTE_BASIC_AUTH_PROXY_TIMEOUT:-} - networks: - - shared_data_network - depends_on: - - webapp - - server - - airbyte-api-server - -networks: - shared_data_network: - external: true diff --git a/DEPLOYMENT.md b/DEPLOYMENT.md index 00a1543..8b0df0c 100644 --- a/DEPLOYMENT.md +++ b/DEPLOYMENT.md @@ -8,17 +8,17 @@ │ (Gateway + SSL + Domain Routing) │ └─────────────────────────────────────────────────────────────┘ │ - ┌─────────────────────┼─────────────────────┐ - │ │ │ -┌───────▼────────┐ ┌────────▼────────┐ ┌───────▼────────┐ -│ Keycloak │ │ API Service │ │ Superset │ -│ (SSO) │ │ (FastAPI) │ │ (BI) │ -└────────────────┘ └─────────────────┘ └────────────────┘ - │ │ │ - └─────────────────────┼─────────────────────┘ - │ - ┌─────────▼─────────┐ - │ PostgreSQL │ + ┌─────────────────────┼─────────────────────┬─────────┐ + │ │ │ │ +┌───────▼────────┐ ┌────────▼────────┐ ┌───────▼────────┐│ +│ Keycloak │ │ API Service │ │ Superset ││ +│ (SSO) │ │ (FastAPI) │ │ (BI) ││ +└────────────────┘ └─────────────────┘ └────────────────┘│ + │ │ │ │ + └─────────────────────┼─────────────────────┘ ┌────▼────┐ + │ │Supabase │ + ┌─────────▼─────────┐ │ (BaaS) │ + │ PostgreSQL │ └─────────┘ │ (Data Warehouse) │ └───────────────────┘ │ @@ -53,10 +53,11 @@ bash start-all.sh # OR manually: # 1. cd 00-network && bash create-network.sh # 2. cd ../01-infra && docker compose --env-file ../.env.global up -d -# 3. Wait 30 seconds for PostgreSQL -# 4. cd ../03-apiservice && docker compose --env-file ../.env.global up --build -d -# 5. cd ../04-ingestion && docker compose --env-file ../.env.global up -d -# 6. cd ../06-analytics && docker compose --env-file ../.env.global up -d +# 3. cd ../02-supabase && bash setup.sh && docker compose up -d +# 4. Wait 30 seconds for PostgreSQL +# 5. cd ../03-apiservice && docker compose --env-file ../.env.global up --build -d +# 6. cd ../04-ingestion && docker compose --env-file ../.env.global up -d +# 7. cd ../06-analytics && docker compose --env-file ../.env.global up -d ``` ### Step 3: Verify Services @@ -69,6 +70,7 @@ You should see: - keycloak - postgres - apiservice +- supabase-studio, supabase-kong, supabase-auth, supabase-db, supabase-rest, supabase-realtime, supabase-storage - airbyte-webapp, airbyte-server, airbyte-worker, airbyte-temporal - superset @@ -78,6 +80,8 @@ You should see: |---------|-----|---------------------| | **Nginx Proxy Manager** | http://localhost:8021 | admin@example.com / changeme | | **Keycloak Admin** | http://localhost:8080 | See KEYCLOAK_ADMIN in .env.global | +| **Supabase Studio** | http://localhost:3010 | See 02-supabase/.env | +| **Supabase API** | http://localhost:8100 | API Keys in 02-supabase/.env | | **Airbyte** | http://localhost:8000 | No auth (setup via Nginx) | | **API Service** | Configure via Nginx | See ADMIN_USERNAME in .env.global | | **Superset** | Configure via Nginx | See SUPERSET_ADMIN_USERNAME in .env.global | @@ -114,6 +118,12 @@ You should see: - Forward Hostname: `airbyte-proxy` - Forward Port: `8000` +**Supabase:** +- Domain: `supabase.sriphat.local` +- Forward Hostname: `supabase-kong` +- Forward Port: `8000` +- Additional: Studio at `supabase-studio.sriphat.local` → `supabase-studio:3000` + ### 2. Setup Keycloak SSO 1. Access Keycloak admin console