RabbitMQ - menggabungkan Antrian Kerja dan Antrian Perutean

Saya sedang membangun sistem di mana seorang Produser mengirimkan daftar tugas untuk diantrekan yang akan dikonsumsi oleh sejumlah Konsumen.

Asumsikan saya memiliki daftar tugas dan tugas tersebut dapat dikategorikan menjadi Hitam, Oranye, dan Kuning. Semua tugas Hitam dikirim ke Queue_0, Oranye ke Queue_1 dan Kuning ke Queue_2. Dan saya akan menugaskan seorang pekerja ke setiap antrian (yaitu: Konsumen_0 ke Antrean_0, Konsumen_1 ke Antrean_1 dan Konsumen_2 ke Antrian_2). Jika daftar Hitam bertambah besar, saya ingin menambahkan Konsumen tambahan (yaitu: Konsumen_3) ke Antrian_0 untuk membantu Konsumen_0.

Saya mempelajari tutorial RabbitMQ tentang Antrian Pekerja dan Perutean. Saya pikir Perutean akan menyelesaikan masalah saya. Saya meluncurkan tiga terminal, satu produsen dan dua konsumen yang akan menerima tugas Hitam. Ketika produsen mengirimkan beberapa tugas hitam (Black_Task_1, Black_Task_2), kedua konsumen menerima dua pesan (yaitu: Consumer_0 menerima Black_Task_1 dan Black_Task_2, Consumer_3 juga menerima Black_Task_1 dan Black_Task_2) . Saya ingin konsumen saya berbagi tugas, bukan melakukan tugas yang sama. Contoh, Konsumen_0 mengerjakan Black_Task_1 sedangkan Consumer_3 mengerjakan Black_Task_2. Konfigurasi apa yang bisa saya capai?

=============================

Memperbarui

Ini adalah contoh kode yang diambil dari RabbitMQ, tutorial perutean. Saya memodifikasi sedikit. Perhatikan bahwa kode ini tidak mengirimkan antrian Hitam, Oranye, atau Kuning. Tapi konsepnya ada.

emit_log_direct.py

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print " [x] Sent %r:%r" % (severity, message)
connection.close()

terima_logs_direct.py

#!/usr/bin/env python
import pika
import sys
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    print >> sys.stderr, "Usage: %s [info] [warning] [error]" % \
                         (sys.argv[0],)
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

print ' [*] Waiting for logs. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] %r:%r" % (method.routing_key, body,)
    time.sleep(1)
    print " [x] Done"
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue=queue_name)

channel.start_consuming()

Produser

nuttynibbles$ ./4_emit_log_direct.py info "run run info"
 [x] Sent 'info':'run run info'

Konsumen_0

nuttynibbles$ ./4_receive_logs_direct_customize.py info
 [*] Waiting for logs. To exit press CTRL+C
 [x] 'info':'run run info'
 [x] Done

Konsumen_3

nuttynibbles$ ./4_receive_logs_direct_customize.py info
 [*] Waiting for logs. To exit press CTRL+C
 [x] 'info':'run run info'
 [x] Done

person chrizonline    schedule 01.02.2015    source sumber
comment
Apa maksud Anda berbagi tugas - Anda ingin dua konsumen mengerjakan tugas yang sama?   -  person Burhan Khalid    schedule 01.02.2015
comment
Hai, saya sudah menjelaskannya lebih lanjut pada pertanyaan saya di atas.   -  person chrizonline    schedule 01.02.2015


Jawaban (2)


Saya pikir masalah mendasar Anda adalah dengan ini:

Jika antrian daftar Hitam bertambah besar, saya ingin menambahkan Konsumen tambahan (yaitu: Konsumen_3) ke Queue_0 untuk membantu Konsumen_0.

Segera setelah Anda menambahkan konsumen lain ke antrian, ia akan mengambil pesan berikutnya yang tersedia.

Jika konsumen pertama tidak menerima pesan; maka beberapa pekerja akan dapat mengerjakan pesan yang sama karena pesan tersebut akan tetap berada dalam antrean.

Jadi, pastikan Anda mengenali pesan-pesan tersebut dengan benar:

Secara default, RabbitMQ akan mengirimkan setiap pesan ke konsumen berikutnya, secara berurutan. Rata-rata setiap konsumen akan mendapatkan jumlah pesan yang sama. Cara mendistribusikan pesan ini disebut round-robin. [...] Tidak ada batas waktu pesan; RabbitMQ akan mengirimkan ulang pesan hanya ketika koneksi pekerja terputus. Tidak apa-apa meskipun pemrosesan pesan memerlukan waktu yang sangat-sangat lama.

Tergantung pada sifat tugasnya, Anda mungkin dapat membagi pekerjaan antara beberapa proses dengan membuat antrian prioritas; yang digunakan oleh C1 (konsumen) untuk mendapatkan sumber daya tambahan. Dalam hal ini Anda harus menyiapkan pekerja dan mendengarkan dalam antrean prioritas terpisah; sehingga menciptakan sub-antrean tempat T1 (tugas) sedang diproses.

Namun, selain melakukan hal ini, C1 awal harus memastikan tugas tersebut tidak lagi tersedia dengan mengakui penerimaannya.

person Burhan Khalid    schedule 01.02.2015
comment
hai, saya menambahkan cuplikan kode saya. saya tidak yakin apakah saya melakukannya dengan benar. di receiver_logs_direct.py, saya menambahkan basic_ack() di callback(). Namun masih belum berjalan sesuai harapan - person chrizonline; 01.02.2015

Saya rasa masalah Anda adalah Anda membuat Antrian baru untuk setiap konsumen. Saat Anda menelepon

hasil = saluran.queue_declare(eksklusif=Benar)

nama_antrian = hasil.metode.antrian

di konsumen Anda, ini mendeklarasikan antrean baru, meminta RabbitMQ untuk membuat nama unik untuk antrean tersebut, dan menandainya untuk penggunaan eksklusif oleh saluran di konsumen yang memanggilnya. Artinya, setiap konsumen akan memiliki antriannya masing-masing.

Anda kemudian mengikat setiap Antrean baru ke bursa menggunakan tingkat keparahan sebagai kunci perutean. Saat sebuah pesan masuk ke Exchange langsung, RabbitMQ akan merutekan salinannya ke setiap Antrean yang terikat dengan kunci perutean yang cocok. Tidak ada sistem round-robin di Antrean. Setiap konsumen akan mendapatkan salinan pesan yang Anda amati.

Saya yakin yang ingin Anda lakukan adalah meminta setiap konsumen menggunakan nama antrian yang sama, menentukan nama di queue_declare, dan tidak menjadikannya eksklusif. Kemudian semua konsumen akan mendengarkan antrian yang sama. Pesan-pesan tersebut akan dikirimkan ke salah satu konsumen, pada dasarnya dengan cara round-robin.

Produser (program emit_log.py) tidak mendeklarasikan atau mengikat antrean - hal ini tidak perlu dilakukan, namun jika pengikatan tidak dilakukan sebelum pesan dikirim, maka akan dibuang. Jika Anda menggunakan antrian tetap, Anda dapat meminta produsen mengaturnya juga, pastikan untuk menggunakan parameter yang sama (misalnya nama_antrian) dengan konsumen.

person egolin    schedule 22.03.2015