Многопроцессорный пул Python повторяет попытки

Есть ли способ повторно отправить часть данных для обработки, если исходное вычисление не удалось, используя простой пул?

import random
from multiprocessing import Pool

def f(x):
   if random.getrandbits(1):
       raise ValueError("Retry this computation")
   return x*x

p = Pool(5)
# If one of these f(x) calls fails, retry it with another (or same) process
p.map(f, [1,2,3])

person atp    schedule 18.07.2012    source источник
comment
Возможно, вы хотите return f(x) вместо поднятия ValueError? Просто предположил...   -  person Paulo Freitas    schedule 24.07.2012
comment
Насколько высока вероятность отказа в вашем реальном приложении? То есть, насколько важно, чтобы процесс повторил попытку немедленно, а не ждал, пока другие процессы закончатся первыми?   -  person Isaac    schedule 24.07.2012
comment
Это умеренная вероятность сбоя, и ее не нужно немедленно повторять (но в конечном итоге ее следует повторять параллельно).   -  person atp    schedule 24.07.2012


Ответы (2)


Если вы можете (или не возражаете) повторить попытку немедленно, используйте декоратор, обертывающий функцию:

import random
from multiprocessing import Pool
from functools import wraps

def retry(f):
    @wraps(f)
    def wrapped(*args, **kwargs):
        while True:
            try:
                return f(*args, **kwargs)
            except ValueError:
                pass
    return wrapped

@retry
def f(x):
    if random.getrandbits(1):
        raise ValueError("Retry this computation")
    return x*x

p = Pool(5)
# If one of these f(x) calls fails, retry it with another (or same) process
p.map(f, [1,2,3])
person Andrew Alcock    schedule 24.07.2012

Вы можете использовать Queue для возврата ошибок в Pool через цикл в инициирующем Process:

import multiprocessing as mp
import random

def f(x):
    if random.getrandbits(1):
        # on failure / exception catch
        f.q.put(x)
        return None
    return x*x

def f_init(q):
    f.q = q

def main(pending):
    total_items = len(pending)
    successful = []
    failure_tracker = []

    q = mp.Queue()
    p = mp.Pool(None, f_init, [q])
    results = p.imap(f, pending)
    retry_results = []
    while len(successful) < total_items:
        successful.extend([r for r in results if not r is None])
        successful.extend([r for r in retry_results if not r is None])
        failed_items = []
        while not q.empty():
            failed_items.append(q.get())
        if failed_items:
            failure_tracker.append(failed_items)
            retry_results = p.imap(f, failed_items);
    p.close()
    p.join()

    print "Results: %s" % successful
    print "Failures: %s" % failure_tracker

if __name__ == '__main__':
    main(range(1, 10))

Вывод такой:

Results: [1, 4, 36, 49, 25, 81, 16, 64, 9]
Failures: [[3, 4, 5, 8, 9], [3, 8, 4], [8, 3], []]

Pool не может использоваться совместно несколькими процессами. Отсюда и этот подход, основанный на Queue. Если вы попытаетесь передать пул в качестве параметра процессам пулов, вы получите эту ошибку:

NotImplementedError: pool objects cannot be passed between processes or pickled

В качестве альтернативы вы можете попробовать выполнить несколько немедленных повторных попыток внутри вашей функции f, чтобы избежать накладных расходов на синхронизацию. На самом деле это вопрос того, как скоро ваша функция должна ожидать повторной попытки, и насколько велика вероятность успеха, если повторная попытка будет выполнена немедленно.


Старый ответ: Для полноты картины вот мой старый ответ, который не так оптимален, как повторная отправка непосредственно в пул, но все еще может быть актуален. в зависимости от варианта использования, потому что это обеспечивает естественный способ справиться с/ограничить повторные попытки уровня n:

Вы можете использовать Queue для агрегирования ошибок и повторной отправки в конце каждого запуска в течение нескольких запусков:

import multiprocessing as mp
import random


def f(x):
    if random.getrandbits(1):
        # on failure / exception catch
        f.q.put(x)
        return None
    return x*x

def f_init(q):
    f.q = q

def main(pending):
    run_number = 1
    while pending:
        jobs = pending
        pending = []

        q = mp.Queue()
        p = mp.Pool(None, f_init, [q])
        results = p.imap(f, jobs)
        p.close()

        p.join()
        failed_items = []
        while not q.empty():
            failed_items.append(q.get())
        successful = [r for r in results if not r is None]
        print "(%d) Succeeded: %s" % (run_number, successful)
        print "(%d) Failed:    %s" % (run_number, failed_items)
        print
        pending = failed_items
        run_number += 1

if __name__ == '__main__':
    main(range(1, 10))

с таким выводом:

(1) Succeeded: [9, 16, 36, 81]
(1) Failed:    [2, 1, 5, 7, 8]

(2) Succeeded: [64]
(2) Failed:    [2, 1, 5, 7]

(3) Succeeded: [1, 25]
(3) Failed:    [2, 7]

(4) Succeeded: [49]
(4) Failed:    [2]

(5) Succeeded: [4]
(5) Failed:    []
person Preet Kukreti    schedule 24.07.2012
comment
Обновлен мой ответ на тот, который не требует нескольких прогонов, и теперь работает в том же исходном пуле. - person Preet Kukreti; 24.07.2012
comment
Спасибо за подробный ответ. Мне нравится идея помещать неудачные вычисления в очередь для повторения. Я должен наградить Эндрю наградой, потому что его решение делает простую повторную попытку. - person atp; 25.07.2012
comment
@ash Я упомянул немедленные повторные попытки в своем ответе, думая, что это будет тривиальное / простое дополнение, а не то, что вы искали. Также обратите внимание, что это (немедленные повторные попытки) не является оптимальным для всех случаев, особенно в тех случаях, когда немедленная повторная попытка имеет низкий шанс на успех (в этом случае она сильно неоптимальна, поскольку вызывает нехватку ресурсов для заданий, которые потенциально могут быть успешными). Поздравляем Эндрю. тем не мение. - person Preet Kukreti; 26.07.2012