Peta dan imap kumpulan multiprosesing Python

Saya memiliki skrip multiprocessing dengan pool.map yang berfungsi. Permasalahannya adalah tidak semua proses membutuhkan waktu lama untuk diselesaikan, sehingga beberapa proses tertidur karena menunggu hingga semua proses selesai (masalah yang sama seperti pada pertanyaan ini). Beberapa file selesai dalam waktu kurang dari satu detik, yang lain membutuhkan waktu beberapa menit (atau jam).

Jika saya memahami manual (dan postingan ini) benar, pool.imap tidak menunggu semua proses selesai, jika sudah selesai, ia menyediakan file baru untuk diproses. Ketika saya mencobanya, skrip mempercepat file untuk diproses, yang kecil diproses seperti yang diharapkan, file besar (yang membutuhkan lebih banyak waktu untuk diproses) tidak selesai sampai akhir (terbunuh tanpa pemberitahuan?). Apakah ini perilaku normal untuk pool.imap, atau apakah saya perlu menambahkan lebih banyak perintah/parameter? Ketika saya menambahkan time.sleep(100) di bagian else sebagai pengujian, itu memproses lebih banyak file besar tetapi proses lainnya tertidur. Ada saran? Terima kasih

def process_file(infile):
    #read infile
    #compare things in infile
    #acquire Lock, save things in outfile, release Lock
    #delete infile

def main():
    #nprocesses = 8
    global filename
    pathlist = ['tmp0', 'tmp1', 'tmp2', 'tmp3', 'tmp4', 'tmp5', 'tmp6', 'tmp7', 'tmp8', 'tmp9']
    for d in pathlist:
        os.chdir(d)      
        todolist = []
        for infile in os.listdir():  
            todolist.append(infile)
        try:   
            p = Pool(processes=nprocesses)
            p.imap(process_file, todolist)
        except KeyboardInterrupt:                
            print("Shutting processes down")
           # Optionally try to gracefully shut down the worker processes here.       
            p.close()
            p.terminate()
            p.join()
        except StopIteration:
            continue     
        else:
            time.sleep(100)
            os.chdir('..')
        p.close()
        p.join() 

if __name__ == '__main__':
    main()    

person avierstr    schedule 24.11.2016    source sumber
comment
Saya telah memikirkan tentang masalah imap. Map sedang menunggu semua proses selesai untuk mengembalikan hasil. Imap mengembalikan hasilnya segera setelah proses pertama selesai, dan mungkin menghentikan proses lainnya dan memberikan semua pekerjaan baru. Apakah ini benar?   -  person avierstr    schedule 25.11.2016


Jawaban (1)


Karena Anda sudah memasukkan semua file ke dalam daftar, Anda dapat langsung memasukkannya ke dalam antrian. Antrian tersebut kemudian dibagikan dengan sub-proses Anda yang mengambil nama file dari antrian dan melakukan tugasnya. Tidak perlu melakukannya dua kali (pertama ke dalam daftar, lalu acar daftar dengan Pool.imap). Pool.imap melakukan hal yang persis sama tetapi tanpa Anda sadari.

todolist = []
for infile in os.listdir():  
    todolist.append(infile)

dapat diganti dengan:

todolist = Queue()
for infile in os.listdir():  
    todolist.put(infile)

Solusi lengkapnya akan terlihat seperti:

def process_file(inqueue):
    for infile in iter(inqueue.get, "STOP"):
        #do stuff until inqueue.get returns "STOP"
    #read infile
    #compare things in infile
    #acquire Lock, save things in outfile, release Lock
    #delete infile

def main():
    nprocesses = 8
    global filename
    pathlist = ['tmp0', 'tmp1', 'tmp2', 'tmp3', 'tmp4', 'tmp5', 'tmp6', 'tmp7', 'tmp8', 'tmp9']
    for d in pathlist:
        os.chdir(d)      
        todolist = Queue()
        for infile in os.listdir():  
            todolist.put(infile)
        process = [Process(target=process_file,
                      args=(todolist) for x in range(nprocesses)]
        for p in process:
            #task the processes to stop when all files are handled
            #"STOP" is at the very end of queue
            todolist.put("STOP")
        for p in process:
            p.start()
        for p in process:
            p.join()    
if __name__ == '__main__':
    main()
person RaJa    schedule 25.11.2016
comment
Terima kasih banyak Raja! Sekarang berfungsi sesuai keinginan saya. Untuk kelengkapan: args=(todolist) for x in range(nprocesses)] harus args=(todolist,)) for x in range(nprocesses)]. Saya telah mencoba dengan Queue kemarin malam, tetapi sejauh ini mendapatkan banyak kesalahan. Sekarang jelas bagi saya cara kerjanya! - person avierstr; 25.11.2016