ฉันกำลังสร้างระบบที่ Producer ส่งรายการงานที่จะเข้าคิวซึ่งผู้บริโภคจำนวนหนึ่งจะใช้
สมมติว่าฉันมีรายการงานและสามารถแบ่งออกเป็นสีดำ สีส้ม และสีเหลือง งานสีดำทั้งหมดจะถูกส่งไปยัง Queue_0, Orange ไปยัง Queue_1 และ Yellow ไปยัง Queue_2 และฉันจะกำหนดผู้ปฏิบัติงานให้กับแต่ละคิว (เช่น Consumer_0 ถึง Queue_0, Consumer_1 ถึง Queue_1 และ Consumer_2 ถึง Queue_2) หากบัญชีดำมีขนาดใหญ่ขึ้น ฉันต้องการเพิ่ม Consumer (เช่น: Consumer_3) เพิ่มเติมใน Queue_0 เพื่อช่วยเหลือ Consumer_0
ฉันดูบทช่วยสอน RabbitMQ ใน คิวผู้ปฏิบัติงาน และ การกำหนดเส้นทาง ฉันคิดว่าการกำหนดเส้นทางจะช่วยแก้ปัญหาของฉันได้ ฉันเปิดตัวเทอร์มินัลสามเครื่อง ผู้ผลิตหนึ่งรายและผู้บริโภคสองคนซึ่งจะได้รับงานของคนผิวดำ เมื่อผู้ผลิตส่งงานสีดำสองสามงาน (Black_Task_1, Black_Task_2) ผู้บริโภคทั้งสองจะได้รับข้อความทั้งสอง (เช่น: Consumer_0 ได้รับ Black_Task_1 และ Black_Task_2, Consumer_3 จะได้รับ Black_Task_1 และ Black_Task_2 ด้วย) ฉันต้องการให้ผู้บริโภคแบ่งปันงาน ไม่ใช่ทำงานเดียวกัน ตัวอย่าง Consumer_0 ทำ Black_Task_1 ในขณะที่ Consumer_3 ทำ Black_Task_2 ฉันสามารถบรรลุการกำหนดค่าใดได้บ้าง
=============================
อัปเดต
นี่คือโค้ดตัวอย่างที่นำมาจาก RabbitMQ บทช่วยสอนการกำหนดเส้นทาง ฉันปรับเปลี่ยนเล็กน้อย โปรดทราบว่ารหัสนี้ไม่ได้ส่งคิวสีดำ สีส้ม หรือสีเหลือง แต่แนวคิดก็อยู่ที่นั่น
emit_log_direct.py
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
type='direct')
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
routing_key=severity,
body=message)
print " [x] Sent %r:%r" % (severity, message)
connection.close()
รับ_logs_direct.py
#!/usr/bin/env python
import pika
import sys
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
type='direct')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
severities = sys.argv[1:]
if not severities:
print >> sys.stderr, "Usage: %s [info] [warning] [error]" % \
(sys.argv[0],)
sys.exit(1)
for severity in severities:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity)
print ' [*] Waiting for logs. To exit press CTRL+C'
def callback(ch, method, properties, body):
print " [x] %r:%r" % (method.routing_key, body,)
time.sleep(1)
print " [x] Done"
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
queue=queue_name)
channel.start_consuming()
ผู้ผลิต
nuttynibbles$ ./4_emit_log_direct.py info "run run info"
[x] Sent 'info':'run run info'
ผู้บริโภค_0
nuttynibbles$ ./4_receive_logs_direct_customize.py info
[*] Waiting for logs. To exit press CTRL+C
[x] 'info':'run run info'
[x] Done
ผู้บริโภค_3
nuttynibbles$ ./4_receive_logs_direct_customize.py info
[*] Waiting for logs. To exit press CTRL+C
[x] 'info':'run run info'
[x] Done