# Airflow Integration Guide คู่มือการตั้งค่าและใช้งาน Airflow Integration สำหรับ Finance Excel Upload ## 📋 Overview เมื่อ upload ไฟล์ Excel ผ่านหน้า `/data-management/finance` ระบบจะ: 1. บันทึกไฟล์ลง `/data/uploads/` 2. สร้าง record ใน database (`fastapi.upload_history`) 3. **Trigger Airflow DAG** `process_finance_excel` (พร้อม retry 3 ครั้ง, รอ 10 วิ) 4. Airflow จะ process ไฟล์และอัพเดท status กลับมา --- ## 🔧 Configuration ### 1. สร้าง Airflow API Token ```bash # เข้า Airflow container docker exec -it airflow-webserver bash # สร้าง API token airflow users create \ --username apiservice \ --firstname API \ --lastname Service \ --role Admin \ --email apiservice@sriphat.com \ --password your-secure-password # หรือใช้ CLI สร้าง token python -c "from airflow.api.auth.backend.basic_auth import BasicAuth; print(BasicAuth().get_token('apiservice', 'your-secure-password'))" ``` **หรือใช้ Airflow UI:** 1. เข้า `http://ai.sriphat.com/airflow` 2. ไปที่ **Security** → **List Users** 3. เลือก user → **Edit** 4. Copy **API Token** (หรือสร้างใหม่) ### 2. อัพเดท Environment Variables แก้ไข `.env` ใน `03-apiservice/`: ```bash # Airflow Integration AIRFLOW_API_URL=http://airflow-webserver:8080 AIRFLOW_API_TOKEN=your-airflow-api-token-here AIRFLOW_DAG_ID_FINANCE=process_finance_excel ``` ### 3. Restart API Service ```bash cd 03-apiservice docker compose restart ``` --- ## 📊 Database Schema Updates ### New Fields in `fastapi.upload_history`: | Field | Type | Description | |-------|------|-------------| | `airflow_dag_run_id` | String | DAG run ID จาก Airflow | | `airflow_state` | String | State: queued, running, success, failed | | `processing_started_at` | DateTime | เวลาเริ่ม process | | `processing_completed_at` | DateTime | เวลาเสร็จ | | `error_message` | Text | Error message (ถ้ามี) | ### Migration ```bash # Restart apiservice เพื่อสร้าง columns ใหม่ cd 03-apiservice docker compose down docker compose up -d ``` --- ## 🚀 How It Works ### Upload Flow: ``` 1. User uploads Excel file ↓ 2. API saves file to /data/uploads/ ↓ 3. Create UploadHistory record (status: pending) ↓ 4. Trigger Airflow DAG (with retry logic) ├─ Attempt 1: Try trigger ├─ Wait 10 seconds ├─ Attempt 2: Retry ├─ Wait 10 seconds └─ Attempt 3: Final retry ↓ 5. Update record: - Success: status=processing, airflow_dag_run_id=xxx - Failure: status=error, error_message=xxx ↓ 6. Return response to frontend ``` ### Airflow DAG Flow: ``` validate_file ↓ process_data ↓ load_to_database ↓ update_status ``` --- ## 📝 API Endpoints ### Upload File (Triggers Airflow) ```http POST /data-management/finance/upload Content-Type: multipart/form-data file: [Excel file] description: "Optional description" ``` **Response:** ```json { "success": true, "message": "File 'finance.xlsx' uploaded successfully", "upload_id": "upload_20260313_173000", "filename": "20260313_173000_finance.xlsx", "airflow_triggered": true, "dag_run_id": "manual__2026-03-13T10:30:00+00:00", "error": null } ``` ### Get Upload History ```http GET /data-management/finance/uploads ``` **Response:** ```json [ { "id": "upload_20260313_173000", "filename": "finance.xlsx", "filepath": "/data/uploads/20260313_173000_finance.xlsx", "uploaded_at": "2026-03-13T17:30:00", "description": "Q1 2026 Finance Data", "status": "processing", "uploaded_by": "admin", "airflow_dag_run_id": "manual__2026-03-13T10:30:00+00:00", "airflow_state": "running", "processing_started_at": "2026-03-13T17:30:05", "processing_completed_at": null, "error_message": null } ] ``` --- ## 🔍 Monitoring ### 1. Check Upload Status ```bash # ดู logs ของ API Service docker logs apiservice -f | grep "Airflow" # ตัวอย่าง logs: # Triggering Airflow DAG (attempt 1/3) # Airflow DAG triggered successfully: manual__2026-03-13T10:30:00+00:00 ``` ### 2. Check Airflow DAG 1. เข้า Airflow UI: `http://ai.sriphat.com/airflow` 2. ไปที่ **DAGs** → `process_finance_excel` 3. ดู **DAG Runs** และ **Task Instances** ### 3. Check Database ```sql -- ดู upload history พร้อม Airflow status SELECT upload_id, filename, status, airflow_dag_run_id, airflow_state, processing_started_at, processing_completed_at, error_message FROM fastapi.upload_history ORDER BY uploaded_at DESC LIMIT 10; ``` --- ## 🐛 Troubleshooting ### Issue: "Failed to trigger Airflow after 3 attempts" **สาเหตุ:** - Airflow API Token ไม่ถูกต้อง - Airflow webserver ไม่ทำงาน - Network connection ล้มเหลว **วิธีแก้:** ```bash # 1. ตรวจสอบ Airflow webserver docker ps | grep airflow-webserver # 2. ตรวจสอบ network docker network inspect shared_data_network # 3. ทดสอบ API token curl -X GET \ http://airflow-webserver:8080/api/v1/dags \ -H "Authorization: Bearer YOUR_TOKEN" # 4. ดู logs docker logs airflow-webserver -f docker logs apiservice -f ``` ### Issue: DAG ไม่ปรากฏใน Airflow UI **สาเหตุ:** - DAG file มี syntax error - DAG ถูก pause - Airflow scheduler ไม่ทำงาน **วิธีแก้:** ```bash # 1. ตรวจสอบ DAG file docker exec airflow-webserver airflow dags list | grep process_finance_excel # 2. Test DAG docker exec airflow-webserver airflow dags test process_finance_excel # 3. Unpause DAG docker exec airflow-webserver airflow dags unpause process_finance_excel # 4. ตรวจสอบ scheduler docker logs airflow-scheduler -f ``` ### Issue: DAG triggered แต่ไม่ทำงาน **สาเหตุ:** - Volume mount ไม่ถูกต้อง (Airflow เข้าถึง `/data/uploads` ไม่ได้) - Dependencies ขาดหาย (pandas, openpyxl) **วิธีแก้:** ```bash # 1. ตรวจสอบ volume docker exec airflow-worker ls -la /data/uploads # 2. ติดตั้ง dependencies docker exec airflow-worker pip install pandas openpyxl # หรือเพิ่มใน docker-compose.yaml: # _PIP_ADDITIONAL_REQUIREMENTS: "pandas openpyxl" ``` --- ## 🔐 Security Notes 1. **API Token Security:** - เก็บ token ใน `.env` (ไม่ commit ลง git) - ใช้ token ที่มี scope จำกัด (ไม่ใช่ admin token) - Rotate token เป็นระยะ 2. **File Access:** - Airflow ต้องมี read access ไปยัง `/data/uploads` - ตรวจสอบ file permissions 3. **Network:** - API Service และ Airflow ต้องอยู่ใน `shared_data_network` - ใช้ internal network (ไม่ expose Airflow API ออกภายนอก) --- ## 📚 References - [Airflow REST API Documentation](https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html) - [Airflow Authentication](https://airflow.apache.org/docs/apache-airflow/stable/security/api.html) - [FastAPI Background Tasks](https://fastapi.tiangolo.com/tutorial/background-tasks/) --- ## 🎯 Next Steps 1. **Implement DAG Logic:** - แก้ไข `05-airflow/dags/process_finance_excel.py` - เพิ่ม data transformation logic - เพิ่ม database loading logic 2. **Add Callback:** - สร้าง endpoint `/api/v1/airflow/callback/{upload_id}` - Airflow จะ callback เมื่อเสร็จ/ล้มเหลว - อัพเดท `processing_completed_at` และ `airflow_state` 3. **Frontend Updates:** - เพิ่ม real-time status polling - แสดง Airflow state badges - Link ไปยัง Airflow DAG run 4. **Testing:** - ทดสอบ upload file - ตรวจสอบ Airflow trigger - ตรวจสอบ retry logic - ทดสอบ error handling