Вы когда-нибудь сталкивались с ситуацией, когда вам нужно запланировать задание или создать повторяющееся задание в NodeJS? Если да, то вы, возможно, думали о методе setInterval в JavaScript. Или вы, возможно, использовали пакеты npm, такие как cron или node-schedule.

Они выполняют свою работу, если работает только один экземпляр сервера, но что, если вы разрабатываете систему, которая должна быть высокодоступной, и для этого у вас есть несколько запущенных экземпляров сервера? Тогда вышеупомянутые методы будут иметь проблемы с параллелизмом. Это может привести к тому, что одно и то же задание будет запланировано несколько раз при запуске нескольких серверов.
В этом случае также будет сложно обрабатывать ошибки для невыполненных заданий.

Параллелизм был основной причиной, по которой я начал искать другие решения, и очереди пришли мне на помощь.

Очереди можно использовать тремя разными способами:
1. Производитель заданий
2. Потребитель заданий
3. Слушатель событий.

Производители будут добавлять задания в Очередь, Потребители / Рабочие будут выполнять работу, а Слушатели событий будут прослушивать события, которые происходят в этой Очереди.
Один экземпляр Очереди может использоваться для всех трех ролей, и многие производители, потребители и слушатели могут использовать одну и ту же очередь одновременно.
Поскольку очереди используют Redis для связи между производителями и потребителями, это решает нашу первую проблему параллелизма. Если производители и потребители могут подключаться к Redis, они могут сотрудничать при обработке задания.
Очереди также обеспечивают асинхронную связь, благодаря чему они не блокируют код.

Введение в Bull Queue

Bull - это библиотека узлов, которая реализует очереди на основе Redis. Хотя очереди могут быть напрямую реализованы в Redis, эта библиотека обрабатывает все низкоуровневые коды.
Использование Bull также решает проблему обработки ошибок, поскольку обеспечивает механизм RETRY. Мы можем передать то количество попыток, которое хотим сделать для работы.

Bull добавляет в Очередь только уникальные вакансии. Таким образом, это решает проблему многократного планирования одних и тех же заданий.

Возьмем простой пример. Допустим, вы хотите распечатать количество всех пользователей, присутствующих в вашей БД, каждые 10 минут. Проделаем эту операцию с помощью библиотеки Bull.

Использование библиотеки Bull-

npm install bull

Теперь создайте новый файл app.js и импортируйте в него бычью библиотеку.

const Queue = require(‘bull’);

Мы должны определить конфигурацию Redis, если мы используем настраиваемую конфигурацию.

let redisConf = {
    host: 127.0.0.1, 
    port: 6379, 
    username: USERNAME, 
    password: PASSWORD
};

Создайте новую очередь с настраиваемой конфигурацией Redis.

let queue = new Queue(‘myQueue’, {redis: redisConf} );
// Where ‘myQueue’ is the name of Queue.

Роль продюсера - создавать работу, то есть составлять график. График может быть разовым или повторяющимся. У нас могут быть оба типа расписаний в Bull. В этом посте мы приводим пример повторяющегося расписания.

Поскольку мы хотим запланировать задание на 10 минут, мы должны объявить для этого тайм-аут.

let timeout = 600000; // Time is in milliseconds
let data = {key: value};
queue.add(data, {repeat: {every: timeout} } );
//Where data is an object. 
//It can be an empty object also. 
//The value of data can be used while consuming the job. 

Роль потребителя - потреблять работу или работать над ней. queue.process () будет вызываться каждый раз, когда в очереди есть задание. Поскольку у нас есть повторяющееся задание продолжительностью 10 минут, функция процесса будет вызываться каждые 10 минут.

queue.process(async (job) => {
    let data = job.data;
    /* using job.data, the data object can be used which was passed   
    while creating the job */
    /* we want to print count of all user after every 10 minutes */
    let count = await db.countUsersDummyFunc();
    console.log(count);
});

Как только задание будет обработано, его состояние изменится на завершенное. Мы можем использовать роль прослушивателя событий очереди и прослушивать «завершенное» событие.

queue.on('completed', async (job) => {
    console.log(`Job ${job.id}` completed`);
    /* Completed jobs will remain in Redis. It may cause unnecesarry memory utilization. For this we have the option to remove the job. */
    await job.remove();
});

В этом посте мы обсудили все три роли очередей. Мы начали с добавления задания в очереди (роль продюсера). Затем обработка этого задания через определенные промежутки времени (роль потребителя). И в конце мы прослушали событие в очереди (роль прослушивателя событий).
Мы еще не обсудили, как обрабатывать неудачные задания и добавлять повторные попытки к заданию. Вероятно, это будет частью моего следующего поста.

Спасибо, что прочитали это. Хорошего дня впереди.