Asyncio и бесконечный цикл

@asyncio.coroutine
    def listener():
        while True:
            message = yield from websocket.recieve_message()
            if message:
                yield from handle(message)

loop = asyncio.get_event_loop()
loop.run_until_complete(listener())

Допустим, я использую веб-сокеты с asyncio. Это означает, что я получаю сообщения от websockets. И когда я получаю сообщение, я хочу обработать его, но теряю всю асинхронность с моим кодом. Потому что yield from handle(message) определенно блокирует ... Как мне найти способ сделать его неблокирующим? Например, обрабатывать несколько сообщений одновременно. Мне не нужно ждать обработки сообщения, прежде чем я смогу обработать другое сообщение.

Спасибо.


person Cookie    schedule 02.12.2015    source источник
comment
Обычно для чтения требуется задача для каждого веб-сокета, запись в веб-сокет может выполняться асинхронно из другой задачи. handle также может быть отдельной задачей. Ваш код неполный, поэтому получить именно то, что вам нужно, непросто.   -  person Andrew Svetlov    schedule 02.12.2015
comment
если это единственная вызываемая вами сопрограмма, то слушатель будет блокироваться в том смысле, что он будет работать бесконечно (из-за цикла while true). Если у вас есть другая сопрограмма, работающая одновременно (с ее собственными операторами yield from), тогда asyncio будет подпрыгивать между операторами yield from и, таким образом, перестанет «блокировать».   -  person songololo    schedule 02.12.2015


Ответы (1)


Если вас не волнует возвращаемое значение из сообщения дескриптора, вы можете просто создать для него новую задачу, которая будет запускаться в цикле событий вместе с вашим читателем веб-сокета. Вот простой пример:

@asyncio.coroutine
def listener():
    while True:
        message = yield from websocket.recieve_message()
        if message:
            asyncio.ensure_future(handle(message))

ensure_future создаст задачу и прикрепит ее к циклу событий по умолчанию. Поскольку цикл уже запущен, он будет обрабатываться параллельно с вашим читателем websocket. Фактически, если это медленно выполняющаяся задача с блокировкой ввода-вывода (например, отправка электронной почты), вы можете легко запустить несколько десятков задач обработки (сообщений) одновременно. Они создаются динамически, когда это необходимо, и уничтожаются по завершении (с гораздо меньшими накладными расходами, чем при порождении потоков).

Если вам нужно немного больше контроля, вы можете просто написать в asyncio.Queue в считывателе и получить пул задач фиксированного размера, который может использовать очередь, типичный шаблон в многопоточном или многопроцессорном программировании.

@asyncio.coroutine
def consumer(queue):
    while True:
        message = yield from queue.get()
        yield from handle(message)

@asyncio.coroutine
def listener(queue):
    for i in range(5):
         asyncio.ensure_future(consumer(queue))
    while True:
        message = yield from websocket.recieve_message()
        if message:
            yield from q.put(message)

q = asyncio.Queue()
loop = asyncio.get_event_loop()
loop.run_until_complete(listener(q))
person Ethan Frey    schedule 02.02.2016