Как сломать time.sleep() в python concurrent.futures

Я играю с concurrent.futures.

В настоящее время мое будущее звонит time.sleep(secs).

Похоже, что Future.cancel() делает меньше, чем я думал.

Если будущее уже выполняется, то time.sleep() не отменяется им.

То же самое для параметра времени ожидания для wait(). Это не отменяет моего time.sleep().

Как отменить time.sleep(), который выполняется в concurrent.futures?

Для тестирования я использую ThreadPoolExecutor.


person guettli    schedule 19.07.2016    source источник
comment
короткий ответ - никак, и, скорее всего, использование сна в воркерах означает проблему с дизайном, длинный ответ - вы всегда можете реализовать собственный сон с возможностью их сломать, однако это не является ни питоническим, ни правильным. в качестве альтернативы вы можете проверить использование блокировки.   -  person Reishin    schedule 01.08.2016


Ответы (3)


Если вы отправляете функцию в ThreadPoolExecutor, исполнитель запустит функцию в потоке и сохранит возвращаемое значение в объекте Future. Поскольку количество одновременных потоков ограничено, у вас есть возможность отменить ожидающее выполнение будущего, но как только управление в рабочем потоке будет передано вызываемому объекту, нет способа остановить выполнение.

Рассмотрим этот код:

import concurrent.futures as f
import time

T = f.ThreadPoolExecutor(1) # Run at most one function concurrently
def block5():
    time.sleep(5)
    return 1
q = T.submit(block5)
m = T.submit(block5)

print q.cancel()  # Will fail, because q is already running
print m.cancel()  # Will work, because q is blocking the only thread, so m is still queued

В общем, всякий раз, когда вы хотите иметь что-то, что можно отменить, вы сами несете ответственность за то, чтобы убедиться, что это возможно.

Хотя есть и готовые варианты. Например, рассмотрите возможность использования asyncio, они также имеют пример использования сна. Эта концепция позволяет обойти проблему, всякий раз, когда должна быть вызвана любая потенциально блокирующая операция, вместо этого возвращая управление циклу управления, работающему в самом внешнем контексте, вместе с примечанием о том, что выполнение должно быть продолжено всякий раз, когда доступен результат, или, в вашем случае, по прошествии n секунд.

person Phillip    schedule 25.07.2016
comment
О, как весело :-) Я переключился с многопроцессорности на concurrent.futures (по другим причинам). Сейчас думаю о переходе с concurrent.futures на asyncio... :-). Тем не менее, Филипп, спасибо за ответ! - person guettli; 25.07.2016
comment
Пожалуйста ???? Кстати, с multiprocessing было возможно прерывание sleep, потому что вы, конечно, можете kill другие процессы. - person Phillip; 25.07.2016
comment
@ Я думал, что смогу использовать kill и в concurrent.futures. Мне просто нужно переключиться с ThreadPoolExecutor на ProcessPoolExecutor. Или это неправильно? - person guettli; 25.07.2016
comment
Теоретически да, но (а) атрибут _processes не задокументирован и, следовательно, может быть изменен, и (б) после того, как вы обнаружите, что ваше будущее в настоящее время выполняется и не завершено, у вас будет гонка между будущее завершение, и вы убиваете его - если вы проигрываете, вы убиваете другую (не связанную) задачу вместо той, которую планировали. - person Phillip; 25.07.2016
comment
AFAIK linux увеличивает PID для каждого нового процесса, зацикливаясь, если он достигает верхнего предела. Очень маловероятно, что это произойдет. Но вы правы: это состояние гонки. - person guettli; 25.07.2016
comment
Это пул процессов, python не создает новый процесс для каждой задачи! - person Phillip; 25.07.2016
comment
Отличаются ли многопроцессорность и concurrent.futures в этой области? AFAIK оба используют пул процессов. - person guettli; 25.07.2016
comment
При многопроцессорной обработке вы можете напрямую использовать класс Process для вставки собственных средств защиты. Опять же, вы, конечно, можете также создать подкласс ProcessPoolExecutor и добавить возможность безопасного завершения. - person Phillip; 26.07.2016
comment
Я не смог найти документы о вставке мер безопасности в официальных документах. Случай 1: я слеп. Случай 2: в официальной документации это не описано. Если это case2: Как вы думаете, это должно быть задокументировано? Я не доверяю сообщениям в блогах или другим сторонним ресурсам. В долгосрочной перспективе они устарели не в хорошем состоянии. - person guettli; 26.07.2016
comment
Случай 2, и я не думаю, что это следует документировать, потому что это никогда не рекомендуется и полезно только в крайнем случае, а именно, если у вас есть куча длительных задач (иначе нет нужно что-то отменить, просто дайте ему продолжиться и игнорируйте результат), который вам очень нужно отменять редко (иначе killзапускать процесс каждый раз будет излишним, и вы действительно должны спроектировать свою задачу так, чтобы ее можно было отменить). - person Phillip; 26.07.2016
comment
@Phillip, я понимаю, что асинхронное и функциональное программирование становятся популярными, но прерывание времени сна не имеет ничего общего с асинхронным. Я даже скажу, если он использует sleep в ThreadedPool, похоже, у него проблемы с дизайном приложения, и даже asyncio его не спасет. - person Reishin; 01.08.2016
comment
@Reishin Помните, что в примере asyncio используется asyncio.sleep, а не time.sleep. time.sleep можно прервать только отправив сигнал спящему процессу, независимо от интерфейса Python. Я хочу сказать, что если вы хотите спать прерываемым образом, вам вообще не следует использовать nanosleep(2), а вместо этого, например, select(2) на eventfd(2) с использованием тайм-аута. asyncio — это одна из возможных оболочек этого (и подобных) API. - person Phillip; 01.08.2016
comment
@Phillip asyncio.sleep раньше давал возможность обрабатывать другие потоки, пока вызывающий абонент будет ждать, и нет гарантии, что это произойдет через 1 или 2 секунды (из-за природы asyncio). Другие реализации, такие как eventfd r, зависят от платформы, и здесь возникает простой вопрос - как сломать time.sleep, а не как обойти это, чтобы сделать это возможным. - person Reishin; 01.08.2016
comment
@Reishin Все асинхронные функции используются для того, чтобы другие части программы могли быть обработаны. В этом весь смысл и причина, по которой модуль может служить автору средством для написания прерываемых алгоритмов. Если вы используете asyncio правильно, asyncio.sleep гарантирует, что вы будете спать в течение заданного периода времени (до разрешения цикла событий). Я не согласен с характером вопроса. time.sleep использовалось автором в качестве примера; то, о чем он спрашивал, - это то, как (выполняемая) задача, результат которой инкапсулирован в Future, может быть отменена. - person Phillip; 02.08.2016

Я мало что знаю о concurrent.futures, но вы можете использовать эту логику, чтобы разбить время. Используйте цикл вместо sleep.time() или wait()

for i in range(sec):
    sleep(1)

прерывание или прерывание могут быть использованы для выхода из цикла.

person ketan khandagale    schedule 28.07.2016
comment
Да, это может сработать. Это похоже на парня из Финляндии, который просто хочет читать свою почту через коммутируемое соединение... хммм, мне нужен цикл обработки событий.... Мне нужен планировщик.... и, наконец, это ОС. - person guettli; 28.07.2016

Как написано в его ссылке, вы можете используйте оператор with, чтобы обеспечить быструю очистку потоков, как в приведенном ниже примере:

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))
person iraf    schedule 07.01.2021