Можно ли запускать процессы пула последовательно?

Следующий код запускает три процесса, они находятся в пуле для обработки 20 рабочих вызовов:

import multiprocessing

def worker(nr):
    print(nr)

numbers = [i for i in range(20)]

if __name__ == '__main__':
    multiprocessing.freeze_support()
    pool = multiprocessing.Pool(processes=3)
    results = pool.map(worker, numbers)
    pool.close()
    pool.join()

Есть ли способ запустить процессы в последовательности (в отличие от запуска всех процессов одновременно) с задержкой, вставленной между каждым запуском процесса?

Если бы я не использовал Pool, я бы использовал multiprocessing.Process(target=worker, args=(nr,)).start() в цикле, запуская их один за другим и вставляя задержку по мере необходимости. Однако я нахожу Pool чрезвычайно полезным (вместе с вызовом map), поэтому я был бы рад сохранить его, если это возможно.


person WoJ    schedule 14.09.2015    source источник
comment
почему ты хочешь сделать это?   -  person acushner    schedule 15.09.2015
comment
@acushner: например, у меня есть API, который мне нужно вызвать 200 раз. Ограничения API заключаются в том, что каждый вызов длится около 5 минут (это время меняется между вызовами). У меня может быть 10 одновременных вызовов, и каждый вызов должен начинаться с задержкой не менее 5 секунд после завершения предыдущего вызова. Я могу просто добавить 5-секундный сон в начале моего воркера - это отлично работает по пути, за исключением самого старта, когда 10 параллельных вызовов запускаются одновременно. Вот почему создание Pool в последовательности (с 5-секундной задержкой) решит проблему.   -  person WoJ    schedule 16.09.2015
comment
Не могу отредактировать свой предыдущий комментарий, второе предложение должно быть таким: Ограничения API заключаются в том, что каждый вызов длится около 5 минут (это время меняется между вызовами) , у меня может быть 10 вызовов (.. .). Обратите внимание на запятую вместо точки (есть три ограничения)   -  person WoJ    schedule 16.09.2015
comment
вы используете питон 2 или питон 3?   -  person acushner    schedule 16.09.2015
comment
@acushner: питон 3.4.3   -  person WoJ    schedule 16.09.2015


Ответы (4)


Согласно документации, такого контроля над процессами в пуле не существует. Однако вы можете имитировать его с помощью блокировки:

import multiprocessing
import time

lock = multiprocessing.Lock()

def worker(nr):
    lock.acquire()
    time.sleep(0.100)
    lock.release()
    print(nr)

numbers = [i for i in range(20)]

if __name__ == '__main__':
    multiprocessing.freeze_support()
    pool = multiprocessing.Pool(processes=3)
    results = pool.map(worker, numbers)
    pool.close()
    pool.join()

Ваши 3 процесса по-прежнему будут запускаться одновременно. Ну, я имею в виду, что у вас нет контроля над тем, какой процесс начинает выполнение обратного вызова первым. Но, по крайней мере, вы получите свою задержку. Таким образом, каждый рабочий "запускается" (но на самом деле продолжает) через определенные промежутки времени.

Поправка, возникшая в результате обсуждения ниже:

Обратите внимание, что в Windows невозможно наследовать блокировку от родительского процесса. Вместо этого вы можете использовать multiprocessing.Manager().Lock() для передачи объекта глобальной блокировки между процессами (конечно, с дополнительными издержками IPC). Глобальный объект блокировки также должен быть инициализирован в каждом процессе. Это будет выглядеть так:

from multiprocessing import Process, freeze_support
import multiprocessing
import time
from datetime import datetime as dt

def worker(nr):
    glock.acquire()
    print('started job: {} at {}'.format(nr, dt.now()))
    time.sleep(1)
    glock.release()
    print('ended   job: {} at {}'.format(nr, dt.now()))

numbers = [i for i in range(6)]

def init(lock):
    global glock
    glock = lock

if __name__ == '__main__':
    multiprocessing.freeze_support()
    lock = multiprocessing.Manager().Lock()
    pool = multiprocessing.Pool(processes=3, initializer=init, initargs=(lock,))
    results = pool.map(worker, numbers)
    pool.close()
    pool.join()
person Velimir Mlaker    schedule 15.09.2015
comment
Я попробовал ваш код и добавил время (pastebin.com/aFfQgBSD). Результаты одинаковы независимо от того, присутствуют ли lock.acquire() и lock.release() или закомментированы (вывод с наличием блокировки: pastebin.com/3gu70D6q и с двумя закомментированными строками: pastebin.com/MgB6D8E5) - person WoJ; 16.09.2015
comment
Мой код работает по назначению. Вам нужно изменить свой пример pastebin, чтобы наблюдать правильное поведение: ваши воркеры должны получить блокировку перед печатью первого сообщения. В противном случае все процессы будут печатать свои метки времени до того, как какой-либо из них получит блокировку. Затем, наконец, один из них приобретает его, и так далее... То, что вы действительно хотите сделать, это pastebin.com/a2WR0C3N< /а>. - person Velimir Mlaker; 16.09.2015
comment
Что ж, три первых задания по-прежнему печатают «завершено...» одновременно в вашем коде pastebin, что означает, что они не выполнили эту строку последовательно. Они начали в одно и то же время [какая-то магия с блокировкой и ожиданием] и напечатали «закончилось…» все вместе. Чего бы я хотел, так это иметь «концовку ..;» часть рабочих выполнять один за другим. Также, пожалуйста, посмотрите мой пример после комментария сразу после вопроса - для реального приложения. - person WoJ; 16.09.2015
comment
Он отлично работает, вот результат моего кода: pastebin.com/rV7w731i . Можете ли вы вставить свой вывод? - person Velimir Mlaker; 16.09.2015
comment
Спасибо, что не отстаете :) Я увеличил время ожидания до 5 секунд, чтобы сделать сдвиги более заметными (pastebin.com/9QvkZ8fQ). Как видите, 3 воркера начинаются в 8, все они заканчиваются в 13, новые начинаются в 13 и заканчиваются в 18. Это означает, что они не являются последовательными, разделенными 5-секундной задержкой. - person WoJ; 16.09.2015
comment
Попробуем разобраться в этом :). Как это работает для вас pastebin.com/kiWpADQG? Теперь мы используем инициализатор пула для установки объекта глобальной блокировки, созданного с помощью multiprocessing.Manager (аналогично этому ответу: stackoverflow.com/a/8277123 /1510289). Объяснение соответствующих различий между Windows и Linux можно найти по адресу stackoverflow.com/a/24787346/1510289. - person Velimir Mlaker; 16.09.2015
comment
aaand .. это сработало - СПАСИБО. Не могли бы вы переместить его в свой ответ? Я удалю свои комментарии, чтобы сохранить чистый (конечно, принятый) ответ. Спасибо еще раз. - person WoJ; 16.09.2015
comment
Отлично, рад, что сработало для вас! Сначала я не знал, что вы используете Windows. - person Velimir Mlaker; 16.09.2015
comment
Пожалуйста, не используйте lock.acquire/release. Используйте с замком. В вашем коде, если после приобретения происходит исключение, вы провалили собеседование. - person user48956; 27.09.2017
comment
Точно так же close и join должны быть в finally: или вы должны использовать с Pool() в качестве пула: - person user48956; 27.09.2017

Не могли бы вы сделать что-то простое вроде этого:

from multiprocessing import Process
from time import sleep

def f(n):
    print 'started job: '+str(n)
    sleep(3)
    print 'ended job: '+str(n)

if __name__ == '__main__':
    for i in range(0,100):
        p = Process(target=f, args=(i,))
        p.start()
        sleep(1)

Результат

started job: 0
started job: 1
started job: 2
ended job: 0
started job: 3
ended job: 1
started job: 4
ended job: 2
started job: 5
person taesu    schedule 14.09.2015
comment
Это решение, о котором я упоминал в последней части моего вопроса. Я хотел понять, можно ли контролировать способ запуска Pool процессов. - person WoJ; 15.09.2015
comment
Кроме того, приведенный выше код запустит 100 процессов параллельно, в то время как я ограничиваю 3 процесса и 20 рабочих процессов, потребляемых первым доступным. Однако это можно исправить с помощью Queue. - person WoJ; 16.09.2015

не могли бы вы попробовать определить функцию, которая медленно возвращает ваши значения?

def get_numbers_on_delay(numbers, delay):
    for i in numbers:
        yield i
        time.sleep(delay)

а потом:

results = pool.map(worker, get_numbers_on_delay(numbers, 5))

Я не проверял это, поэтому я не уверен, но попробуйте.

person acushner    schedule 16.09.2015

По какой-то причине я не мог заставить блокирующий ответ работать, поэтому я реализовал его таким образом. Я понимаю, что вопрос старый, но, может быть, у кого-то еще такая же проблема.

Он порождает все процессы, аналогичные решению блокировки, но приостанавливается перед работой в зависимости от их номера имени процесса.

from multiprocessing import current_process
from re import search
from time import sleep

def worker():
    process_number = search('\d+', current_process().name).group()
    time_between_workers = 5
    sleep(time_between_workers * int(process_number))
    #do your work here

Поскольку имена, данные процессам, кажутся уникальными и добавочными, это захватывает номер процесса и засыпает на его основе. SpawnPoolWorker-1 спит 1 * 5 секунд, SpawnPoolWorker-2 спит 2 * 5 секунд и т. д.

person raecer    schedule 04.05.2017