# Airflow Integration Guide

Setup and usage guide for the Airflow integration with Finance Excel Upload.

## 📋 Overview

When an Excel file is uploaded through the `/data-management/finance` page, the system:

1. Saves the file to `/data/uploads/`
2. Creates a record in the database (`fastapi.upload_history`)
3. **Triggers the Airflow DAG** `process_finance_excel` (up to 3 attempts, 10 seconds apart)
4. Lets Airflow process the file and report the status back

---

## 🔧 Configuration

### 1. Create an Airflow API Token

```bash
# Enter the Airflow container
docker exec -it airflow-webserver bash

# Create a dedicated user for API access
# (Admin is shown here; see Security Notes about using a narrower role)
airflow users create \
  --username apiservice \
  --firstname API \
  --lastname Service \
  --role Admin \
  --email apiservice@sriphat.com \
  --password your-secure-password

# Check the new credentials against the REST API.
# airflow.api.auth.backend.basic_auth exposes no BasicAuth().get_token() helper;
# when that backend is enabled, requests authenticate with the username and password.
curl -u apiservice:your-secure-password http://airflow-webserver:8080/api/v1/dags
```

**Or use the Airflow UI:**
1. Open `http://ai.sriphat.com/airflow`
2. Go to **Security** → **List Users**
3. Select the user → **Edit**
4. Copy the **API Token** (or generate a new one), if your deployment exposes this field

### 2. Update Environment Variables

Edit `.env` in `03-apiservice/`:

```bash
# Airflow Integration
AIRFLOW_API_URL=http://airflow-webserver:8080
AIRFLOW_API_TOKEN=your-airflow-api-token-here
AIRFLOW_DAG_ID_FINANCE=process_finance_excel
```
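
As a rough illustration of how the API service might consume these three variables, the sketch below reads them from the environment and builds the URL and headers for Airflow's stable REST API endpoint `POST /api/v1/dags/{dag_id}/dagRuns`. The `AirflowSettings` class and both function names are hypothetical, and the Bearer header simply mirrors the `curl` check in Troubleshooting; the actual service code may be organized differently.

```python
import os
from dataclasses import dataclass

REQUIRED_KEYS = ("AIRFLOW_API_URL", "AIRFLOW_API_TOKEN", "AIRFLOW_DAG_ID_FINANCE")


@dataclass(frozen=True)
class AirflowSettings:
    """Hypothetical container for the three variables defined in .env."""
    api_url: str
    api_token: str
    dag_id: str


def load_airflow_settings(env=None) -> AirflowSettings:
    """Read the Airflow settings and fail fast if any of them is missing."""
    env = os.environ if env is None else env
    missing = [key for key in REQUIRED_KEYS if not env.get(key)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return AirflowSettings(
        api_url=env["AIRFLOW_API_URL"].rstrip("/"),  # avoid a double slash in the URL
        api_token=env["AIRFLOW_API_TOKEN"],
        dag_id=env["AIRFLOW_DAG_ID_FINANCE"],
    )


def dag_run_request(settings: AirflowSettings):
    """Return (url, headers) for the request that triggers a DAG run."""
    url = f"{settings.api_url}/api/v1/dags/{settings.dag_id}/dagRuns"
    headers = {
        "Authorization": f"Bearer {settings.api_token}",  # assumed auth scheme
        "Content-Type": "application/json",
    }
    return url, headers
```

Failing at startup when a variable is missing tends to be easier to debug than a failed trigger on the first upload.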
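
### 3. Restart API Service

```bash
cd 03-apiservice
# If .env is passed in by Compose (env_file / environment), `restart` keeps the old
# values; run `docker compose up -d` instead so the container is recreated.
docker compose restart
```

---
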
## 📊 Database Schema Updates

### New Fields in `fastapi.upload_history`:

| Field | Type | Description |
|-------|------|-------------|
| `airflow_dag_run_id` | String | DAG run ID returned by Airflow |
| `airflow_state` | String | Airflow state: `queued`, `running`, `success`, `failed` |
| `processing_started_at` | DateTime | When processing started |
| `processing_completed_at` | DateTime | When processing finished |
| `error_message` | Text | Error message, if any |

### Migration

```bash
# Restart apiservice to create the new columns
cd 03-apiservice
docker compose down
docker compose up -d
```

---

## 🚀 How It Works

### Upload Flow:

```
1. User uploads Excel file
   ↓
2. API saves file to /data/uploads/
   ↓
3. Create UploadHistory record (status: pending)
   ↓
4. Trigger Airflow DAG (with retry logic)
   ├─ Attempt 1: Try trigger
   ├─ Wait 10 seconds
   ├─ Attempt 2: Retry
   ├─ Wait 10 seconds
   └─ Attempt 3: Final retry
   ↓
5. Update record:
   - Success: status=processing, airflow_dag_run_id=xxx
   - Failure: status=error, error_message=xxx
   ↓
6. Return response to frontend
```
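
Steps 4 and 5 of this flow can be sketched as a small retry helper. This is a minimal illustration rather than the service's actual code: `trigger_with_retry` and its `trigger` argument (any callable that returns the DAG run ID or raises) are hypothetical names, and the returned dict only mirrors the fields shown in step 5.

```python
import time
from typing import Callable, Optional


def trigger_with_retry(
    trigger: Callable[[], str],
    attempts: int = 3,
    delay_seconds: float = 10.0,
    sleep: Callable[[float], None] = time.sleep,
) -> dict:
    """Call `trigger` up to `attempts` times, waiting between failed attempts.

    Returns the fields to write back to the upload record.
    """
    last_error: Optional[Exception] = None
    for attempt in range(1, attempts + 1):
        try:
            dag_run_id = trigger()
            return {
                "status": "processing",
                "airflow_dag_run_id": dag_run_id,
                "error_message": None,
            }
        except Exception as exc:  # a real service would catch narrower errors
            last_error = exc
            if attempt < attempts:
                sleep(delay_seconds)  # no wait after the final attempt
    return {
        "status": "error",
        "airflow_dag_run_id": None,
        "error_message": f"Failed to trigger Airflow after {attempts} attempts: {last_error}",
    }
```

Passing `sleep` as a parameter keeps the helper testable without real 10-second waits. In the worst case the upload request still blocks for about 20 seconds, which is worth keeping in mind when setting frontend timeouts.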

### Airflow DAG Flow:

```
validate_file
    ↓
process_data
    ↓
load_to_database
    ↓
update_status
```
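
The guide does not spell out what `validate_file` checks, so the following is only one plausible sketch of that first task, written as a plain function that a DAG task could call. The specific checks (file exists, `.xlsx` extension, non-empty, valid ZIP container) are assumptions, as is the choice to return a list of problems.

```python
import zipfile
from pathlib import Path


def validate_file(filepath: str) -> list:
    """Return a list of problems found; an empty list means the file looks usable."""
    path = Path(filepath)
    if not path.is_file():
        return [f"file not found: {filepath}"]

    problems = []
    if path.suffix.lower() != ".xlsx":
        problems.append(f"unexpected extension: {path.suffix or '(none)'}")
    if path.stat().st_size == 0:
        problems.append("file is empty")
    elif not zipfile.is_zipfile(path):
        # .xlsx workbooks are ZIP archives, so this catches renamed or truncated files
        problems.append("not a valid .xlsx container")
    return problems
```

Rejecting a bad file at this stage lets the run fail early with a readable `error_message` instead of a parser traceback from `process_data`.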

---

## 📝 API Endpoints

### Upload File (Triggers Airflow)

```http
POST /data-management/finance/upload
Content-Type: multipart/form-data

file: [Excel file]
description: "Optional description"
```

**Response:**
```json
{
  "success": true,
  "message": "File 'finance.xlsx' uploaded successfully",
  "upload_id": "upload_20260313_173000",
  "filename": "20260313_173000_finance.xlsx",
  "airflow_triggered": true,
  "dag_run_id": "manual__2026-03-13T10:30:00+00:00",
  "error": null
}
```

### Get Upload History

```http
GET /data-management/finance/uploads
```

**Response:**
```json
[
  {
    "id": "upload_20260313_173000",
    "filename": "finance.xlsx",
    "filepath": "/data/uploads/20260313_173000_finance.xlsx",
    "uploaded_at": "2026-03-13T17:30:00",
    "description": "Q1 2026 Finance Data",
    "status": "processing",
    "uploaded_by": "admin",
    "airflow_dag_run_id": "manual__2026-03-13T10:30:00+00:00",
    "airflow_state": "running",
    "processing_started_at": "2026-03-13T17:30:05",
    "processing_completed_at": null,
    "error_message": null
  }
]
```
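
A client that polls this endpoint needs to know when to stop. The sketch below treats `success` and `failed` as the final `airflow_state` values (taken from the schema table) and derives the processing time from the two timestamps. Both helper names are hypothetical, and the records are assumed to be the parsed JSON objects shown above.

```python
from datetime import datetime
from typing import Optional

# Final values of airflow_state; queued and running mean "keep polling"
TERMINAL_STATES = {"success", "failed"}


def is_finished(record: dict) -> bool:
    """True once Airflow has reported a final state for this upload."""
    return record.get("airflow_state") in TERMINAL_STATES


def processing_seconds(record: dict) -> Optional[float]:
    """Elapsed processing time in seconds, or None while a timestamp is missing."""
    started = record.get("processing_started_at")
    completed = record.get("processing_completed_at")
    if not started or not completed:
        return None
    delta = datetime.fromisoformat(completed) - datetime.fromisoformat(started)
    return delta.total_seconds()
```

For the sample record above, `is_finished` returns `False` and `processing_seconds` returns `None`, because the run is still in the `running` state.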

---

## 🔍 Monitoring

### 1. Check Upload Status

```bash
# Follow the API Service logs (2>&1 so that lines written to stderr are filtered too)
docker logs apiservice -f 2>&1 | grep "Airflow"

# Example logs:
# Triggering Airflow DAG (attempt 1/3)
# Airflow DAG triggered successfully: manual__2026-03-13T10:30:00+00:00
```

### 2. Check Airflow DAG

1. Open the Airflow UI: `http://ai.sriphat.com/airflow`
2. Go to **DAGs** → `process_finance_excel`
3. Review **DAG Runs** and **Task Instances**

### 3. Check Database

```sql
-- List recent uploads together with their Airflow status
SELECT
    upload_id,
    filename,
    status,
    airflow_dag_run_id,
    airflow_state,
    processing_started_at,
    processing_completed_at,
    error_message
FROM fastapi.upload_history
ORDER BY uploaded_at DESC
LIMIT 10;
```

---

## 🐛 Troubleshooting

### Issue: "Failed to trigger Airflow after 3 attempts"

**Causes:**
- The Airflow API token is invalid
- The Airflow webserver is not running
- The network connection failed

**Fix:**

```bash
# 1. Check the Airflow webserver
docker ps | grep airflow-webserver

# 2. Check the network
docker network inspect shared_data_network

# 3. Test the API token
#    (run from a container on shared_data_network, where this hostname resolves)
curl -X GET \
  http://airflow-webserver:8080/api/v1/dags \
  -H "Authorization: Bearer YOUR_TOKEN"

# 4. Check the logs
docker logs airflow-webserver -f
docker logs apiservice -f
```
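
The three causes listed above can often be told apart from the exception alone. The sketch below is a Python counterpart to the `curl` check using only the standard library; `classify_failure` and `check_airflow_api` are hypothetical helpers, and the Bearer header is assumed to match whatever scheme the deployment actually accepts.

```python
import urllib.error
import urllib.request


def classify_failure(exc: Exception) -> str:
    """Map an exception from the API check to the most likely cause."""
    # HTTPError must be tested first because it is a subclass of URLError
    if isinstance(exc, urllib.error.HTTPError):
        if exc.code in (401, 403):
            return "token rejected: check AIRFLOW_API_TOKEN"
        return f"webserver answered with HTTP {exc.code}"
    if isinstance(exc, (urllib.error.URLError, TimeoutError, ConnectionError)):
        return "webserver unreachable: check the container and shared_data_network"
    return f"unexpected error: {exc}"


def check_airflow_api(base_url: str, token: str, timeout: float = 5.0) -> str:
    """Request the DAG list and return "ok" or a short diagnosis."""
    request = urllib.request.Request(
        f"{base_url.rstrip('/')}/api/v1/dags",
        headers={"Authorization": f"Bearer {token}"},
    )
    try:
        with urllib.request.urlopen(request, timeout=timeout):
            return "ok"
    except Exception as exc:
        return classify_failure(exc)
```

Run it from inside the API service container, for example `check_airflow_api("http://airflow-webserver:8080", "YOUR_TOKEN")`, so that the request takes the same network path as the real trigger.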

### Issue: DAG does not appear in the Airflow UI

**Causes:**
- The DAG file has a syntax error
- The DAG is paused
- The Airflow scheduler is not running

**Fix:**

```bash
# 1. Check that the DAG file was parsed
docker exec airflow-webserver airflow dags list | grep process_finance_excel
docker exec airflow-webserver airflow dags list-import-errors

# 2. Test the DAG
docker exec airflow-webserver airflow dags test process_finance_excel

# 3. Unpause the DAG
docker exec airflow-webserver airflow dags unpause process_finance_excel

# 4. Check the scheduler
docker logs airflow-scheduler -f
```

### Issue: DAG is triggered but does not run

**Causes:**
- The volume mount is wrong (Airflow cannot access `/data/uploads`)
- Dependencies are missing (pandas, openpyxl)

**Fix:**

```bash
# 1. Check the volume
docker exec airflow-worker ls -la /data/uploads

# 2. Install the dependencies (lost when the container is recreated)
docker exec airflow-worker pip install pandas openpyxl

# Or add this to docker-compose.yaml:
# _PIP_ADDITIONAL_REQUIREMENTS: "pandas openpyxl"
```

---

## 🔐 Security Notes

1. **API Token Security:**
   - Keep the token in `.env` (do not commit it to git)
   - Use a token with limited scope (not an admin token)
   - Rotate the token periodically

2. **File Access:**
   - Airflow needs read access to `/data/uploads`
   - Check the file permissions

3. **Network:**
   - API Service and Airflow must both be on `shared_data_network`
   - Use the internal network (do not expose the Airflow API externally)

---

## 📚 References

- [Airflow REST API Documentation](https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html)
- [Airflow Authentication](https://airflow.apache.org/docs/apache-airflow/stable/security/api.html)
- [FastAPI Background Tasks](https://fastapi.tiangolo.com/tutorial/background-tasks/)

---

## 🎯 Next Steps

1. **Implement DAG Logic:**
   - Edit `05-airflow/dags/process_finance_excel.py`
   - Add the data transformation logic
   - Add the database loading logic

2. **Add Callback:**
   - Create the endpoint `/api/v1/airflow/callback/{upload_id}`
   - Airflow calls it when a run finishes or fails
   - Update `processing_completed_at` and `airflow_state`

3. **Frontend Updates:**
   - Add real-time status polling
   - Show Airflow state badges
   - Link to the Airflow DAG run

4. **Testing:**
   - Test file upload
   - Verify the Airflow trigger
   - Verify the retry logic
   - Test error handling
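
For the callback in step 2, the record update could be isolated in a pure function that the future endpoint calls. This is only a sketch under assumptions: `apply_callback` is a hypothetical name, the record is treated as a plain dict, and `"completed"` is an invented status value, since this guide only names `pending`, `processing`, and `error`.

```python
from datetime import datetime, timezone

# Assumed mapping from Airflow's final state to the upload record's status.
# "completed" is a hypothetical status name not defined elsewhere in this guide.
STATUS_BY_STATE = {"success": "completed", "failed": "error"}


def apply_callback(record: dict, airflow_state: str, error_message=None, now=None) -> dict:
    """Return a copy of `record` updated from a callback payload."""
    if airflow_state not in STATUS_BY_STATE:
        raise ValueError(f"not a final state: {airflow_state}")
    updated = dict(record)  # leave the caller's record untouched
    updated["airflow_state"] = airflow_state
    updated["status"] = STATUS_BY_STATE[airflow_state]
    updated["processing_completed_at"] = (now or datetime.now(timezone.utc)).isoformat()
    # Only keep an error message when the run actually failed
    updated["error_message"] = error_message if airflow_state == "failed" else None
    return updated
```

Keeping this logic free of web framework and database code makes it straightforward to cover in the tests listed in step 4.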