ошибка очереди gevent с LoopExit

Я хочу использовать библиотеку gevent python для реализации одного сервера-производителя и нескольких серверов-потребителей. Есть моя попытка:

class EmailValidationServer():
   def __init__(self):
      self.queue = Queue()
   def worker(self):
      while True:
          json = self.queue.get()
   def handler(self,socket,address):
      fileobj = socket.makefile()
      content = fileobj.read(max_read)
      contents = json.loads(content)
      for content in contents:
          self.queue.put(content)
   def daemon(self,addr='127.0.0.1',num_thread=5):
      pool = Pool(1000)
      server = StreamServer((addr, 6000),self.handler,spawn=pool) # run
      pool = ThreadPool(num_thread)
      for _ in range(num_thread):
          pool.spawn(self.worker)
      server.serve_forever()
if __name__ == "__main__":
    email_server = EmailValidationServer()
    email_server.daemon()

Я использовал очередь из gevent.queue.Queue. Это дает мне информацию об ошибке:

LoopExit: This operation would block forever
(<ThreadPool at 0x7f08c80eef50 0/4/5>,
 <bound method EmailValidationServer.worker of <__main__.EmailValidationServer instance at 0x7f08c8dcd998>>) failed with LoopExit

Проблема: но когда я меняю очередь с реализации gevent на встроенную библиотеку python, она работает. Я не знаю причину, я думаю, поддерживается разница между их реализацией. Я не знаю, почему gevent не позволяет бесконечно ждать. Кто-нибудь может дать объяснение? Спасибо заранее


person user3722836    schedule 16.04.2015    source источник


Ответы (1)


Я предлагаю вам использовать gevent.queue.JoinableQueue() вместо встроенного в Python Queue(). Вы можете обратиться к официальному руководству по использованию очередей API (http://www.gevent.org/gevent.queue.html)

def worker():
    while True:
        item = q.get()
        try:
            do_work(item)
        finally:
            q.task_done()

q = JoinableQueue()
for i in range(num_worker_threads):
     gevent.spawn(worker)

for item in source():
    q.put(item)

q.join()  # block until all tasks are done

Если вы снова столкнулись с исключениями, вам лучше полностью понять принцип потока управления Gevent corouinte ... Как только вы поняли суть, это не имело большого значения. :)

person Ryan Chou    schedule 08.12.2015
comment
@SuperBiasedMan спасибо за форматирование моих блоков кода :) - person Ryan Chou; 08.12.2015