GER ใช้ฟังก์ชันบูตสแตรปที่ใช้ csv_stream แทรกข้อมูลลงใน PostGres และส่งกลับสัญญา Q เมื่อดำเนินการเสร็จสิ้น (หรือมีข้อผิดพลาด) bootstrap สามารถแบ่งออกเป็นสองส่วน คือ การเชื่อมต่อและการแทรกข้อมูล
รับการเชื่อมต่อจาก Knex
GER ใช้เครื่องมือสร้างแบบสอบถาม Knex เพื่อสื่อสารและจัดการการเชื่อมต่อกับฐานข้อมูล การดำเนินการแรกสำหรับฟังก์ชัน bootstrap คือการเชื่อมต่อกับฐานข้อมูลจาก Knex:
runner = new knex.client.Runner(knex.client) runner.ensureConnection() .then( (connection) => runner.connection = connection #Use connection ) .finally( -> runner.cleanupConnection())
เช่นเดียวกับ "Upsert" นี่เป็นวิธีจัดการกับการเชื่อมต่อแบบวงกลมเล็กน้อย Knex นั้นยอดเยี่ยม แต่การทำงานรอบขอบฟังก์ชั่นอาจเป็นเรื่องยาก
การแทรกข้อมูลด้วย pg-copy-streams
แบบสอบถามที่ใช้ในการแทรกข้อมูลคือ:
query = "COPY events (person, action, thing) FROM STDIN CSV"
ค้นหาข้อมูลของ COPY ไปยังเหตุการณ์ตาราง โดยแทรกแถวบุคคล การกระทำ และสิ่งของจากสตรีมมาตรฐานใน (STDIN) เป็นรูปแบบค่าที่คั่นด้วยเครื่องหมายจุลภาค (CSV) ตัวอย่างเช่น หากข้อมูลสตรีมคือ:
bob, views, product1 alice, buys, product2
แบบสอบถามจะแทรกสองเหตุการณ์หนึ่งเหตุการณ์สำหรับแต่ละแถว
การใช้ฟังก์ชัน from ใน pg-copy-streams (copyFrom = need('pg-copy-streams').from) แบบสอบถามจะถูกห่อและส่งไปยังการเชื่อมต่อ PostGres:
copy = connection.query(copyFrom(query))
สตรีมสำเนาที่ส่งคืนคือสตรีมแบบเขียนได้ซึ่งมีการไพพ์อินพุต csv_stream:
csv_stream.pipe(copy)
หากต้องการแจ้งผู้โทรว่าบูตสแตรปเสร็จสิ้นหรือเกิดข้อผิดพลาด ระบบจะสร้าง Q defer ซึ่งจะคอยฟังการสิ้นสุดสตรีมและเหตุการณ์ข้อผิดพลาด:
deferred = q.defer() csv_stream.pipe(copy) .on('end', -> deferred.resolve()) .on('error', (err) -> deferred.reject(err))
เมื่อรวมการแทรกข้อมูลนี้แล้วดูเหมือนว่า:
query = "COPY events (person, action, thing) FROM STDIN CSV" copy = connection.query(copyFrom(query)); deferred = q.defer() csv_stream.pipe(copy) .on('end', -> deferred.resolve()) .on('error', (err) -> deferred.reject(err)) return deferred.promise
Hapi.js และสตรีม
ด้วยการใช้เซิร์ฟเวอร์ Hapi.js ที่เชื่อมต่อกับฟังก์ชันบูตสแตรปของ GER คุณสามารถอัปโหลดและสตรีมไฟล์ไปยัง PostGres ได้โดยตรง ตามที่อธิบายไว้ใน เอกสาร Hapi.js สามารถตั้งค่าเส้นทางเพื่อส่งออกสตรีม Node.js
หากต้องการใช้งานนี้ จะต้องสร้างเซิร์ฟเวอร์ Hapi.js ด้วย:
Hapi = require('hapi') server = new Hapi.Server('localhost', 8000)
เส้นทางที่อัปโหลดไฟล์และแปลงเป็นสตรีม Node.js จะถูกเพิ่มไปยังเซิร์ฟเวอร์ Hapi.js ด้วย:
server.route method: 'POST' path: 'event/bootstrap' config: payload: maxBytes: 209715200 output:'stream' parse: true handler: (request, reply) -> #do things...
ตัวเลือกตัวจัดการคือฟังก์ชันที่จัดการคำขอ สามารถเข้าถึงสตรีมไฟล์ที่อัปโหลดในเพย์โหลดคำขอ เช่น request.payload[“เหตุการณ์”] สตรีมนี้ถูกส่งไปยังฟังก์ชันบูตสแตรปของ GER เช่น
handler: (request, reply) => ger.bootstrap(request.payload["events"]) .then( -> reply({finished: true})) .fail((err) -> reply({error: err}).code(500))
ส่วนสุดท้ายคือการเริ่มเซิร์ฟเวอร์ Hapi.js ด้วย server.start()
การทดสอบเซิร์ฟเวอร์
เพื่อทดสอบเซิร์ฟเวอร์และเส้นทาง สามารถใช้ curl เพื่ออัพโหลดไฟล์ได้ เช่น
curl -i -F [email protected] http://localhost:8000/event/bootstrap
curl ยังสามารถใช้สตรีมมาตรฐานและอัปโหลดได้ เช่น
head data.csv | curl -i -F events=@- http://localhost:8000/event/bootstrap
ฉันแค่อยากจะใช้เวลาและตรวจสอบว่าสิ่งนี้ยอดเยี่ยมแค่ไหน head สร้างสตรีมมาตรฐาน ไพพ์ให้โค้งงอซึ่งเปลี่ยนเป็น "คำขอหลายส่วน HTTP" Hapi.js เปลี่ยนคำขอนั้นเป็นสตรีม Node.js ซึ่งจากนั้นส่งไปป์ไปยัง PostGres เป็นสตรีมมาตรฐานจากฟังก์ชันบูตสแตรปของ GER . มันเจ๋งมาก!
การวัดประสิทธิภาพ
ฉันเขียนการทดสอบ "มอคค่า" เล็กๆ ที่เปรียบเทียบการแทรก 10,000 เหตุการณ์ลงใน GER ทีละเหตุการณ์ และเปรียบเทียบกับการแทรก 10,000 เหตุการณ์โดยใช้ฟังก์ชันบูตสแตรป
ผลลัพธ์คือ:
- 0.7297ms ต่อเหตุการณ์ เมื่อแต่ละเหตุการณ์ถูกแทรกทีละรายการ
- 0.0696ms ต่อเหตุการณ์ สำหรับกิจกรรมที่ใช้ bootstrap
นั่นคือ การปรับปรุงประสิทธิภาพ 10 เท่า เมื่อแทรกเหตุการณ์
ผลลัพธ์เหล่านี้เกินจริงไปมากกว่านี้อีกเมื่อเพิ่มโอเวอร์เฮดของ HTTP เนื่องจากการแทรกแต่ละรายการยังต้องการโอเวอร์เฮดของคำขอ HTTP ของตัวเองด้วย โดยที่ไฟล์ที่อัปโหลดหนึ่งไฟล์เป็นเพียงคำขอเดียวเท่านั้น
อ่านเพิ่มเติม
ภาพจาก RLA-Inque