GER mengimplementasikan fungsi bootstrap yang mengambil csv_stream, memasukkan datanya ke PostGres, dan mengembalikan janji Q ketika sudah selesai (atau terjadi kesalahan). bootstrap dapat dipecah menjadi dua bagian, mendapatkan koneksi dan memasukkan data.

Mendapatkan Koneksi dari Knex

GER menggunakan pembuat kueri Knex untuk berkomunikasi dan mengelola koneksi ke database. Tindakan pertama untuk fungsi bootstrap adalah mendapatkan koneksi ke database dari Knex:

runner = new knex.client.Runner(knex.client)
runner.ensureConnection()
.then( (connection) =>
 runner.connection = connection
 #Use connection
)
.finally( -> runner.cleanupConnection())

Seperti halnya dengan Upsert, ini adalah cara yang tidak rumit dalam menangani koneksi. Knex memang hebat, tetapi mengatasi fungsionalitasnya bisa jadi sulit.

Memasukkan Data dengan pg-copy-streams

Query yang digunakan untuk memasukkan data adalah:

query = "COPY events (person, action, thing) FROM STDIN CSV"

Kueri ini SALINAN data ke acara tabel, memasukkan baris orang, tindakan, dan benda dari aliran standar dalam (STDIN) sebagai format nilai yang dipisahkan koma (CSV). Misalnya, jika data alirannya adalah:

bob, views, product1
alice, buys, product2

kueri akan menyisipkan dua peristiwa, satu untuk setiap baris.

Menggunakan fungsi from di pg-copy-streams (copyFrom = require('pg-copy-streams').from) kueri dibungkus dan dikirim ke koneksi PostGres:

copy = connection.query(copyFrom(query))

Aliran salinan yang dikembalikan adalah aliran yang dapat ditulis di mana input csv_stream disalurkan:

csv_stream.pipe(copy)

Untuk memberi tahu pemanggil bahwa bootstrap telah selesai atau terjadi kesalahan, Q defer dibuat yang mendengarkan peristiwa akhir dan kesalahan aliran:

deferred = q.defer()
csv_stream.pipe(copy)
.on('end', -> deferred.resolve())
.on('error', (err) -> deferred.reject(err))

Secara keseluruhan penyisipan data ini terlihat seperti:

query = "COPY events (person, action, thing) FROM STDIN CSV"
copy = connection.query(copyFrom(query));
deferred = q.defer()
csv_stream.pipe(copy)
.on('end', -> deferred.resolve())
.on('error', (err) -> deferred.reject(err))
return deferred.promise

Hapi.js dan Streaming

Dengan menggunakan server Hapi.js yang terhubung ke fungsi bootstrap GER, file dapat diunggah dan dialirkan langsung ke PostGres. Seperti yang dijelaskan dalam dokumen Hapi.js. sebuah rute dapat diatur untuk menghasilkan aliran Node.js.

Untuk mengimplementasikan ini, server Hapi.js harus dibuat dengan:

Hapi = require('hapi')
server = new Hapi.Server('localhost', 8000)

Rute yang mengambil unggahan file dan mengubahnya menjadi aliran Node.js ditambahkan ke server Hapi.js dengan:

server.route
 method: 'POST'
 path: 'event/bootstrap'
 config:
  payload:
   maxBytes: 209715200
   output:'stream'
   parse: true
 handler: (request, reply) ->
   #do things...

Opsi handler adalah fungsi yang menangani permintaan. Itu dapat mengakses aliran file yang diunggah dalam payload permintaan, mis. request.payload["acara"]. Aliran ini diteruskan ke fungsi bootstrap GER, yaitu.

handler: (request, reply) =>
  ger.bootstrap(request.payload["events"])
  .then( -> reply({finished: true}))
  .fail((err) -> reply({error: err}).code(500))

Bagian terakhir adalah memulai server Hapi.js dengan server.start()

Menguji Server

Untuk menguji server dan rutenya, curl dapat digunakan untuk mengunggah file, mis.

curl -i -F [email protected] http://localhost:8000/event/bootstrap

curl juga dapat mengambil aliran standar dan mengunggahnya, mis.

head data.csv | curl -i -F events=@- http://localhost:8000/event/bootstrap

Saya hanya ingin meluangkan waktu dan memeriksa betapa hebatnya hal ini. head membuat aliran standar, menyalurkannya ke curl yang mengubahnya menjadi permintaan multipart HTTP, Hapi.js mengubah permintaan itu menjadi aliran Node.js, yang kemudian disalurkan ke PostGres sebagai aliran standar dari fungsi bootstrap GER . Itu keren sekali!

Metrik Kinerja

Saya menulis tes mocha kecil yang membandingkan penyisipan 10.000 peristiwa ke dalam GER metode satu peristiwa pada satu waktu, dan membandingkannya dengan memasukkan 10.000 peristiwa menggunakan fungsi bootstrap.

Hasilnya adalah:

  1. 0,7297 md per peristiwa bila setiap peristiwa disisipkan satu per satu
  2. 0,0696 md per acara untuk acara yang menggunakan bootstrap

Itu adalah peningkatan kinerja 10 kali lipat saat menyisipkan acara.

Hasil ini bahkan lebih dibesar-besarkan ketika menambahkan overhead HTTP, karena setiap penyisipan juga memerlukan overhead permintaan HTTPnya sendiri di mana satu file yang diunggah hanya satu permintaan.

Bacaan lebih lanjut

Buku Panduan Aliran Substacks

Gambar dari RLA-Inque