GER реализует функцию начальной загрузки, которая принимает csv_stream, вставляет его данные в PostGres и возвращает обещание Q, когда оно завершено (или выдано ошибка). bootstrap можно разбить на две части: получение соединения и вставка данных.
Получение соединения от Knex
GER использует построитель запросов Knex для связи и управления подключениями к базе данных. Первым действием функции начальной загрузки является получение соединения с базой данных от Knex:
runner = new knex.client.Runner(knex.client) runner.ensureConnection() .then( (connection) => runner.connection = connection #Use connection ) .finally( -> runner.cleanupConnection())
Как и в случае с Upsert, это немного окольный способ работы со связями. Knex великолепен, но работать с его функциональными возможностями может быть сложно.
Вставка данных с помощью pg-copy-streams
Запрос, который используется для вставки данных:
query = "COPY events (person, action, thing) FROM STDIN CSV"
Этот запрос КОПИРУЕТ данные в таблицу событий, вставляя строки «человек», «действие» и «вещь» из стандартного потока в (STDIN) в формате значений, разделенных запятыми (CSV). Например, если данные потоков были:
bob, views, product1 alice, buys, product2
запрос будет вставлять два события, по одному для каждой строки.
С помощью функции from в pg-copy-streams (copyFrom = require(‘pg-copy-streams’).from) запрос упаковывается и отправляется на соединение PostGres:
copy = connection.query(copyFrom(query))
Возвращенный поток копирования является доступным для записи потоком, в который передается входной поток csv_stream:
csv_stream.pipe(copy)
Чтобы уведомить вызывающую программу о том, что начальная загрузка завершилась или произошла ошибка, создается отсрочка Q, которая прослушивает конец потока и события ошибки:
deferred = q.defer() csv_stream.pipe(copy) .on('end', -> deferred.resolve()) .on('error', (err) -> deferred.reject(err))
Все вместе эта вставка данных выглядит так:
query = "COPY events (person, action, thing) FROM STDIN CSV" copy = connection.query(copyFrom(query)); deferred = q.defer() csv_stream.pipe(copy) .on('end', -> deferred.resolve()) .on('error', (err) -> deferred.reject(err)) return deferred.promise
Hapi.js и потоки
Используя сервер Hapi.js, подключенный к функции начальной загрузки GER, файл может быть загружен и передан непосредственно в PostGres. Как описано в Документации Hapi.js. маршрут можно настроить для вывода потока Node.js.
Для реализации этого необходимо создать сервер Hapi.js с:
Hapi = require('hapi') server = new Hapi.Server('localhost', 8000)
Маршрут, который принимает загрузку файла и превращает его в поток Node.js, добавляется на сервер Hapi.js с помощью:
server.route method: 'POST' path: 'event/bootstrap' config: payload: maxBytes: 209715200 output:'stream' parse: true handler: (request, reply) -> #do things...
Опция обработчика — это функция, которая обрабатывает запрос. Он может получить доступ к загруженному файловому потоку в полезной нагрузке запросов, например. request.payload["события"]. Этот поток передается функции начальной загрузки GER, т.е.
handler: (request, reply) => ger.bootstrap(request.payload["events"]) .then( -> reply({finished: true})) .fail((err) -> reply({error: err}).code(500))
Заключительная часть — запустить сервер Hapi.js с помощью server.start().
Тестирование сервера
Чтобы протестировать сервер и маршрут, можно использовать curl для загрузки файла, например.
curl -i -F [email protected] http://localhost:8000/event/bootstrap
curl также может принимать стандартный поток и загружать его, например.
head data.csv | curl -i -F events=@- http://localhost:8000/event/bootstrap
Я просто хотел бы потратить время и проверить, насколько это круто. head создает стандартный поток, направляет его в curl, который превращает его в составной HTTP-запрос, Hapi.js превращает этот запрос в поток Node.js, который затем передается в PostGres как стандартный поток из функции начальной загрузки GER. . Это просто круто!
Показатели эффективности
Я написал небольшой тест мокко, который сравнивал вставку 10 000 событий в GER по одному событию за раз и сравнивал его со вставкой 10 000 событий с помощью функции начальной загрузки.
Результаты были следующими:
- 0,7297 мс на событие, когда каждое событие вставлялось по одному
- 0,0696 мс на событие для событий, использующих начальную загрузку
Это 10-кратное повышение производительности при вставке событий.
Эти результаты еще более преувеличены, если добавить накладные расходы HTTP, поскольку каждая вставка также требует накладных расходов собственного HTTP-запроса, где один загруженный файл — это только один запрос.
Дальнейшее чтение
Руководство по потокам подстеков
Изображение из RLA-Inque