Python multiprocessing Pool map และ imap

ฉันมีสคริปต์ multiprocessing พร้อม pool.map ที่ใช้งานได้ ปัญหาคือไม่ใช่ว่าทุกกระบวนการจะใช้เวลานานกว่าจะเสร็จสิ้น ดังนั้นบางกระบวนการจึงหลับไปเพราะรอจนกว่ากระบวนการทั้งหมดจะเสร็จสิ้น (ปัญหาเดียวกับใน คำถามนี้) ไฟล์บางไฟล์เสร็จสิ้นภายในเวลาไม่ถึงวินาที ส่วนบางไฟล์ใช้เวลาไม่กี่นาที (หรือชั่วโมง)

หากฉันเข้าใจคู่มือ (และโพสต์นี้) ถูกต้อง pool.imap ไม่ได้รอให้กระบวนการทั้งหมดเสร็จสิ้น หากทำเสร็จแล้ว จะเป็นการเตรียมไฟล์ใหม่ให้ดำเนินการ เมื่อฉันลองทำเช่นนั้น สคริปต์กำลังเร่งความเร็วให้กับไฟล์ที่จะประมวลผล ไฟล์ขนาดเล็กจะถูกประมวลผลตามที่คาดไว้ ไฟล์ขนาดใหญ่ (ที่ใช้เวลาในการประมวลผลนานกว่า) จะไม่เสร็จสิ้นจนกว่าจะสิ้นสุด (ถูกฆ่าโดยไม่แจ้งให้ทราบล่วงหน้า ?) นี่เป็นพฤติกรรมปกติสำหรับ pool.imap หรือฉันต้องเพิ่มคำสั่ง/พารามิเตอร์เพิ่มเติมหรือไม่ เมื่อฉันเพิ่ม time.sleep(100) ในส่วน else เป็นการทดสอบ ระบบกำลังประมวลผลไฟล์ขนาดใหญ่กว่า แต่กระบวนการอื่นๆ เข้าสู่โหมดสลีป มีข้อเสนอแนะอะไรบ้าง? ขอบคุณ

def process_file(infile):
    #read infile
    #compare things in infile
    #acquire Lock, save things in outfile, release Lock
    #delete infile

def main():
    #nprocesses = 8
    global filename
    pathlist = ['tmp0', 'tmp1', 'tmp2', 'tmp3', 'tmp4', 'tmp5', 'tmp6', 'tmp7', 'tmp8', 'tmp9']
    for d in pathlist:
        os.chdir(d)      
        todolist = []
        for infile in os.listdir():  
            todolist.append(infile)
        try:   
            p = Pool(processes=nprocesses)
            p.imap(process_file, todolist)
        except KeyboardInterrupt:                
            print("Shutting processes down")
           # Optionally try to gracefully shut down the worker processes here.       
            p.close()
            p.terminate()
            p.join()
        except StopIteration:
            continue     
        else:
            time.sleep(100)
            os.chdir('..')
        p.close()
        p.join() 

if __name__ == '__main__':
    main()    

person avierstr    schedule 24.11.2016    source แหล่งที่มา
comment
ฉันกำลังคิดถึงปัญหา imap Map กำลังรอให้กระบวนการทั้งหมดเสร็จสิ้นเพื่อส่งคืนผลลัพธ์ Imap กำลังส่งคืนผลลัพธ์ทันทีที่กระบวนการแรกเสร็จสิ้น และอาจยุติกระบวนการอื่นๆ และมอบงานใหม่ทั้งหมด สิ่งนี้สามารถถูกต้องได้หรือไม่?   -  person avierstr    schedule 25.11.2016


คำตอบ (1)


เนื่องจากคุณใส่ไฟล์ทั้งหมดของคุณไว้ในรายการแล้ว คุณจึงสามารถใส่ไฟล์เหล่านั้นลงในคิวได้โดยตรง จากนั้นคิวจะถูกแชร์กับกระบวนการย่อยของคุณที่รับชื่อไฟล์จากคิวและดำเนินการต่างๆ ไม่จำเป็นต้องทำสองครั้ง (อันดับแรกในรายการ จากนั้นดองรายการโดย Pool.imap) Pool.imap กำลังทำสิ่งเดียวกันทุกประการแต่โดยที่คุณไม่รู้ตัว

todolist = []
for infile in os.listdir():  
    todolist.append(infile)

สามารถถูกแทนที่ด้วย:

todolist = Queue()
for infile in os.listdir():  
    todolist.put(infile)

วิธีแก้ปัญหาที่สมบูรณ์จะมีลักษณะดังนี้:

def process_file(inqueue):
    for infile in iter(inqueue.get, "STOP"):
        #do stuff until inqueue.get returns "STOP"
    #read infile
    #compare things in infile
    #acquire Lock, save things in outfile, release Lock
    #delete infile

def main():
    nprocesses = 8
    global filename
    pathlist = ['tmp0', 'tmp1', 'tmp2', 'tmp3', 'tmp4', 'tmp5', 'tmp6', 'tmp7', 'tmp8', 'tmp9']
    for d in pathlist:
        os.chdir(d)      
        todolist = Queue()
        for infile in os.listdir():  
            todolist.put(infile)
        process = [Process(target=process_file,
                      args=(todolist) for x in range(nprocesses)]
        for p in process:
            #task the processes to stop when all files are handled
            #"STOP" is at the very end of queue
            todolist.put("STOP")
        for p in process:
            p.start()
        for p in process:
            p.join()    
if __name__ == '__main__':
    main()
person RaJa    schedule 25.11.2016
comment
ขอบคุณมากราชา! ตอนนี้มันทำงานได้ตามที่ฉันต้องการ เพื่อความสมบูรณ์: args=(todolist) for x in range(nprocesses)] จะต้องเป็น args=(todolist,)) for x in range(nprocesses)] เมื่อคืนก่อนฉันได้ลองใช้ Queue แต่กลับพบข้อผิดพลาดมากมาย ตอนนี้มันชัดเจนสำหรับฉันว่ามันทำงานอย่างไร! - person avierstr; 25.11.2016