percobaan ulang kumpulan multiprosesing python

Apakah ada cara untuk mengirim ulang sebagian data untuk diproses, jika perhitungan awal gagal, menggunakan kumpulan sederhana?

import random
from multiprocessing import Pool

def f(x):
   if random.getrandbits(1):
       raise ValueError("Retry this computation")
   return x*x

p = Pool(5)
# If one of these f(x) calls fails, retry it with another (or same) process
p.map(f, [1,2,3])

person atp    schedule 18.07.2012    source sumber
comment
Mungkin Anda ingin return f(x) daripada menaikkan ValueError? Hanya menebak...   -  person Paulo Freitas    schedule 24.07.2012
comment
Seberapa tinggi kemungkinan kegagalan dalam aplikasi Anda sebenarnya? Artinya, seberapa pentingkah proses tersebut segera dicoba ulang dibandingkan menunggu proses lainnya selesai terlebih dahulu?   -  person Isaac    schedule 24.07.2012
comment
Kemungkinan kegagalannya sedang, dan tidak perlu segera dicoba ulang (tetapi pada akhirnya harus dicoba lagi secara paralel).   -  person atp    schedule 24.07.2012


Jawaban (2)


Jika Anda dapat (atau tidak keberatan) segera mencoba lagi, gunakan dekorator yang membungkus fungsi tersebut:

import random
from multiprocessing import Pool
from functools import wraps

def retry(f):
    @wraps(f)
    def wrapped(*args, **kwargs):
        while True:
            try:
                return f(*args, **kwargs)
            except ValueError:
                pass
    return wrapped

@retry
def f(x):
    if random.getrandbits(1):
        raise ValueError("Retry this computation")
    return x*x

p = Pool(5)
# If one of these f(x) calls fails, retry it with another (or same) process
p.map(f, [1,2,3])
person Andrew Alcock    schedule 24.07.2012

Anda dapat menggunakan Queue untuk memasukkan kembali kegagalan ke dalam Pool melalui perulangan di Process awal:

import multiprocessing as mp
import random

def f(x):
    if random.getrandbits(1):
        # on failure / exception catch
        f.q.put(x)
        return None
    return x*x

def f_init(q):
    f.q = q

def main(pending):
    total_items = len(pending)
    successful = []
    failure_tracker = []

    q = mp.Queue()
    p = mp.Pool(None, f_init, [q])
    results = p.imap(f, pending)
    retry_results = []
    while len(successful) < total_items:
        successful.extend([r for r in results if not r is None])
        successful.extend([r for r in retry_results if not r is None])
        failed_items = []
        while not q.empty():
            failed_items.append(q.get())
        if failed_items:
            failure_tracker.append(failed_items)
            retry_results = p.imap(f, failed_items);
    p.close()
    p.join()

    print "Results: %s" % successful
    print "Failures: %s" % failure_tracker

if __name__ == '__main__':
    main(range(1, 10))

Outputnya seperti ini:

Results: [1, 4, 36, 49, 25, 81, 16, 64, 9]
Failures: [[3, 4, 5, 8, 9], [3, 8, 4], [8, 3], []]

A Pool tidak dapat dibagikan di antara banyak proses. Oleh karena itu pendekatan berbasis Queue ini. Jika Anda mencoba meneruskan kumpulan sebagai parameter ke proses kumpulan, Anda akan mendapatkan kesalahan ini:

NotImplementedError: pool objects cannot be passed between processes or pickled

Sebagai alternatif, Anda dapat mencoba beberapa percobaan ulang langsung dalam fungsi Anda f, untuk menghindari overhead sinkronisasi. Yang terpenting adalah seberapa cepat fungsi Anda harus menunggu untuk dicoba ulang, dan seberapa besar kemungkinan berhasil jika segera dicoba ulang.


Jawaban Lama: Demi kelengkapan, inilah jawaban lama saya, yang tidak seoptimal mengirim ulang langsung ke pool, namun mungkin masih relevan bergantung pada kasus penggunaannya, karena ini memberikan cara alami untuk menangani/membatasi percobaan ulang level n:

Anda dapat menggunakan Queue untuk menggabungkan kegagalan dan mengirimkannya kembali di akhir setiap proses, dalam beberapa proses:

import multiprocessing as mp
import random


def f(x):
    if random.getrandbits(1):
        # on failure / exception catch
        f.q.put(x)
        return None
    return x*x

def f_init(q):
    f.q = q

def main(pending):
    run_number = 1
    while pending:
        jobs = pending
        pending = []

        q = mp.Queue()
        p = mp.Pool(None, f_init, [q])
        results = p.imap(f, jobs)
        p.close()

        p.join()
        failed_items = []
        while not q.empty():
            failed_items.append(q.get())
        successful = [r for r in results if not r is None]
        print "(%d) Succeeded: %s" % (run_number, successful)
        print "(%d) Failed:    %s" % (run_number, failed_items)
        print
        pending = failed_items
        run_number += 1

if __name__ == '__main__':
    main(range(1, 10))

dengan keluaran seperti ini:

(1) Succeeded: [9, 16, 36, 81]
(1) Failed:    [2, 1, 5, 7, 8]

(2) Succeeded: [64]
(2) Failed:    [2, 1, 5, 7]

(3) Succeeded: [1, 25]
(3) Failed:    [2, 7]

(4) Succeeded: [49]
(4) Failed:    [2]

(5) Succeeded: [4]
(5) Failed:    []
person Preet Kukreti    schedule 24.07.2012
comment
Memperbarui jawaban saya menjadi jawaban yang tidak memerlukan banyak proses, dan sekarang berfungsi pada kumpulan asli yang sama. - person Preet Kukreti; 24.07.2012
comment
Terima kasih atas respon yang mendetail. Saya menyukai gagasan memasukkan penghitungan yang gagal ke dalam antrean untuk dicoba ulang. Saya harus memberi hadiah kepada Andrew karena solusinya hanya berupa percobaan ulang yang sederhana. - person atp; 25.07.2012
comment
@ash Saya menyebutkan percobaan ulang segera dalam tanggapan saya, berpikir bahwa itu akan menjadi tambahan yang sepele/sederhana dan bukan yang Anda cari. Perhatikan juga bahwa ini (percobaan ulang segera) tidak optimal untuk semua kasus, terutama kasus-kasus di mana percobaan ulang segera memiliki peluang berhasil yang rendah (dalam hal ini sangat tidak optimal karena menyebabkan kekurangan sumber daya untuk pekerjaan yang berpotensi berhasil.) Selamat kepada Andrew Bagaimanapun. - person Preet Kukreti; 26.07.2012