GER ใช้ฟังก์ชันบูตสแตรปที่ใช้ csv_stream แทรกข้อมูลลงใน PostGres และส่งกลับสัญญา Q เมื่อดำเนินการเสร็จสิ้น (หรือมีข้อผิดพลาด) bootstrap สามารถแบ่งออกเป็นสองส่วน คือ การเชื่อมต่อและการแทรกข้อมูล

รับการเชื่อมต่อจาก Knex

GER ใช้เครื่องมือสร้างแบบสอบถาม Knex เพื่อสื่อสารและจัดการการเชื่อมต่อกับฐานข้อมูล การดำเนินการแรกสำหรับฟังก์ชัน bootstrap คือการเชื่อมต่อกับฐานข้อมูลจาก Knex:

runner = new knex.client.Runner(knex.client)
runner.ensureConnection()
.then( (connection) =>
 runner.connection = connection
 #Use connection
)
.finally( -> runner.cleanupConnection())

เช่นเดียวกับ "Upsert" นี่เป็นวิธีจัดการกับการเชื่อมต่อแบบวงกลมเล็กน้อย Knex นั้นยอดเยี่ยม แต่การทำงานรอบขอบฟังก์ชั่นอาจเป็นเรื่องยาก

การแทรกข้อมูลด้วย pg-copy-streams

แบบสอบถามที่ใช้ในการแทรกข้อมูลคือ:

query = "COPY events (person, action, thing) FROM STDIN CSV"

ค้นหาข้อมูลของ COPY ไปยังเหตุการณ์ตาราง โดยแทรกแถวบุคคล การกระทำ และสิ่งของจากสตรีมมาตรฐานใน (STDIN) เป็นรูปแบบค่าที่คั่นด้วยเครื่องหมายจุลภาค (CSV) ตัวอย่างเช่น หากข้อมูลสตรีมคือ:

bob, views, product1
alice, buys, product2

แบบสอบถามจะแทรกสองเหตุการณ์หนึ่งเหตุการณ์สำหรับแต่ละแถว

การใช้ฟังก์ชัน from ใน pg-copy-streams (copyFrom = need('pg-copy-streams').from) แบบสอบถามจะถูกห่อและส่งไปยังการเชื่อมต่อ PostGres:

copy = connection.query(copyFrom(query))

สตรีมสำเนาที่ส่งคืนคือสตรีมแบบเขียนได้ซึ่งมีการไพพ์อินพุต csv_stream:

csv_stream.pipe(copy)

หากต้องการแจ้งผู้โทรว่าบูตสแตรปเสร็จสิ้นหรือเกิดข้อผิดพลาด ระบบจะสร้าง Q defer ซึ่งจะคอยฟังการสิ้นสุดสตรีมและเหตุการณ์ข้อผิดพลาด:

deferred = q.defer()
csv_stream.pipe(copy)
.on('end', -> deferred.resolve())
.on('error', (err) -> deferred.reject(err))

เมื่อรวมการแทรกข้อมูลนี้แล้วดูเหมือนว่า:

query = "COPY events (person, action, thing) FROM STDIN CSV"
copy = connection.query(copyFrom(query));
deferred = q.defer()
csv_stream.pipe(copy)
.on('end', -> deferred.resolve())
.on('error', (err) -> deferred.reject(err))
return deferred.promise

Hapi.js และสตรีม

ด้วยการใช้เซิร์ฟเวอร์ Hapi.js ที่เชื่อมต่อกับฟังก์ชันบูตสแตรปของ GER คุณสามารถอัปโหลดและสตรีมไฟล์ไปยัง PostGres ได้โดยตรง ตามที่อธิบายไว้ใน เอกสาร Hapi.js สามารถตั้งค่าเส้นทางเพื่อส่งออกสตรีม Node.js

หากต้องการใช้งานนี้ จะต้องสร้างเซิร์ฟเวอร์ Hapi.js ด้วย:

Hapi = require('hapi')
server = new Hapi.Server('localhost', 8000)

เส้นทางที่อัปโหลดไฟล์และแปลงเป็นสตรีม Node.js จะถูกเพิ่มไปยังเซิร์ฟเวอร์ Hapi.js ด้วย:

server.route
 method: 'POST'
 path: 'event/bootstrap'
 config:
  payload:
   maxBytes: 209715200
   output:'stream'
   parse: true
 handler: (request, reply) ->
   #do things...

ตัวเลือกตัวจัดการคือฟังก์ชันที่จัดการคำขอ สามารถเข้าถึงสตรีมไฟล์ที่อัปโหลดในเพย์โหลดคำขอ เช่น request.payload[“เหตุการณ์”] สตรีมนี้ถูกส่งไปยังฟังก์ชันบูตสแตรปของ GER เช่น

handler: (request, reply) =>
  ger.bootstrap(request.payload["events"])
  .then( -> reply({finished: true}))
  .fail((err) -> reply({error: err}).code(500))

ส่วนสุดท้ายคือการเริ่มเซิร์ฟเวอร์ Hapi.js ด้วย server.start()

การทดสอบเซิร์ฟเวอร์

เพื่อทดสอบเซิร์ฟเวอร์และเส้นทาง สามารถใช้ curl เพื่ออัพโหลดไฟล์ได้ เช่น

curl -i -F [email protected] http://localhost:8000/event/bootstrap

curl ยังสามารถใช้สตรีมมาตรฐานและอัปโหลดได้ เช่น

head data.csv | curl -i -F events=@- http://localhost:8000/event/bootstrap

ฉันแค่อยากจะใช้เวลาและตรวจสอบว่าสิ่งนี้ยอดเยี่ยมแค่ไหน head สร้างสตรีมมาตรฐาน ไพพ์ให้โค้งงอซึ่งเปลี่ยนเป็น "คำขอหลายส่วน HTTP" Hapi.js เปลี่ยนคำขอนั้นเป็นสตรีม Node.js ซึ่งจากนั้นส่งไปป์ไปยัง PostGres เป็นสตรีมมาตรฐานจากฟังก์ชันบูตสแตรปของ GER . มันเจ๋งมาก!

การวัดประสิทธิภาพ

ฉันเขียนการทดสอบ "มอคค่า" เล็กๆ ที่เปรียบเทียบการแทรก 10,000 เหตุการณ์ลงใน GER ทีละเหตุการณ์ และเปรียบเทียบกับการแทรก 10,000 เหตุการณ์โดยใช้ฟังก์ชันบูตสแตรป

ผลลัพธ์คือ:

  1. 0.7297ms ต่อเหตุการณ์ เมื่อแต่ละเหตุการณ์ถูกแทรกทีละรายการ
  2. 0.0696ms ต่อเหตุการณ์ สำหรับกิจกรรมที่ใช้ bootstrap

นั่นคือ การปรับปรุงประสิทธิภาพ 10 เท่า เมื่อแทรกเหตุการณ์

ผลลัพธ์เหล่านี้เกินจริงไปมากกว่านี้อีกเมื่อเพิ่มโอเวอร์เฮดของ HTTP เนื่องจากการแทรกแต่ละรายการยังต้องการโอเวอร์เฮดของคำขอ HTTP ของตัวเองด้วย โดยที่ไฟล์ที่อัปโหลดหนึ่งไฟล์เป็นเพียงคำขอเดียวเท่านั้น

อ่านเพิ่มเติม

คู่มือสตรีม Substacks

ภาพจาก RLA-Inque