8.3 KiB
8.3 KiB
Airflow Integration Guide
คู่มือการตั้งค่าและใช้งาน Airflow Integration สำหรับ Finance Excel Upload
📋 Overview
เมื่อ upload ไฟล์ Excel ผ่านหน้า /data-management/finance ระบบจะ:
- บันทึกไฟล์ลง
/data/uploads/ - สร้าง record ใน database (
fastapi.upload_history) - Trigger Airflow DAG
process_finance_excel(พร้อม retry 3 ครั้ง, รอ 10 วิ) - Airflow จะ process ไฟล์และอัพเดท status กลับมา
🔧 Configuration
1. สร้าง Airflow API Token
# เข้า Airflow container
docker exec -it airflow-webserver bash
# สร้าง API token
airflow users create \
--username apiservice \
--firstname API \
--lastname Service \
--role Admin \
--email apiservice@sriphat.com \
--password your-secure-password
# หรือใช้ CLI สร้าง token
python -c "from airflow.api.auth.backend.basic_auth import BasicAuth; print(BasicAuth().get_token('apiservice', 'your-secure-password'))"
หรือใช้ Airflow UI:
- เข้า
http://ai.sriphat.com/airflow - ไปที่ Security → List Users
- เลือก user → Edit
- Copy API Token (หรือสร้างใหม่)
2. อัพเดท Environment Variables
แก้ไข .env ใน 03-apiservice/:
# Airflow Integration
AIRFLOW_API_URL=http://airflow-webserver:8080
AIRFLOW_API_TOKEN=your-airflow-api-token-here
AIRFLOW_DAG_ID_FINANCE=process_finance_excel
3. Restart API Service
cd 03-apiservice
docker compose restart
📊 Database Schema Updates
New Fields in fastapi.upload_history:
| Field | Type | Description |
|---|---|---|
airflow_dag_run_id |
String | DAG run ID จาก Airflow |
airflow_state |
String | State: queued, running, success, failed |
processing_started_at |
DateTime | เวลาเริ่ม process |
processing_completed_at |
DateTime | เวลาเสร็จ |
error_message |
Text | Error message (ถ้ามี) |
Migration
# Restart apiservice เพื่อสร้าง columns ใหม่
cd 03-apiservice
docker compose down
docker compose up -d
🚀 How It Works
Upload Flow:
1. User uploads Excel file
↓
2. API saves file to /data/uploads/
↓
3. Create UploadHistory record (status: pending)
↓
4. Trigger Airflow DAG (with retry logic)
├─ Attempt 1: Try trigger
├─ Wait 10 seconds
├─ Attempt 2: Retry
├─ Wait 10 seconds
└─ Attempt 3: Final retry
↓
5. Update record:
- Success: status=processing, airflow_dag_run_id=xxx
- Failure: status=error, error_message=xxx
↓
6. Return response to frontend
Airflow DAG Flow:
validate_file
↓
process_data
↓
load_to_database
↓
update_status
📝 API Endpoints
Upload File (Triggers Airflow)
POST /data-management/finance/upload
Content-Type: multipart/form-data
file: [Excel file]
description: "Optional description"
Response:
{
"success": true,
"message": "File 'finance.xlsx' uploaded successfully",
"upload_id": "upload_20260313_173000",
"filename": "20260313_173000_finance.xlsx",
"airflow_triggered": true,
"dag_run_id": "manual__2026-03-13T10:30:00+00:00",
"error": null
}
Get Upload History
GET /data-management/finance/uploads
Response:
[
{
"id": "upload_20260313_173000",
"filename": "finance.xlsx",
"filepath": "/data/uploads/20260313_173000_finance.xlsx",
"uploaded_at": "2026-03-13T17:30:00",
"description": "Q1 2026 Finance Data",
"status": "processing",
"uploaded_by": "admin",
"airflow_dag_run_id": "manual__2026-03-13T10:30:00+00:00",
"airflow_state": "running",
"processing_started_at": "2026-03-13T17:30:05",
"processing_completed_at": null,
"error_message": null
}
]
🔍 Monitoring
1. Check Upload Status
# ดู logs ของ API Service
docker logs apiservice -f | grep "Airflow"
# ตัวอย่าง logs:
# Triggering Airflow DAG (attempt 1/3)
# Airflow DAG triggered successfully: manual__2026-03-13T10:30:00+00:00
2. Check Airflow DAG
- เข้า Airflow UI:
http://ai.sriphat.com/airflow - ไปที่ DAGs →
process_finance_excel - ดู DAG Runs และ Task Instances
3. Check Database
-- ดู upload history พร้อม Airflow status
SELECT
upload_id,
filename,
status,
airflow_dag_run_id,
airflow_state,
processing_started_at,
processing_completed_at,
error_message
FROM fastapi.upload_history
ORDER BY uploaded_at DESC
LIMIT 10;
🐛 Troubleshooting
Issue: "Failed to trigger Airflow after 3 attempts"
สาเหตุ:
- Airflow API Token ไม่ถูกต้อง
- Airflow webserver ไม่ทำงาน
- Network connection ล้มเหลว
วิธีแก้:
# 1. ตรวจสอบ Airflow webserver
docker ps | grep airflow-webserver
# 2. ตรวจสอบ network
docker network inspect shared_data_network
# 3. ทดสอบ API token
curl -X GET \
http://airflow-webserver:8080/api/v1/dags \
-H "Authorization: Bearer YOUR_TOKEN"
# 4. ดู logs
docker logs airflow-webserver -f
docker logs apiservice -f
Issue: DAG ไม่ปรากฏใน Airflow UI
สาเหตุ:
- DAG file มี syntax error
- DAG ถูก pause
- Airflow scheduler ไม่ทำงาน
วิธีแก้:
# 1. ตรวจสอบ DAG file
docker exec airflow-webserver airflow dags list | grep process_finance_excel
# 2. Test DAG
docker exec airflow-webserver airflow dags test process_finance_excel
# 3. Unpause DAG
docker exec airflow-webserver airflow dags unpause process_finance_excel
# 4. ตรวจสอบ scheduler
docker logs airflow-scheduler -f
Issue: DAG triggered แต่ไม่ทำงาน
สาเหตุ:
- Volume mount ไม่ถูกต้อง (Airflow เข้าถึง
/data/uploadsไม่ได้) - Dependencies ขาดหาย (pandas, openpyxl)
วิธีแก้:
# 1. ตรวจสอบ volume
docker exec airflow-worker ls -la /data/uploads
# 2. ติดตั้ง dependencies
docker exec airflow-worker pip install pandas openpyxl
# หรือเพิ่มใน docker-compose.yaml:
# _PIP_ADDITIONAL_REQUIREMENTS: "pandas openpyxl"
🔐 Security Notes
-
API Token Security:
- เก็บ token ใน
.env(ไม่ commit ลง git) - ใช้ token ที่มี scope จำกัด (ไม่ใช่ admin token)
- Rotate token เป็นระยะ
- เก็บ token ใน
-
File Access:
- Airflow ต้องมี read access ไปยัง
/data/uploads - ตรวจสอบ file permissions
- Airflow ต้องมี read access ไปยัง
-
Network:
- API Service และ Airflow ต้องอยู่ใน
shared_data_network - ใช้ internal network (ไม่ expose Airflow API ออกภายนอก)
- API Service และ Airflow ต้องอยู่ใน
📚 References
🎯 Next Steps
-
Implement DAG Logic:
- แก้ไข
05-airflow/dags/process_finance_excel.py - เพิ่ม data transformation logic
- เพิ่ม database loading logic
- แก้ไข
-
Add Callback:
- สร้าง endpoint
/api/v1/airflow/callback/{upload_id} - Airflow จะ callback เมื่อเสร็จ/ล้มเหลว
- อัพเดท
processing_completed_atและairflow_state
- สร้าง endpoint
-
Frontend Updates:
- เพิ่ม real-time status polling
- แสดง Airflow state badges
- Link ไปยัง Airflow DAG run
-
Testing:
- ทดสอบ upload file
- ตรวจสอบ Airflow trigger
- ตรวจสอบ retry logic
- ทดสอบ error handling