GER реализует функцию начальной загрузки, которая принимает csv_stream, вставляет его данные в PostGres и возвращает обещание Q, когда оно завершено (или выдано ошибка). bootstrap можно разбить на две части: получение соединения и вставка данных.

Получение соединения от Knex

GER использует построитель запросов Knex для связи и управления подключениями к базе данных. Первым действием функции начальной загрузки является получение соединения с базой данных от Knex:

runner = new knex.client.Runner(knex.client)
runner.ensureConnection()
.then( (connection) =>
 runner.connection = connection
 #Use connection
)
.finally( -> runner.cleanupConnection())

Как и в случае с Upsert, это немного окольный способ работы со связями. Knex великолепен, но работать с его функциональными возможностями может быть сложно.

Вставка данных с помощью pg-copy-streams

Запрос, который используется для вставки данных:

query = "COPY events (person, action, thing) FROM STDIN CSV"

Этот запрос КОПИРУЕТ данные в таблицу событий, вставляя строки «человек», «действие» и «вещь» из стандартного потока в (STDIN) в формате значений, разделенных запятыми (CSV). Например, если данные потоков были:

bob, views, product1
alice, buys, product2

запрос будет вставлять два события, по одному для каждой строки.

С помощью функции from в pg-copy-streams (copyFrom = require(‘pg-copy-streams’).from) запрос упаковывается и отправляется на соединение PostGres:

copy = connection.query(copyFrom(query))

Возвращенный поток копирования является доступным для записи потоком, в который передается входной поток csv_stream:

csv_stream.pipe(copy)

Чтобы уведомить вызывающую программу о том, что начальная загрузка завершилась или произошла ошибка, создается отсрочка Q, которая прослушивает конец потока и события ошибки:

deferred = q.defer()
csv_stream.pipe(copy)
.on('end', -> deferred.resolve())
.on('error', (err) -> deferred.reject(err))

Все вместе эта вставка данных выглядит так:

query = "COPY events (person, action, thing) FROM STDIN CSV"
copy = connection.query(copyFrom(query));
deferred = q.defer()
csv_stream.pipe(copy)
.on('end', -> deferred.resolve())
.on('error', (err) -> deferred.reject(err))
return deferred.promise

Hapi.js и потоки

Используя сервер Hapi.js, подключенный к функции начальной загрузки GER, файл может быть загружен и передан непосредственно в PostGres. Как описано в Документации Hapi.js. маршрут можно настроить для вывода потока Node.js.

Для реализации этого необходимо создать сервер Hapi.js с:

Hapi = require('hapi')
server = new Hapi.Server('localhost', 8000)

Маршрут, который принимает загрузку файла и превращает его в поток Node.js, добавляется на сервер Hapi.js с помощью:

server.route
 method: 'POST'
 path: 'event/bootstrap'
 config:
  payload:
   maxBytes: 209715200
   output:'stream'
   parse: true
 handler: (request, reply) ->
   #do things...

Опция обработчика — это функция, которая обрабатывает запрос. Он может получить доступ к загруженному файловому потоку в полезной нагрузке запросов, например. request.payload["события"]. Этот поток передается функции начальной загрузки GER, т.е.

handler: (request, reply) =>
  ger.bootstrap(request.payload["events"])
  .then( -> reply({finished: true}))
  .fail((err) -> reply({error: err}).code(500))

Заключительная часть — запустить сервер Hapi.js с помощью server.start().

Тестирование сервера

Чтобы протестировать сервер и маршрут, можно использовать curl для загрузки файла, например.

curl -i -F [email protected] http://localhost:8000/event/bootstrap

curl также может принимать стандартный поток и загружать его, например.

head data.csv | curl -i -F events=@- http://localhost:8000/event/bootstrap

Я просто хотел бы потратить время и проверить, насколько это круто. head создает стандартный поток, направляет его в curl, который превращает его в составной HTTP-запрос, Hapi.js превращает этот запрос в поток Node.js, который затем передается в PostGres как стандартный поток из функции начальной загрузки GER. . Это просто круто!

Показатели эффективности

Я написал небольшой тест мокко, который сравнивал вставку 10 000 событий в GER по одному событию за раз и сравнивал его со вставкой 10 000 событий с помощью функции начальной загрузки.

Результаты были следующими:

  1. 0,7297 мс на событие, когда каждое событие вставлялось по одному
  2. 0,0696 мс на событие для событий, использующих начальную загрузку

Это 10-кратное повышение производительности при вставке событий.

Эти результаты еще более преувеличены, если добавить накладные расходы HTTP, поскольку каждая вставка также требует накладных расходов собственного HTTP-запроса, где один загруженный файл — это только один запрос.

Дальнейшее чтение

Руководство по потокам подстеков

Изображение из RLA-Inque