Многопроцессорность с общей очередью и критериями окончания

У меня есть эта оригинальная функция, которую я хочу переключить на многопроцессорность:

def optimal(t0, tf, frequences, delay, ratio = 0):

    First = True                            # First        
    for s in delay:
        delay = 0                           # delay between signals,
        timelines = list()

        for i in range(len(frequences)):
            timelines.append(time_builder(frequences[i], t0+delay, tf))
            delay += s

       trio_overlap = trio_combination(timelines, ratio)

        valid = True
        for items in trio_overlap.values():
            if len(list(set(items))) == len(items):
                continue
            else:
                valid = False

        if not valid:
            continue

        overlap = duo_combination(timelines)

    optimal = ... depending of conditions        
    return optimal

Если valid = True после теста, он вычислит параметр оптимизации с именем optim_param и попытается минимизировать его. Если оно достигает определенного порога, optim_param < 0.3, я выхожу из цикла и принимаю это значение в качестве своего ответа.

Моя проблема в том, что по мере разработки моей модели сложность начинает расти, а однопоточные вычисления занимают слишком много времени. Я хотел бы обрабатывать вычисления параллельно. Поскольку каждый процесс должен будет сравнивать результат, полученный со значением s, с текущим оптимальным, я попытался реализовать Queue.

Я впервые занимаюсь многопроцессорной обработкой, и даже если я думаю, что нахожусь на правильном пути, мне кажется, что мой код беспорядочный и неполный. Могу ли я получить некоторую помощь?

Спасибо


person Mathieu    schedule 22.02.2018    source источник
comment
Помимо сбоя моего компьютера, это решение малоэффективно :/   -  person Mathieu    schedule 23.02.2018
comment
Вы показали образец ввода на shift и frequencies. Не могли бы вы показать пример ввода t0 и tf? Кроме того, не могли бы вы пересмотреть shift с меньшим количеством шагов? Скажем, 1000 или 10000 выполняются параллельно, чтобы упростить начальное тестирование?   -  person thewaywewere    schedule 27.02.2018
comment
t0 и tf — это начальное и конечное время, которые используются для вычисления временных шкал (которые представляют собой список моментов, скажем, [0, 50, 100, 150] для сигнала 20 Гц). Допустим, t0 = 0 и tf = 200. Для сдвига, конечно, мы можем взять np.arange(0, 2, 0.1) для начала.   -  person Mathieu    schedule 27.02.2018


Ответы (1)


Вместо создания процесса для каждого случая вручную рассмотрите возможность использования Pool.imap_unordered. Хитрость заключается в том, как чисто завершить работу при получении приемлемого результата: вы можете реализовать это, передав генератору, который завершает работу раньше, если установлен флаг, который проверяет каждый цикл. Основная программа считывает данные из итератора, поддерживает наилучший результат и устанавливает флаг, когда он достаточно хорош. Последняя хитрость заключается в замедлении чтения (внутреннего) потока из генератора, чтобы предотвратить большое количество невыполненных запланированных задач, которые необходимо ждать (или, что нечисто, уничтожать) после получения хорошего результата. Учитывая количество процессов в пуле, этого темпа можно добиться с помощью семафора.

Вот пример (с тривиальным анализом), чтобы продемонстрировать:

import multiprocessing,threading,os

def interrupted(data,sem,interrupt):
  for x in data:
    yield x
    sem.acquire()
    if interrupt: break

def analyze(x): return x**2

np=os.cpu_count()
pool=multiprocessing.Pool(np)
sem=threading.Semaphore(np-1)
token=[]                        # mutable

vals=pool.imap_unordered(analyze,interrupted(range(-10,10),sem,token))
pool.close()                    # optional: to let processes exit faster

best=None
for res in vals:
  if best is None or res<best:
    best=res
    if best<5: token.append(None) # make it truthy
  sem.release()
pool.join()

print(best)

Конечно, есть и другие способы поделиться семафором и флагом прерывания с генератором; этот способ использует уродливый тип данных, но имеет преимущество в том, что не использует глобальные переменные (или даже замыкания).

person Davis Herring    schedule 26.02.2018
comment
Я вроде понял идею. Я посмотрю на это, однако, это мой первый раз, когда я использую многопроцессорность, и даже если я не новичок в python, я далек от свободного владения языком. - person Mathieu; 26.02.2018
comment
Я понимаю идею этого, но мне не удается создать работающий код самостоятельно... Несмотря на множество испытаний и сбой компьютера x') - person Mathieu; 26.02.2018
comment
Спасибо, завтра посмотрю. - person Mathieu; 27.02.2018