Saya sedang membangun sistem di mana seorang Produser mengirimkan daftar tugas untuk diantrekan yang akan dikonsumsi oleh sejumlah Konsumen.
Asumsikan saya memiliki daftar tugas dan tugas tersebut dapat dikategorikan menjadi Hitam, Oranye, dan Kuning. Semua tugas Hitam dikirim ke Queue_0, Oranye ke Queue_1 dan Kuning ke Queue_2. Dan saya akan menugaskan seorang pekerja ke setiap antrian (yaitu: Konsumen_0 ke Antrean_0, Konsumen_1 ke Antrean_1 dan Konsumen_2 ke Antrian_2). Jika daftar Hitam bertambah besar, saya ingin menambahkan Konsumen tambahan (yaitu: Konsumen_3) ke Antrian_0 untuk membantu Konsumen_0.
Saya mempelajari tutorial RabbitMQ tentang Antrian Pekerja dan Perutean. Saya pikir Perutean akan menyelesaikan masalah saya. Saya meluncurkan tiga terminal, satu produsen dan dua konsumen yang akan menerima tugas Hitam. Ketika produsen mengirimkan beberapa tugas hitam (Black_Task_1, Black_Task_2), kedua konsumen menerima dua pesan (yaitu: Consumer_0 menerima Black_Task_1 dan Black_Task_2, Consumer_3 juga menerima Black_Task_1 dan Black_Task_2) . Saya ingin konsumen saya berbagi tugas, bukan melakukan tugas yang sama. Contoh, Konsumen_0 mengerjakan Black_Task_1 sedangkan Consumer_3 mengerjakan Black_Task_2. Konfigurasi apa yang bisa saya capai?
=============================
Memperbarui
Ini adalah contoh kode yang diambil dari RabbitMQ, tutorial perutean. Saya memodifikasi sedikit. Perhatikan bahwa kode ini tidak mengirimkan antrian Hitam, Oranye, atau Kuning. Tapi konsepnya ada.
emit_log_direct.py
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
type='direct')
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
routing_key=severity,
body=message)
print " [x] Sent %r:%r" % (severity, message)
connection.close()
terima_logs_direct.py
#!/usr/bin/env python
import pika
import sys
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
type='direct')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
severities = sys.argv[1:]
if not severities:
print >> sys.stderr, "Usage: %s [info] [warning] [error]" % \
(sys.argv[0],)
sys.exit(1)
for severity in severities:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity)
print ' [*] Waiting for logs. To exit press CTRL+C'
def callback(ch, method, properties, body):
print " [x] %r:%r" % (method.routing_key, body,)
time.sleep(1)
print " [x] Done"
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
queue=queue_name)
channel.start_consuming()
Produser
nuttynibbles$ ./4_emit_log_direct.py info "run run info"
[x] Sent 'info':'run run info'
Konsumen_0
nuttynibbles$ ./4_receive_logs_direct_customize.py info
[*] Waiting for logs. To exit press CTRL+C
[x] 'info':'run run info'
[x] Done
Konsumen_3
nuttynibbles$ ./4_receive_logs_direct_customize.py info
[*] Waiting for logs. To exit press CTRL+C
[x] 'info':'run run info'
[x] Done