หลามมัลติโพรเซสซิงพูลลองอีกครั้ง

มีวิธีการส่งข้อมูลซ้ำเพื่อการประมวลผลหรือไม่ หากการคำนวณดั้งเดิมล้มเหลว โดยใช้พูลแบบธรรมดา

import random
from multiprocessing import Pool

def f(x):
   if random.getrandbits(1):
       raise ValueError("Retry this computation")
   return x*x

p = Pool(5)
# If one of these f(x) calls fails, retry it with another (or same) process
p.map(f, [1,2,3])

person atp    schedule 18.07.2012    source แหล่งที่มา
comment
บางทีคุณอาจต้องการ return f(x) แทนที่จะเพิ่ม ValueError? แค่คาดเดา...   -  person Paulo Freitas    schedule 24.07.2012
comment
โอกาสที่จะล้มเหลวในการสมัครจริงของคุณมีสูงเพียงใด? นั่นคือ มีความสำคัญเพียงใดที่กระบวนการลองใหม่ทันที แทนที่จะรอให้กระบวนการอื่นๆ เสร็จสิ้นก่อน?   -  person Isaac    schedule 24.07.2012
comment
มีโอกาสล้มเหลวปานกลาง และไม่จำเป็นต้องลองใหม่ทันที (แต่ควรลองอีกครั้งพร้อมกันในท้ายที่สุด)   -  person atp    schedule 24.07.2012


คำตอบ (2)


หากคุณสามารถ (หรือไม่รังเกียจ) ลองใหม่ได้ทันที ให้ใช้มัณฑนากรห่อฟังก์ชัน:

import random
from multiprocessing import Pool
from functools import wraps

def retry(f):
    @wraps(f)
    def wrapped(*args, **kwargs):
        while True:
            try:
                return f(*args, **kwargs)
            except ValueError:
                pass
    return wrapped

@retry
def f(x):
    if random.getrandbits(1):
        raise ValueError("Retry this computation")
    return x*x

p = Pool(5)
# If one of these f(x) calls fails, retry it with another (or same) process
p.map(f, [1,2,3])
person Andrew Alcock    schedule 24.07.2012

คุณสามารถใช้ Queue เพื่อป้อนกลับความล้มเหลวใน Pool ผ่านการวนซ้ำในการเริ่มต้น Process:

import multiprocessing as mp
import random

def f(x):
    if random.getrandbits(1):
        # on failure / exception catch
        f.q.put(x)
        return None
    return x*x

def f_init(q):
    f.q = q

def main(pending):
    total_items = len(pending)
    successful = []
    failure_tracker = []

    q = mp.Queue()
    p = mp.Pool(None, f_init, [q])
    results = p.imap(f, pending)
    retry_results = []
    while len(successful) < total_items:
        successful.extend([r for r in results if not r is None])
        successful.extend([r for r in retry_results if not r is None])
        failed_items = []
        while not q.empty():
            failed_items.append(q.get())
        if failed_items:
            failure_tracker.append(failed_items)
            retry_results = p.imap(f, failed_items);
    p.close()
    p.join()

    print "Results: %s" % successful
    print "Failures: %s" % failure_tracker

if __name__ == '__main__':
    main(range(1, 10))

ผลลัพธ์จะเป็นดังนี้:

Results: [1, 4, 36, 49, 25, 81, 16, 64, 9]
Failures: [[3, 4, 5, 8, 9], [3, 8, 4], [8, 3], []]

A Pool ไม่สามารถแชร์ระหว่างหลายกระบวนการได้ ดังนั้นแนวทางที่ใช้ Queue นี้ หากคุณพยายามส่งพูลเป็นพารามิเตอร์ไปยังกระบวนการของพูล คุณจะได้รับข้อผิดพลาดนี้:

NotImplementedError: pool objects cannot be passed between processes or pickled

คุณสามารถลองอีกครั้งทันทีภายในฟังก์ชัน f ของคุณเพื่อหลีกเลี่ยงค่าใช้จ่ายในการซิงโครไนซ์ จริงๆ แล้วมันเป็นเรื่องของเวลาที่ฟังก์ชันของคุณควรรอเพื่อลองอีกครั้ง และโอกาสที่จะประสบความสำเร็จหากลองใหม่ทันที


คำตอบเก่า: เพื่อความสมบูรณ์ นี่คือคำตอบเก่าของฉัน ซึ่งไม่เหมาะสมเท่ากับการส่งเข้าสู่กลุ่มโดยตรงอีกครั้ง แต่อาจยังคงมีความเกี่ยวข้อง ขึ้นอยู่กับกรณีการใช้งาน เนื่องจากเป็นวิธีธรรมชาติในการจัดการ/จำกัดการลองใหม่ระดับ n:

คุณสามารถใช้ Queue เพื่อรวมความล้มเหลวและส่งใหม่อีกครั้งเมื่อสิ้นสุดการทดสอบแต่ละครั้ง โดยดำเนินการหลายครั้ง:

import multiprocessing as mp
import random


def f(x):
    if random.getrandbits(1):
        # on failure / exception catch
        f.q.put(x)
        return None
    return x*x

def f_init(q):
    f.q = q

def main(pending):
    run_number = 1
    while pending:
        jobs = pending
        pending = []

        q = mp.Queue()
        p = mp.Pool(None, f_init, [q])
        results = p.imap(f, jobs)
        p.close()

        p.join()
        failed_items = []
        while not q.empty():
            failed_items.append(q.get())
        successful = [r for r in results if not r is None]
        print "(%d) Succeeded: %s" % (run_number, successful)
        print "(%d) Failed:    %s" % (run_number, failed_items)
        print
        pending = failed_items
        run_number += 1

if __name__ == '__main__':
    main(range(1, 10))

ด้วยผลลัพธ์ดังนี้:

(1) Succeeded: [9, 16, 36, 81]
(1) Failed:    [2, 1, 5, 7, 8]

(2) Succeeded: [64]
(2) Failed:    [2, 1, 5, 7]

(3) Succeeded: [1, 25]
(3) Failed:    [2, 7]

(4) Succeeded: [49]
(4) Failed:    [2]

(5) Succeeded: [4]
(5) Failed:    []
person Preet Kukreti    schedule 24.07.2012
comment
อัปเดตคำตอบของฉันเป็นคำตอบที่ไม่ต้องมีการรันหลายครั้ง และตอนนี้ใช้ได้กับพูลเดิมเดียวกัน - person Preet Kukreti; 24.07.2012
comment
ขอบคุณสำหรับการตอบกลับโดยละเอียด ฉันชอบแนวคิดในการใส่การคำนวณที่ล้มเหลวไว้ในคิวเพื่อลองใหม่ ฉันต้องมอบรางวัลให้แอนดรูว์เพราะวิธีแก้ปัญหาของเขาทำได้ง่ายมาก - person atp; 25.07.2012
comment
@ash ฉันได้พูดถึงการลองใหม่ทันทีในการตอบกลับโดยคิดว่ามันจะเป็นการเพิ่มเติมเล็กน้อย / ง่าย ๆ และไม่ใช่สิ่งที่คุณกำลังมองหา โปรดทราบด้วยว่า (การลองใหม่ทันที) นั้นไม่เหมาะสมสำหรับทุกกรณี โดยเฉพาะอย่างยิ่งในกรณีที่การลองใหม่ทันทีมีโอกาสสำเร็จต่ำ (ซึ่งในกรณีนี้ถือว่าไม่มีประสิทธิภาพอย่างมากเนื่องจากจะทำให้ทรัพยากรขาดแคลนสำหรับงานที่อาจประสบความสำเร็จ) ขอแสดงความยินดีกับ Andrew ถึงอย่างไร. - person Preet Kukreti; 26.07.2012