RabbitMQ — объединение рабочих очередей и очередей маршрутизации

Я создаю систему, в которой производитель отправляет список задач для постановки в очередь, которые будут потребляться несколькими потребителями.

Предположим, у меня есть список задач, и их можно разделить на черные, оранжевые и желтые. Все черные задачи отправляются в Queue_0, оранжевые — в Queue_1 и желтые — в Queue_2. И я назначу работника для каждой очереди (т.е. Consumer_0 для Queue_0, Consumer_1 для Queue_1 и Consumer_2 для Queue_2). Если черные списки станут больше, я хочу добавить дополнительного потребителя (например, Consumer_3) в Queue_0, чтобы помочь Consumer_0.

Я просмотрел учебные пособия по RabbitMQ по рабочим очередям и Маршрутизация. Я думал, что маршрутизация решит мою проблему. Я запустил три терминала, производителя и двух потребителей, которые будут получать задания Black. Когда производитель отправляет несколько черных задач (Black_Task_1, Black_Task_2), оба потребителя получают два сообщения (т. е. Consumer_0 получает Black_Task_1 и Black_Task_2, Consumer_3 также получает Black_Task_1 и Black_Task_2). Я хочу, чтобы мои потребители разделяли задачу, а не выполняли одну и ту же задачу. Например, Consumer_0 выполняет Black_Task_1, а Consumer_3 выполняет Black_Task_2. Какие конфигурации я могу достичь этого?

=============================

Обновлять

Это пример кода, взятый из RabbitMQ, руководства по маршрутизации. Я немного модифицировал. Обратите внимание, что этот код не отправляет черные, оранжевые или желтые очереди. Но концепция есть.

emit_log_direct.py

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print " [x] Sent %r:%r" % (severity, message)
connection.close()

receive_logs_direct.py

#!/usr/bin/env python
import pika
import sys
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    print >> sys.stderr, "Usage: %s [info] [warning] [error]" % \
                         (sys.argv[0],)
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

print ' [*] Waiting for logs. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] %r:%r" % (method.routing_key, body,)
    time.sleep(1)
    print " [x] Done"
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue=queue_name)

channel.start_consuming()

Продюсер

nuttynibbles$ ./4_emit_log_direct.py info "run run info"
 [x] Sent 'info':'run run info'

Потребитель_0

nuttynibbles$ ./4_receive_logs_direct_customize.py info
 [*] Waiting for logs. To exit press CTRL+C
 [x] 'info':'run run info'
 [x] Done

Потребитель_3

nuttynibbles$ ./4_receive_logs_direct_customize.py info
 [*] Waiting for logs. To exit press CTRL+C
 [x] 'info':'run run info'
 [x] Done

person chrizonline    schedule 01.02.2015    source источник
comment
Что вы имеете в виду под разделить задачу — вы хотите, чтобы два потребителя работали над одной и той же задачей?   -  person Burhan Khalid    schedule 01.02.2015
comment
Привет, я объяснил это в своем вопросе выше.   -  person chrizonline    schedule 01.02.2015


Ответы (2)


Я думаю, что ваша основная проблема заключается в следующем:

Если очередь черных lists становится больше, я хочу добавить дополнительного потребителя (например, Consumer_3) в Queue_0, чтобы помочь Consumer_0.

Как только вы добавите в очередь еще одного потребителя, он подхватит следующее доступное сообщение.

Если первый потребитель не подтверждает получение сообщения; тогда несколько рабочих смогут работать с одним и тем же сообщением, поскольку оно останется в очереди.

Поэтому убедитесь, что вы правильно подтверждаете сообщения:

По умолчанию RabbitMQ последовательно отправляет каждое сообщение следующему потребителю. В среднем каждый потребитель получит одинаковое количество сообщений. Такой способ распространения сообщений называется циклическим. [...] Таймаутов сообщений нет; RabbitMQ повторно доставит сообщение только в случае разрыва рабочего соединения. Это нормально, даже если обработка сообщения занимает очень и очень много времени.

В зависимости от характера задачи вы можете разделить работу между несколькими процессами, создав приоритетную очередь; который используется C1 (потребителем) для получения дополнительных ресурсов. В этом случае у вас должны быть готовые и прослушивающие рабочие процессы в отдельной приоритетной очереди; таким образом создается подочередь, в которой обрабатывается T1 (задача).

Однако в других случаях для этого первоначальный C1 должен убедиться, что задача больше недоступна, подтвердив ее получение.

person Burhan Khalid    schedule 01.02.2015
comment
привет, я добавил свои фрагменты кода. я не уверен, что сделал это правильно. в receive_logs_direct.py я добавил basic_ack() в callback(). Но все еще не работает, как ожидалось - person chrizonline; 01.02.2015

Я думаю, что ваша проблема в том, что вы создаете новую очередь для каждого потребителя. Когда вы звоните

результат = channel.queue_declare (эксклюзивный = Истина)

имя_очереди = результат.метод.очередь

в вашем потребителе это объявляет новую очередь, указывает RabbitMQ сгенерировать для нее уникальное имя и помечает ее для исключительного использования каналом в потребителе, который ее вызывает. Это означает, что у каждого потребителя будет своя очередь.

Затем вы привязываете каждую новую очередь к обмену, используя серьезность в качестве ключа маршрутизации. Когда сообщение поступает в прямой Exchange, RabbitMQ направит его копию в каждую очередь, связанную с соответствующим ключом маршрутизации. В очередях нет циклического перебора. Каждый потребитель получит копию сообщения, что вы и наблюдаете.

Я считаю, что вы хотите, чтобы каждый потребитель использовал одно и то же имя для очереди, указывал имя в queue_declare и не делал его эксклюзивным. Тогда все потребители будут слушать одну и ту же очередь. Сообщения будут доставлены одному из потребителей, в основном циклическим способом.

Производитель (программа emit_log.py) не объявляет и не привязывает очередь — это не обязательно, но если привязка не установлена ​​до отправки сообщения, она будет отброшена. Если вы используете фиксированную очередь, вы также можете настроить ее у производителя, просто убедитесь, что используете те же параметры (например, имя_очереди), что и у потребителя.

person egolin    schedule 22.03.2015