Files
sriphat-dataplatform/03-apiservice/AIRFLOW_INTEGRATION.md

335 lines
8.3 KiB
Markdown

# Airflow Integration Guide
คู่มือการตั้งค่าและใช้งาน Airflow Integration สำหรับ Finance Excel Upload
## 📋 Overview
เมื่อ upload ไฟล์ Excel ผ่านหน้า `/data-management/finance` ระบบจะ:
1. บันทึกไฟล์ลง `/data/uploads/`
2. สร้าง record ใน database (`fastapi.upload_history`)
3. **Trigger Airflow DAG** `process_finance_excel` (พร้อม retry 3 ครั้ง, รอ 10 วิ)
4. Airflow จะ process ไฟล์และอัพเดท status กลับมา
---
## 🔧 Configuration
### 1. สร้าง Airflow API Token
```bash
# เข้า Airflow container
docker exec -it airflow-webserver bash
# สร้าง API token
airflow users create \
--username apiservice \
--firstname API \
--lastname Service \
--role Admin \
--email apiservice@sriphat.com \
--password your-secure-password
# หรือใช้ CLI สร้าง token
python -c "from airflow.api.auth.backend.basic_auth import BasicAuth; print(BasicAuth().get_token('apiservice', 'your-secure-password'))"
```
**หรือใช้ Airflow UI:**
1. เข้า `http://ai.sriphat.com/airflow`
2. ไปที่ **Security****List Users**
3. เลือก user → **Edit**
4. Copy **API Token** (หรือสร้างใหม่)
### 2. อัพเดท Environment Variables
แก้ไข `.env` ใน `03-apiservice/`:
```bash
# Airflow Integration
AIRFLOW_API_URL=http://airflow-webserver:8080
AIRFLOW_API_TOKEN=your-airflow-api-token-here
AIRFLOW_DAG_ID_FINANCE=process_finance_excel
```
### 3. Restart API Service
```bash
cd 03-apiservice
docker compose restart
```
---
## 📊 Database Schema Updates
### New Fields in `fastapi.upload_history`:
| Field | Type | Description |
|-------|------|-------------|
| `airflow_dag_run_id` | String | DAG run ID จาก Airflow |
| `airflow_state` | String | State: queued, running, success, failed |
| `processing_started_at` | DateTime | เวลาเริ่ม process |
| `processing_completed_at` | DateTime | เวลาเสร็จ |
| `error_message` | Text | Error message (ถ้ามี) |
### Migration
```bash
# Restart apiservice เพื่อสร้าง columns ใหม่
cd 03-apiservice
docker compose down
docker compose up -d
```
---
## 🚀 How It Works
### Upload Flow:
```
1. User uploads Excel file
2. API saves file to /data/uploads/
3. Create UploadHistory record (status: pending)
4. Trigger Airflow DAG (with retry logic)
├─ Attempt 1: Try trigger
├─ Wait 10 seconds
├─ Attempt 2: Retry
├─ Wait 10 seconds
└─ Attempt 3: Final retry
5. Update record:
- Success: status=processing, airflow_dag_run_id=xxx
- Failure: status=error, error_message=xxx
6. Return response to frontend
```
### Airflow DAG Flow:
```
validate_file
process_data
load_to_database
update_status
```
---
## 📝 API Endpoints
### Upload File (Triggers Airflow)
```http
POST /data-management/finance/upload
Content-Type: multipart/form-data
file: [Excel file]
description: "Optional description"
```
**Response:**
```json
{
"success": true,
"message": "File 'finance.xlsx' uploaded successfully",
"upload_id": "upload_20260313_173000",
"filename": "20260313_173000_finance.xlsx",
"airflow_triggered": true,
"dag_run_id": "manual__2026-03-13T10:30:00+00:00",
"error": null
}
```
### Get Upload History
```http
GET /data-management/finance/uploads
```
**Response:**
```json
[
{
"id": "upload_20260313_173000",
"filename": "finance.xlsx",
"filepath": "/data/uploads/20260313_173000_finance.xlsx",
"uploaded_at": "2026-03-13T17:30:00",
"description": "Q1 2026 Finance Data",
"status": "processing",
"uploaded_by": "admin",
"airflow_dag_run_id": "manual__2026-03-13T10:30:00+00:00",
"airflow_state": "running",
"processing_started_at": "2026-03-13T17:30:05",
"processing_completed_at": null,
"error_message": null
}
]
```
---
## 🔍 Monitoring
### 1. Check Upload Status
```bash
# ดู logs ของ API Service
docker logs apiservice -f | grep "Airflow"
# ตัวอย่าง logs:
# Triggering Airflow DAG (attempt 1/3)
# Airflow DAG triggered successfully: manual__2026-03-13T10:30:00+00:00
```
### 2. Check Airflow DAG
1. เข้า Airflow UI: `http://ai.sriphat.com/airflow`
2. ไปที่ **DAGs**`process_finance_excel`
3. ดู **DAG Runs** และ **Task Instances**
### 3. Check Database
```sql
-- ดู upload history พร้อม Airflow status
SELECT
upload_id,
filename,
status,
airflow_dag_run_id,
airflow_state,
processing_started_at,
processing_completed_at,
error_message
FROM fastapi.upload_history
ORDER BY uploaded_at DESC
LIMIT 10;
```
---
## 🐛 Troubleshooting
### Issue: "Failed to trigger Airflow after 3 attempts"
**สาเหตุ:**
- Airflow API Token ไม่ถูกต้อง
- Airflow webserver ไม่ทำงาน
- Network connection ล้มเหลว
**วิธีแก้:**
```bash
# 1. ตรวจสอบ Airflow webserver
docker ps | grep airflow-webserver
# 2. ตรวจสอบ network
docker network inspect shared_data_network
# 3. ทดสอบ API token
curl -X GET \
http://airflow-webserver:8080/api/v1/dags \
-H "Authorization: Bearer YOUR_TOKEN"
# 4. ดู logs
docker logs airflow-webserver -f
docker logs apiservice -f
```
### Issue: DAG ไม่ปรากฏใน Airflow UI
**สาเหตุ:**
- DAG file มี syntax error
- DAG ถูก pause
- Airflow scheduler ไม่ทำงาน
**วิธีแก้:**
```bash
# 1. ตรวจสอบ DAG file
docker exec airflow-webserver airflow dags list | grep process_finance_excel
# 2. Test DAG
docker exec airflow-webserver airflow dags test process_finance_excel
# 3. Unpause DAG
docker exec airflow-webserver airflow dags unpause process_finance_excel
# 4. ตรวจสอบ scheduler
docker logs airflow-scheduler -f
```
### Issue: DAG triggered แต่ไม่ทำงาน
**สาเหตุ:**
- Volume mount ไม่ถูกต้อง (Airflow เข้าถึง `/data/uploads` ไม่ได้)
- Dependencies ขาดหาย (pandas, openpyxl)
**วิธีแก้:**
```bash
# 1. ตรวจสอบ volume
docker exec airflow-worker ls -la /data/uploads
# 2. ติดตั้ง dependencies
docker exec airflow-worker pip install pandas openpyxl
# หรือเพิ่มใน docker-compose.yaml:
# _PIP_ADDITIONAL_REQUIREMENTS: "pandas openpyxl"
```
---
## 🔐 Security Notes
1. **API Token Security:**
- เก็บ token ใน `.env` (ไม่ commit ลง git)
- ใช้ token ที่มี scope จำกัด (ไม่ใช่ admin token)
- Rotate token เป็นระยะ
2. **File Access:**
- Airflow ต้องมี read access ไปยัง `/data/uploads`
- ตรวจสอบ file permissions
3. **Network:**
- API Service และ Airflow ต้องอยู่ใน `shared_data_network`
- ใช้ internal network (ไม่ expose Airflow API ออกภายนอก)
---
## 📚 References
- [Airflow REST API Documentation](https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html)
- [Airflow Authentication](https://airflow.apache.org/docs/apache-airflow/stable/security/api.html)
- [FastAPI Background Tasks](https://fastapi.tiangolo.com/tutorial/background-tasks/)
---
## 🎯 Next Steps
1. **Implement DAG Logic:**
- แก้ไข `05-airflow/dags/process_finance_excel.py`
- เพิ่ม data transformation logic
- เพิ่ม database loading logic
2. **Add Callback:**
- สร้าง endpoint `/api/v1/airflow/callback/{upload_id}`
- Airflow จะ callback เมื่อเสร็จ/ล้มเหลว
- อัพเดท `processing_completed_at` และ `airflow_state`
3. **Frontend Updates:**
- เพิ่ม real-time status polling
- แสดง Airflow state badges
- Link ไปยัง Airflow DAG run
4. **Testing:**
- ทดสอบ upload file
- ตรวจสอบ Airflow trigger
- ตรวจสอบ retry logic
- ทดสอบ error handling