Многопроцессорная карта пула Python и imap

У меня есть скрипт multiprocessing с pool.map, который работает. Проблема в том, что не все процессы занимают столько времени, чтобы завершиться, поэтому некоторые процессы засыпают, потому что они ждут, пока все процессы не будут завершены (та же проблема, что и в этот вопрос). Некоторые файлы создаются менее чем за секунду, другие занимают минуты (или часы).

Насколько я понимаю руководство (и этот пост) правильно, pool.imap не ждет завершения всех процессов, если один из них завершен, он предоставляет для обработки новый файл. Когда я пытаюсь это сделать, сценарий ускоряет обработку файлов, маленькие обрабатываются, как и ожидалось, большие файлы (для обработки которых требуется больше времени) не заканчиваются до конца (уничтожаются без предупреждения?). Это нормальное поведение для pool.imap или мне нужно добавить больше команд/параметров? Когда я добавляю time.sleep(100) в часть else в качестве теста, он обрабатывает больше больших файлов, но другие процессы засыпают. Какие-либо предложения ? Спасибо

def process_file(infile):
    #read infile
    #compare things in infile
    #acquire Lock, save things in outfile, release Lock
    #delete infile

def main():
    #nprocesses = 8
    global filename
    pathlist = ['tmp0', 'tmp1', 'tmp2', 'tmp3', 'tmp4', 'tmp5', 'tmp6', 'tmp7', 'tmp8', 'tmp9']
    for d in pathlist:
        os.chdir(d)      
        todolist = []
        for infile in os.listdir():  
            todolist.append(infile)
        try:   
            p = Pool(processes=nprocesses)
            p.imap(process_file, todolist)
        except KeyboardInterrupt:                
            print("Shutting processes down")
           # Optionally try to gracefully shut down the worker processes here.       
            p.close()
            p.terminate()
            p.join()
        except StopIteration:
            continue     
        else:
            time.sleep(100)
            os.chdir('..')
        p.close()
        p.join() 

if __name__ == '__main__':
    main()    

person avierstr    schedule 24.11.2016    source источник
comment
Я думал о проблеме imap. Map ожидает завершения всех процессов, чтобы вернуть результаты. Imap возвращает результат, как только завершается первый процесс, и, вероятно, завершает остальные и дает всем новое задание. Может ли это быть правильным?   -  person avierstr    schedule 25.11.2016


Ответы (1)


Поскольку вы уже поместили все свои файлы в список, вы можете поместить их прямо в очередь. Затем очередь используется совместно с вашими подпроцессами, которые берут имена файлов из очереди и выполняют свои функции. Не нужно делать это дважды (сначала в список, затем в список рассола по Pool.imap). Pool.imap делает то же самое, но без вашего ведома.

todolist = []
for infile in os.listdir():  
    todolist.append(infile)

можно заменить на:

todolist = Queue()
for infile in os.listdir():  
    todolist.put(infile)

Тогда полное решение будет выглядеть так:

def process_file(inqueue):
    for infile in iter(inqueue.get, "STOP"):
        #do stuff until inqueue.get returns "STOP"
    #read infile
    #compare things in infile
    #acquire Lock, save things in outfile, release Lock
    #delete infile

def main():
    nprocesses = 8
    global filename
    pathlist = ['tmp0', 'tmp1', 'tmp2', 'tmp3', 'tmp4', 'tmp5', 'tmp6', 'tmp7', 'tmp8', 'tmp9']
    for d in pathlist:
        os.chdir(d)      
        todolist = Queue()
        for infile in os.listdir():  
            todolist.put(infile)
        process = [Process(target=process_file,
                      args=(todolist) for x in range(nprocesses)]
        for p in process:
            #task the processes to stop when all files are handled
            #"STOP" is at the very end of queue
            todolist.put("STOP")
        for p in process:
            p.start()
        for p in process:
            p.join()    
if __name__ == '__main__':
    main()
person RaJa    schedule 25.11.2016
comment
Спасибо большое RaJa! Теперь он работает так, как я хотел. Для полноты: args=(todolist) for x in range(nprocesses)] должно быть args=(todolist,)) for x in range(nprocesses)]. Я пробовал с Queue прошлой ночью, но до сих пор получил много ошибок. Теперь мне ясно, как это работает! - person avierstr; 25.11.2016