antrian gevent gagal dengan LoopExit

Saya ingin menggunakan perpustakaan python gevent untuk mengimplementasikan satu server produsen dan beberapa konsumen. Ini adalah upaya saya:

class EmailValidationServer():
   def __init__(self):
      self.queue = Queue()
   def worker(self):
      while True:
          json = self.queue.get()
   def handler(self,socket,address):
      fileobj = socket.makefile()
      content = fileobj.read(max_read)
      contents = json.loads(content)
      for content in contents:
          self.queue.put(content)
   def daemon(self,addr='127.0.0.1',num_thread=5):
      pool = Pool(1000)
      server = StreamServer((addr, 6000),self.handler,spawn=pool) # run
      pool = ThreadPool(num_thread)
      for _ in range(num_thread):
          pool.spawn(self.worker)
      server.serve_forever()
if __name__ == "__main__":
    email_server = EmailValidationServer()
    email_server.daemon()

Saya menggunakan antrian dari gevent.queue.Queue. Ini memberi saya informasi kesalahan:

LoopExit: This operation would block forever
(<ThreadPool at 0x7f08c80eef50 0/4/5>,
 <bound method EmailValidationServer.worker of <__main__.EmailValidationServer instance at 0x7f08c8dcd998>>) failed with LoopExit

Masalah: Tetapi ketika saya mengubah Antrean dari implementasi gevent ke perpustakaan bawaan python, itu berfungsi. Saya tidak tahu alasannya, saya rasa didukung adanya perbedaan antara implementasinya. Saya tidak tahu alasan mengapa gevent tidak mengizinkan penantian tanpa batas. Apakah ada yang bisa memberi penjelasan? Terima kasih sebelumnya


person user3722836    schedule 16.04.2015    source sumber


Jawaban (1)


Saya menyarankan agar Anda menggunakan gevent.queue.JoinableQueue() alih-alih Queue() bawaan Python. Anda dapat merujuk ke panduan antrean resmi untuk Penggunaan API (http://www.gevent.org/gevent.queue.html)

def worker():
    while True:
        item = q.get()
        try:
            do_work(item)
        finally:
            q.task_done()

q = JoinableQueue()
for i in range(num_worker_threads):
     gevent.spawn(worker)

for item in source():
    q.put(item)

q.join()  # block until all tasks are done

Jika Anda menemui pengecualian lagi, sebaiknya Anda memahami sepenuhnya prinsip aliran kendali Gevent corouinte ...Setelah Anda mengerti maksudnya, itu bukan masalah besar. :)

person Ryan Chou    schedule 08.12.2015
comment
@SuperBiasedMan terima kasih telah memformat blok kode saya :) - person Ryan Chou; 08.12.2015