ในบทความนี้ เราจะแนะนำทางเลือกอื่นแทนแนวทางที่แนะนำของ Google ในการกำหนดเวลาการทำงานของไปป์ไลน์ Vertex AI
คำแนะนำของ Google
เอกสารประกอบปัจจุบันจะให้เรา:
1. สร้างไปป์ไลน์และอัปโหลดข้อมูลจำเพาะของไปป์ไลน์ไปยัง GCS
2. สร้างฟังก์ชันคลาวด์ด้วยทริกเกอร์ HTTP
3. สร้างงาน Job Scheduler
วิธีนี้ใช้งานได้ดีและราคาถูกกว่าการใช้บริการ Composer อยู่
ยกเว้นว่าฉันใช้ Cloud Composer (Airflow) เพื่อจัดการไปป์ไลน์อื่นๆ ของฉัน และการมีระบบกำหนดเวลาหลายระบบก็น่ารำคาญ (มีการตรวจสอบมากขึ้น เตรียมความพร้อมมากขึ้นสำหรับระบบใหม่ เพื่อนร่วมทีม ปัญหาที่อาจเกิดขึ้นมากขึ้น เป็นต้น)
ฉันจะอธิบายวิธีแทนที่งาน Job Scheduler ด้วย Airflow DAG จากนั้นจะอธิบายวิธีกำจัด Cloud Function ด้วยเช่นกัน
การแทนที่งาน Job Scheduler ด้วย DAG
สิ่งนี้ทำให้ DAG มีงานเดียว: การเรียกจุดสิ้นสุดของฟังก์ชันคลาวด์ด้วยพารามิเตอร์ที่เหมาะสม ซึ่งสามารถทำได้โดยใช้ PythonOperator
จาก airflow.operators.python
หรือด้วยมัณฑนากรบนฟังก์ชันหลาม @airflow.decorators.task
นี่คือเวอร์ชัน PythonOperator
:
ตอนนี้พารามิเตอร์ URL
คือจุดสิ้นสุดของฟังก์ชันคลาวด์ ฉันเปลี่ยนรหัสฟังก์ชั่นคลาวด์เพื่อแยกวิเคราะห์ request
(ตามเอกสาร):
# Example code, this is somehow generic but not ideal defprocess_request
(request): ... pipeline_spec_uri = request.args.get('pipeline_spec_uri')parameter_values
= {} for key, value in request.args.items(): if key == 'pipeline_spec_uri': continueparameter_values
[key] = value [rest of code: aiplatform init, job, submit]
การตั้งเวลาด้วย Composer/Airflow เท่านั้น
ตอนนี้เพื่อกำจัดบิตฟังก์ชันคลาวด์:
1. เพิ่มการอ้างอิงไปยัง Airflow ในกรณีของ Composer ภายใต้ PYPI PACKAGES
tab (ไม่ต้องสนใจส่วน slack
part มีไว้สำหรับการแจ้งเตือนเกี่ยวกับความล้มเหลวจาก โพสต์นี้) เวอร์ชัน (หรือขาด) มาจาก "เอกสารของ GCP":
2. อัปเดต DAG เพื่อใช้ aiplatform
ลองใช้มัณฑนากรเพื่อแสดงให้เห็นว่า:
ตอนนี้เราใช้เฉพาะ: Vertex AI สำหรับไปป์ไลน์และ Composer เพื่อทริกเกอร์ตามกำหนดเวลา