Я создаю систему, в которой производитель отправляет список задач для постановки в очередь, которые будут потребляться несколькими потребителями.
Предположим, у меня есть список задач, и их можно разделить на черные, оранжевые и желтые. Все черные задачи отправляются в Queue_0, оранжевые — в Queue_1 и желтые — в Queue_2. И я назначу работника для каждой очереди (т.е. Consumer_0 для Queue_0, Consumer_1 для Queue_1 и Consumer_2 для Queue_2). Если черные списки станут больше, я хочу добавить дополнительного потребителя (например, Consumer_3) в Queue_0, чтобы помочь Consumer_0.
Я просмотрел учебные пособия по RabbitMQ по рабочим очередям и Маршрутизация. Я думал, что маршрутизация решит мою проблему. Я запустил три терминала, производителя и двух потребителей, которые будут получать задания Black. Когда производитель отправляет несколько черных задач (Black_Task_1, Black_Task_2), оба потребителя получают два сообщения (т. е. Consumer_0 получает Black_Task_1 и Black_Task_2, Consumer_3 также получает Black_Task_1 и Black_Task_2). Я хочу, чтобы мои потребители разделяли задачу, а не выполняли одну и ту же задачу. Например, Consumer_0 выполняет Black_Task_1, а Consumer_3 выполняет Black_Task_2. Какие конфигурации я могу достичь этого?
=============================
Обновлять
Это пример кода, взятый из RabbitMQ, руководства по маршрутизации. Я немного модифицировал. Обратите внимание, что этот код не отправляет черные, оранжевые или желтые очереди. Но концепция есть.
emit_log_direct.py
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
type='direct')
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
routing_key=severity,
body=message)
print " [x] Sent %r:%r" % (severity, message)
connection.close()
receive_logs_direct.py
#!/usr/bin/env python
import pika
import sys
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
type='direct')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
severities = sys.argv[1:]
if not severities:
print >> sys.stderr, "Usage: %s [info] [warning] [error]" % \
(sys.argv[0],)
sys.exit(1)
for severity in severities:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity)
print ' [*] Waiting for logs. To exit press CTRL+C'
def callback(ch, method, properties, body):
print " [x] %r:%r" % (method.routing_key, body,)
time.sleep(1)
print " [x] Done"
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
queue=queue_name)
channel.start_consuming()
Продюсер
nuttynibbles$ ./4_emit_log_direct.py info "run run info"
[x] Sent 'info':'run run info'
Потребитель_0
nuttynibbles$ ./4_receive_logs_direct_customize.py info
[*] Waiting for logs. To exit press CTRL+C
[x] 'info':'run run info'
[x] Done
Потребитель_3
nuttynibbles$ ./4_receive_logs_direct_customize.py info
[*] Waiting for logs. To exit press CTRL+C
[x] 'info':'run run info'
[x] Done