ในบทความนี้ เราจะแนะนำทางเลือกอื่นแทนแนวทางที่แนะนำของ Google ในการกำหนดเวลาการทำงานของไปป์ไลน์ Vertex AI

คำแนะนำของ Google

เอกสารประกอบปัจจุบันจะให้เรา:
1. สร้างไปป์ไลน์และอัปโหลดข้อมูลจำเพาะของไปป์ไลน์ไปยัง GCS
2. สร้างฟังก์ชันคลาวด์ด้วยทริกเกอร์ HTTP
3. สร้างงาน Job Scheduler

วิธีนี้ใช้งานได้ดีและราคาถูกกว่าการใช้บริการ Composer อยู่
ยกเว้นว่าฉันใช้ Cloud Composer (Airflow) เพื่อจัดการไปป์ไลน์อื่นๆ ของฉัน และการมีระบบกำหนดเวลาหลายระบบก็น่ารำคาญ (มีการตรวจสอบมากขึ้น เตรียมความพร้อมมากขึ้นสำหรับระบบใหม่ เพื่อนร่วมทีม ปัญหาที่อาจเกิดขึ้นมากขึ้น เป็นต้น)

ฉันจะอธิบายวิธีแทนที่งาน Job Scheduler ด้วย Airflow DAG จากนั้นจะอธิบายวิธีกำจัด Cloud Function ด้วยเช่นกัน

การแทนที่งาน Job Scheduler ด้วย DAG

สิ่งนี้ทำให้ DAG มีงานเดียว: การเรียกจุดสิ้นสุดของฟังก์ชันคลาวด์ด้วยพารามิเตอร์ที่เหมาะสม ซึ่งสามารถทำได้โดยใช้ PythonOperator จาก airflow.operators.python หรือด้วยมัณฑนากรบนฟังก์ชันหลาม @airflow.decorators.task

นี่คือเวอร์ชัน PythonOperator:

ตอนนี้พารามิเตอร์ URL คือจุดสิ้นสุดของฟังก์ชันคลาวด์ ฉันเปลี่ยนรหัสฟังก์ชั่นคลาวด์เพื่อแยกวิเคราะห์ request (ตามเอกสาร):

# Example code, this is somehow generic but not ideal
def process_request(request):
    
    ...
    pipeline_spec_uri = request.args.get('pipeline_spec_uri')
    parameter_values = {}
    for key, value in request.args.items():
         if key == 'pipeline_spec_uri':
            continue
         parameter_values[key] = value
    [rest of code: aiplatform init, job, submit]

การตั้งเวลาด้วย Composer/Airflow เท่านั้น

ตอนนี้เพื่อกำจัดบิตฟังก์ชันคลาวด์:
1. เพิ่มการอ้างอิงไปยัง Airflow ในกรณีของ Composer ภายใต้ PYPI PACKAGEStab (ไม่ต้องสนใจส่วน slackpart มีไว้สำหรับการแจ้งเตือนเกี่ยวกับความล้มเหลวจาก โพสต์นี้) เวอร์ชัน (หรือขาด) มาจาก "เอกสารของ GCP":

2. อัปเดต DAG เพื่อใช้ aiplatform ลองใช้มัณฑนากรเพื่อแสดงให้เห็นว่า:

ตอนนี้เราใช้เฉพาะ: Vertex AI สำหรับไปป์ไลน์และ Composer เพื่อทริกเกอร์ตามกำหนดเวลา