Files
sriphat-dataplatform/03-apiservice/AIRFLOW_INTEGRATION.md

8.3 KiB

Airflow Integration Guide

คู่มือการตั้งค่าและใช้งาน Airflow Integration สำหรับ Finance Excel Upload

📋 Overview

เมื่อ upload ไฟล์ Excel ผ่านหน้า /data-management/finance ระบบจะ:

  1. บันทึกไฟล์ลง /data/uploads/
  2. สร้าง record ใน database (fastapi.upload_history)
  3. Trigger Airflow DAG process_finance_excel (พร้อม retry 3 ครั้ง, รอ 10 วิ)
  4. Airflow จะ process ไฟล์และอัพเดท status กลับมา

🔧 Configuration

1. สร้าง Airflow API Token

# เข้า Airflow container
docker exec -it airflow-webserver bash

# สร้าง API token
airflow users create \
  --username apiservice \
  --firstname API \
  --lastname Service \
  --role Admin \
  --email apiservice@sriphat.com \
  --password your-secure-password

# หรือใช้ CLI สร้าง token
python -c "from airflow.api.auth.backend.basic_auth import BasicAuth; print(BasicAuth().get_token('apiservice', 'your-secure-password'))"

หรือใช้ Airflow UI:

  1. เข้า http://ai.sriphat.com/airflow
  2. ไปที่ SecurityList Users
  3. เลือก user → Edit
  4. Copy API Token (หรือสร้างใหม่)

2. อัพเดท Environment Variables

แก้ไข .env ใน 03-apiservice/:

# Airflow Integration
AIRFLOW_API_URL=http://airflow-webserver:8080
AIRFLOW_API_TOKEN=your-airflow-api-token-here
AIRFLOW_DAG_ID_FINANCE=process_finance_excel

3. Restart API Service

cd 03-apiservice
docker compose restart

📊 Database Schema Updates

New Fields in fastapi.upload_history:

Field Type Description
airflow_dag_run_id String DAG run ID จาก Airflow
airflow_state String State: queued, running, success, failed
processing_started_at DateTime เวลาเริ่ม process
processing_completed_at DateTime เวลาเสร็จ
error_message Text Error message (ถ้ามี)

Migration

# Restart apiservice เพื่อสร้าง columns ใหม่
cd 03-apiservice
docker compose down
docker compose up -d

🚀 How It Works

Upload Flow:

1. User uploads Excel file
   ↓
2. API saves file to /data/uploads/
   ↓
3. Create UploadHistory record (status: pending)
   ↓
4. Trigger Airflow DAG (with retry logic)
   ├─ Attempt 1: Try trigger
   ├─ Wait 10 seconds
   ├─ Attempt 2: Retry
   ├─ Wait 10 seconds
   └─ Attempt 3: Final retry
   ↓
5. Update record:
   - Success: status=processing, airflow_dag_run_id=xxx
   - Failure: status=error, error_message=xxx
   ↓
6. Return response to frontend

Airflow DAG Flow:

validate_file
    ↓
process_data
    ↓
load_to_database
    ↓
update_status

📝 API Endpoints

Upload File (Triggers Airflow)

POST /data-management/finance/upload
Content-Type: multipart/form-data

file: [Excel file]
description: "Optional description"

Response:

{
  "success": true,
  "message": "File 'finance.xlsx' uploaded successfully",
  "upload_id": "upload_20260313_173000",
  "filename": "20260313_173000_finance.xlsx",
  "airflow_triggered": true,
  "dag_run_id": "manual__2026-03-13T10:30:00+00:00",
  "error": null
}

Get Upload History

GET /data-management/finance/uploads

Response:

[
  {
    "id": "upload_20260313_173000",
    "filename": "finance.xlsx",
    "filepath": "/data/uploads/20260313_173000_finance.xlsx",
    "uploaded_at": "2026-03-13T17:30:00",
    "description": "Q1 2026 Finance Data",
    "status": "processing",
    "uploaded_by": "admin",
    "airflow_dag_run_id": "manual__2026-03-13T10:30:00+00:00",
    "airflow_state": "running",
    "processing_started_at": "2026-03-13T17:30:05",
    "processing_completed_at": null,
    "error_message": null
  }
]

🔍 Monitoring

1. Check Upload Status

# ดู logs ของ API Service
docker logs apiservice -f | grep "Airflow"

# ตัวอย่าง logs:
# Triggering Airflow DAG (attempt 1/3)
# Airflow DAG triggered successfully: manual__2026-03-13T10:30:00+00:00

2. Check Airflow DAG

  1. เข้า Airflow UI: http://ai.sriphat.com/airflow
  2. ไปที่ DAGsprocess_finance_excel
  3. ดู DAG Runs และ Task Instances

3. Check Database

-- ดู upload history พร้อม Airflow status
SELECT 
    upload_id,
    filename,
    status,
    airflow_dag_run_id,
    airflow_state,
    processing_started_at,
    processing_completed_at,
    error_message
FROM fastapi.upload_history
ORDER BY uploaded_at DESC
LIMIT 10;

🐛 Troubleshooting

Issue: "Failed to trigger Airflow after 3 attempts"

สาเหตุ:

  • Airflow API Token ไม่ถูกต้อง
  • Airflow webserver ไม่ทำงาน
  • Network connection ล้มเหลว

วิธีแก้:

# 1. ตรวจสอบ Airflow webserver
docker ps | grep airflow-webserver

# 2. ตรวจสอบ network
docker network inspect shared_data_network

# 3. ทดสอบ API token
curl -X GET \
  http://airflow-webserver:8080/api/v1/dags \
  -H "Authorization: Bearer YOUR_TOKEN"

# 4. ดู logs
docker logs airflow-webserver -f
docker logs apiservice -f

Issue: DAG ไม่ปรากฏใน Airflow UI

สาเหตุ:

  • DAG file มี syntax error
  • DAG ถูก pause
  • Airflow scheduler ไม่ทำงาน

วิธีแก้:

# 1. ตรวจสอบ DAG file
docker exec airflow-webserver airflow dags list | grep process_finance_excel

# 2. Test DAG
docker exec airflow-webserver airflow dags test process_finance_excel

# 3. Unpause DAG
docker exec airflow-webserver airflow dags unpause process_finance_excel

# 4. ตรวจสอบ scheduler
docker logs airflow-scheduler -f

Issue: DAG triggered แต่ไม่ทำงาน

สาเหตุ:

  • Volume mount ไม่ถูกต้อง (Airflow เข้าถึง /data/uploads ไม่ได้)
  • Dependencies ขาดหาย (pandas, openpyxl)

วิธีแก้:

# 1. ตรวจสอบ volume
docker exec airflow-worker ls -la /data/uploads

# 2. ติดตั้ง dependencies
docker exec airflow-worker pip install pandas openpyxl

# หรือเพิ่มใน docker-compose.yaml:
# _PIP_ADDITIONAL_REQUIREMENTS: "pandas openpyxl"

🔐 Security Notes

  1. API Token Security:

    • เก็บ token ใน .env (ไม่ commit ลง git)
    • ใช้ token ที่มี scope จำกัด (ไม่ใช่ admin token)
    • Rotate token เป็นระยะ
  2. File Access:

    • Airflow ต้องมี read access ไปยัง /data/uploads
    • ตรวจสอบ file permissions
  3. Network:

    • API Service และ Airflow ต้องอยู่ใน shared_data_network
    • ใช้ internal network (ไม่ expose Airflow API ออกภายนอก)

📚 References


🎯 Next Steps

  1. Implement DAG Logic:

    • แก้ไข 05-airflow/dags/process_finance_excel.py
    • เพิ่ม data transformation logic
    • เพิ่ม database loading logic
  2. Add Callback:

    • สร้าง endpoint /api/v1/airflow/callback/{upload_id}
    • Airflow จะ callback เมื่อเสร็จ/ล้มเหลว
    • อัพเดท processing_completed_at และ airflow_state
  3. Frontend Updates:

    • เพิ่ม real-time status polling
    • แสดง Airflow state badges
    • Link ไปยัง Airflow DAG run
  4. Testing:

    • ทดสอบ upload file
    • ตรวจสอบ Airflow trigger
    • ตรวจสอบ retry logic
    • ทดสอบ error handling