RabbitMQ - รวมคิวงานและคิวการกำหนดเส้นทาง

ฉันกำลังสร้างระบบที่ Producer ส่งรายการงานที่จะเข้าคิวซึ่งผู้บริโภคจำนวนหนึ่งจะใช้

สมมติว่าฉันมีรายการงานและสามารถแบ่งออกเป็นสีดำ สีส้ม และสีเหลือง งานสีดำทั้งหมดจะถูกส่งไปยัง Queue_0, Orange ไปยัง Queue_1 และ Yellow ไปยัง Queue_2 และฉันจะกำหนดผู้ปฏิบัติงานให้กับแต่ละคิว (เช่น Consumer_0 ถึง Queue_0, Consumer_1 ถึง Queue_1 และ Consumer_2 ถึง Queue_2) หากบัญชีดำมีขนาดใหญ่ขึ้น ฉันต้องการเพิ่ม Consumer (เช่น: Consumer_3) เพิ่มเติมใน Queue_0 เพื่อช่วยเหลือ Consumer_0

ฉันดูบทช่วยสอน RabbitMQ ใน คิวผู้ปฏิบัติงาน และ การกำหนดเส้นทาง ฉันคิดว่าการกำหนดเส้นทางจะช่วยแก้ปัญหาของฉันได้ ฉันเปิดตัวเทอร์มินัลสามเครื่อง ผู้ผลิตหนึ่งรายและผู้บริโภคสองคนซึ่งจะได้รับงานของคนผิวดำ เมื่อผู้ผลิตส่งงานสีดำสองสามงาน (Black_Task_1, Black_Task_2) ผู้บริโภคทั้งสองจะได้รับข้อความทั้งสอง (เช่น: Consumer_0 ได้รับ Black_Task_1 และ Black_Task_2, Consumer_3 จะได้รับ Black_Task_1 และ Black_Task_2 ด้วย) ฉันต้องการให้ผู้บริโภคแบ่งปันงาน ไม่ใช่ทำงานเดียวกัน ตัวอย่าง Consumer_0 ทำ Black_Task_1 ในขณะที่ Consumer_3 ทำ Black_Task_2 ฉันสามารถบรรลุการกำหนดค่าใดได้บ้าง

=============================

อัปเดต

นี่คือโค้ดตัวอย่างที่นำมาจาก RabbitMQ บทช่วยสอนการกำหนดเส้นทาง ฉันปรับเปลี่ยนเล็กน้อย โปรดทราบว่ารหัสนี้ไม่ได้ส่งคิวสีดำ สีส้ม หรือสีเหลือง แต่แนวคิดก็อยู่ที่นั่น

emit_log_direct.py

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print " [x] Sent %r:%r" % (severity, message)
connection.close()

รับ_logs_direct.py

#!/usr/bin/env python
import pika
import sys
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    print >> sys.stderr, "Usage: %s [info] [warning] [error]" % \
                         (sys.argv[0],)
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

print ' [*] Waiting for logs. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] %r:%r" % (method.routing_key, body,)
    time.sleep(1)
    print " [x] Done"
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue=queue_name)

channel.start_consuming()

ผู้ผลิต

nuttynibbles$ ./4_emit_log_direct.py info "run run info"
 [x] Sent 'info':'run run info'

ผู้บริโภค_0

nuttynibbles$ ./4_receive_logs_direct_customize.py info
 [*] Waiting for logs. To exit press CTRL+C
 [x] 'info':'run run info'
 [x] Done

ผู้บริโภค_3

nuttynibbles$ ./4_receive_logs_direct_customize.py info
 [*] Waiting for logs. To exit press CTRL+C
 [x] 'info':'run run info'
 [x] Done

person chrizonline    schedule 01.02.2015    source แหล่งที่มา
comment
คุณหมายถึงอะไร แบ่งปันงาน - คุณต้องการให้ผู้บริโภคสองคนทำงานเดียวกันหรือไม่   -  person Burhan Khalid    schedule 01.02.2015
comment
สวัสดี ฉันได้อธิบายเพิ่มเติมในคำถามของฉันข้างต้นแล้ว   -  person chrizonline    schedule 01.02.2015


คำตอบ (2)


ฉันคิดว่าปัญหาพื้นฐานของคุณอยู่ที่:

หากคิว Black lists มีขนาดใหญ่ขึ้น ฉันต้องการเพิ่ม Consumer เพิ่มเติม (เช่น: Consumer_3) ลงใน Queue_0 เพื่อช่วยเหลือ Consumer_0

ทันทีที่คุณเพิ่มผู้บริโภครายอื่นในคิว ระบบจะรับข้อความถัดไปที่มีอยู่

หากผู้บริโภครายแรกไม่รับทราบข้อความ; จากนั้นผู้ปฏิบัติงานหลายคนจะสามารถทำงานกับข้อความเดียวกันได้เนื่องจากจะยังคงอยู่ในคิว

ดังนั้นตรวจสอบให้แน่ใจว่าคุณรับทราบข้อความอย่างถูกต้อง:

ตามค่าเริ่มต้น RabbitMQ จะส่งข้อความแต่ละข้อความไปยังผู้บริโภครายถัดไปตามลำดับ โดยเฉลี่ยแล้วผู้บริโภคทุกคนจะได้รับข้อความจำนวนเท่ากัน วิธีการกระจายข้อความนี้เรียกว่าการวนซ้ำ [...] ไม่มีการหมดเวลาข้อความใดๆ RabbitMQ จะส่งข้อความอีกครั้งเฉพาะเมื่อการเชื่อมต่อของผู้ปฏิบัติงานขาดหายไป เป็นเรื่องปกติแม้ว่าการประมวลผลข้อความจะใช้เวลานานมากก็ตาม

ขึ้นอยู่กับลักษณะของงาน คุณอาจสามารถแบ่งงานระหว่างหลายกระบวนการได้โดยการสร้างคิวลำดับความสำคัญ ซึ่งใช้โดย C1 (ผู้บริโภค) เพื่อรับทรัพยากรเพิ่มเติม ในกรณีนี้ คุณจะต้องมีคนงานพร้อมและรับฟังคิวลำดับความสำคัญที่แยกจากกัน ดังนั้นการสร้างคิวย่อยโดยที่ T1 (งาน) กำลังถูกประมวลผล

อย่างไรก็ตาม อย่างอื่นในการดำเนินการนี้ เริ่มต้น C1 ต้องแน่ใจว่างานไม่พร้อมใช้งานอีกต่อไปโดยการยอมรับการรับ

person Burhan Khalid    schedule 01.02.2015
comment
สวัสดี ฉันได้เพิ่มข้อมูลโค้ดแล้ว ฉันไม่แน่ใจว่าฉันทำถูกต้องหรือไม่ ในreceive_logs_direct.py ฉันได้เพิ่ม basic_ack() ใน callback() แต่ก็ยังไม่ได้ผลตามที่คาดไว้ - person chrizonline; 01.02.2015

ฉันคิดว่าปัญหาของคุณคือคุณกำลังสร้างคิวใหม่สำหรับผู้บริโภคแต่ละราย เมื่อคุณโทร

ผล = channel.queue_declare (พิเศษ = True)

Queue_name = result.method.queue

ในผู้บริโภคของคุณ สิ่งนี้จะประกาศคิวใหม่ บอกให้ RabbitMQ สร้างชื่อที่ไม่ซ้ำสำหรับมัน และทำเครื่องหมายสำหรับการใช้งานเฉพาะโดยช่องทางในผู้บริโภคที่เรียกมัน นั่นหมายความว่าผู้บริโภคแต่ละรายจะมีคิวของตัวเอง

จากนั้นคุณผูกแต่ละคิวใหม่เข้ากับการแลกเปลี่ยนโดยใช้ความรุนแรงเป็นคีย์การกำหนดเส้นทาง เมื่อข้อความเข้ามาใน Exchange โดยตรง RabbitMQ จะกำหนดเส้นทางสำเนาของข้อความนั้นไปยังทุกคิวที่เชื่อมโยงกับคีย์การกำหนดเส้นทางที่ตรงกัน ไม่มีการปัดเศษข้ามคิว ผู้บริโภคแต่ละรายจะได้รับสำเนาข้อความ ซึ่งเป็นสิ่งที่คุณสังเกตเห็น

ฉันเชื่อว่าสิ่งที่คุณต้องการทำคือให้ผู้บริโภคแต่ละคนใช้ชื่อเดียวกันสำหรับคิว ระบุชื่อใน Queue_declare และอย่าทำให้เป็นชื่อเฉพาะ จากนั้นผู้บริโภคทุกคนก็จะฟังคิวเดียวกัน ข้อความจะถูกส่งไปยังผู้บริโภครายหนึ่ง โดยพื้นฐานแล้วจะเป็นแบบพบกันหมด

ผู้ผลิต (โปรแกรม emit_log.py) จะไม่ประกาศหรือผูกคิว - ไม่จำเป็น แต่หากไม่ได้สร้างการเชื่อมโยงก่อนที่จะส่งข้อความ ข้อความนั้นจะถูกยกเลิก หากคุณกำลังใช้คิวคงที่ คุณสามารถให้ผู้ผลิตตั้งค่าได้เช่นกัน เพียงให้แน่ใจว่าใช้พารามิเตอร์เดียวกันกับผู้บริโภค

person egolin    schedule 22.03.2015