Apakah mungkin untuk memulai proses Pool secara berurutan?

Kode berikut memulai tiga proses, mereka berada dalam satu kumpulan untuk menangani 20 panggilan pekerja:

import multiprocessing

def worker(nr):
    print(nr)

numbers = [i for i in range(20)]

if __name__ == '__main__':
    multiprocessing.freeze_support()
    pool = multiprocessing.Pool(processes=3)
    results = pool.map(worker, numbers)
    pool.close()
    pool.join()

Apakah ada cara untuk memulai proses secara berurutan (dibandingkan memulai semuanya pada waktu yang sama), dengan penundaan di antara setiap permulaan proses?

Jika tidak menggunakan Pool saya akan menggunakan multiprocessing.Process(target=worker, args=(nr,)).start() dalam satu lingkaran, memulainya satu demi satu dan memasukkan penundaan sesuai kebutuhan. Namun menurut saya Pool sangat berguna (bersama dengan panggilan map) jadi saya akan dengan senang hati menyimpannya jika memungkinkan.


person WoJ    schedule 14.09.2015    source sumber
comment
mengapa kamu ingin melakukan ini?   -  person acushner    schedule 15.09.2015
comment
@acushner: sebuah contoh, saya memiliki API yang perlu saya panggil 200 kali. Batasan API adalah setiap panggilan berlangsung sekitar 5 menit (ini berubah antar panggilan). Saya dapat menjalankan 10 panggilan secara bersamaan dan setiap panggilan harus dimulai dengan penundaan setidaknya 5 detik setelah panggilan sebelumnya berakhir. Saya cukup menambahkan waktu tidur 5 detik di awal pekerja saya - ini berfungsi dengan baik sepanjang kecuali di awal ketika 10 panggilan paralel diluncurkan pada waktu yang sama. Inilah sebabnya mengapa membuat Pool secara berurutan (dengan penundaan 5 detik) akan menyelesaikan masalah.   -  person WoJ    schedule 16.09.2015
comment
Tidak dapat mengedit komentar saya sebelumnya, kalimat kedua seharusnya: Batasan API adalah setiap panggilan berlangsung sekitar 5 menit (ini berubah antar panggilan) , Saya dapat menerima 10 panggilan (.. .). Perhatikan koma, bukan titik (ada tiga batasan)   -  person WoJ    schedule 16.09.2015
comment
apakah Anda menggunakan python 2 atau python 3?   -  person acushner    schedule 16.09.2015
comment
@acushner: ular piton 3.4.3   -  person WoJ    schedule 16.09.2015


Jawaban (4)


Menurut dokumentasi, tidak ada kontrol atas proses gabungan. Namun Anda dapat mensimulasikannya dengan kunci:

import multiprocessing
import time

lock = multiprocessing.Lock()

def worker(nr):
    lock.acquire()
    time.sleep(0.100)
    lock.release()
    print(nr)

numbers = [i for i in range(20)]

if __name__ == '__main__':
    multiprocessing.freeze_support()
    pool = multiprocessing.Pool(processes=3)
    results = pool.map(worker, numbers)
    pool.close()
    pool.join()

3 proses Anda masih akan dimulai secara bersamaan. Maksud saya adalah Anda tidak memiliki kendali atas proses mana yang mulai menjalankan panggilan balik terlebih dahulu. Tapi setidaknya Anda mendapatkan penundaan Anda. Hal ini secara efektif membuat setiap pekerja "memulai" (tetapi sebenarnya, melanjutkan) pada interval yang ditentukan.

Perubahan hasil diskusi di bawah ini:

Perhatikan bahwa di Windows tidak mungkin mewarisi kunci dari proses induk. Sebagai gantinya, Anda dapat menggunakan multiprocessing.Manager().Lock() untuk mengomunikasikan objek kunci global antar proses (tentu saja dengan overhead IPC tambahan). Objek kunci global juga perlu diinisialisasi dalam setiap proses. Ini akan terlihat seperti:

from multiprocessing import Process, freeze_support
import multiprocessing
import time
from datetime import datetime as dt

def worker(nr):
    glock.acquire()
    print('started job: {} at {}'.format(nr, dt.now()))
    time.sleep(1)
    glock.release()
    print('ended   job: {} at {}'.format(nr, dt.now()))

numbers = [i for i in range(6)]

def init(lock):
    global glock
    glock = lock

if __name__ == '__main__':
    multiprocessing.freeze_support()
    lock = multiprocessing.Manager().Lock()
    pool = multiprocessing.Pool(processes=3, initializer=init, initargs=(lock,))
    results = pool.map(worker, numbers)
    pool.close()
    pool.join()
person Velimir Mlaker    schedule 15.09.2015
comment
Saya mencoba kode Anda dan menambahkan beberapa pengaturan waktu (pastebin.com/aFfQgBSD). Hasilnya tetap sama baik lock.acquire() dan lock.release() ada atau dikomentari (output dengan kunci ada: pastebin.com/3gu70D6q dan dengan dua baris yang dikomentari: pastebin.com/MgB6D8E5) - person WoJ; 16.09.2015
comment
Kode saya berfungsi sebagaimana mestinya. Anda perlu mengubah contoh pastebin untuk mengamati perilaku yang benar: pekerja Anda harus mendapatkan kunci sebelum mencetak pesan pertama. Jika tidak, semua proses akan mencetak stempel waktunya sebelum salah satu dari proses tersebut memperoleh kunci. Lalu akhirnya salah satu dari mereka mendapatkannya, dan seterusnya... Yang sebenarnya ingin Anda lakukan adalah pastebin.com/a2WR0C3N. - person Velimir Mlaker; 16.09.2015
comment
Nah, tiga pekerjaan pertama masih mencetak 'berakhir...' secara bersamaan dalam kode pastebin Anda, yang berarti mereka belum menyelesaikan baris tersebut secara berurutan. Mereka telah memulai pada saat yang sama [suatu keajaiban dengan mengunci dan menunggu] dan mencetak 'berakhir ...' secara bersamaan. Yang saya inginkan adalah memiliki 'akhir ..;' bagian dari pekerja dilakukan satu demi satu. Silakan lihat juga contoh saya mengikuti komentar tepat setelah pertanyaan - untuk aplikasi kehidupan nyata. - person WoJ; 16.09.2015
comment
Ini berfungsi dengan baik, inilah output kode saya: pastebin.com/rV7w731i . Bisakah Anda menempelkan keluaran Anda? - person Velimir Mlaker; 16.09.2015
comment
Terima kasih telah mengikuti :) Saya meningkatkan waktu tidur menjadi 5 detik agar peralihannya lebih terlihat (pastebin.com/9QvkZ8fQ). Seperti yang terlihat 3 pekerja memulai pada jam 8, semuanya berakhir pada jam 13, baru mulai pada jam 13 dan berakhir pada jam 18. Artinya tidak berurutan, dipisahkan oleh jeda 5 detik. - person WoJ; 16.09.2015
comment
Mari kita coba memahaminya :). Bagaimana cara kerjanya untuk Anda pastebin.com/kiWpADQG ? Sekarang kita menggunakan inisialisasi kumpulan untuk menyetel objek kunci global yang dibuat melalui multiprocessing.Manager (mirip dengan jawaban ini: stackoverflow.com/a/8277123 /1510289). Penjelasan tentang perbedaan Windows vs. Linux yang relevan dapat ditemukan di stackoverflow.com/a/24787346/1510289 . - person Velimir Mlaker; 16.09.2015
comment
aaannd .. yang ini berhasil - TERIMA KASIH. Bisakah Anda memindahkannya ke jawaban Anda? Saya akan menghapus komentar saya untuk menjaga jawaban yang bersih (tentu saja diterima). Terima kasih lagi. - person WoJ; 16.09.2015
comment
Hebat, senang itu berhasil untuk Anda! Awalnya saya tidak menyadari Anda menggunakan kotak Windows. - person Velimir Mlaker; 16.09.2015
comment
Tolong jangan gunakan lock.acquire/release. Gunakan dengan kunci. Dalam kode Anda, jika pengecualian terjadi setelah perolehan, Anda gagal dalam wawancara kerja. - person user48956; 27.09.2017
comment
Demikian pula, tutup dan gabung harus menjadi yang terakhir: atau Anda harus menggunakan with Pool() sebagai kumpulan: - person user48956; 27.09.2017

Tidak bisakah kamu melakukan sesuatu yang sederhana seperti ini:

from multiprocessing import Process
from time import sleep

def f(n):
    print 'started job: '+str(n)
    sleep(3)
    print 'ended job: '+str(n)

if __name__ == '__main__':
    for i in range(0,100):
        p = Process(target=f, args=(i,))
        p.start()
        sleep(1)

Hasil

started job: 0
started job: 1
started job: 2
ended job: 0
started job: 3
ended job: 1
started job: 4
ended job: 2
started job: 5
person taesu    schedule 14.09.2015
comment
Ini adalah solusi yang saya sebutkan di bagian terakhir pertanyaan saya. Saya ingin memahami apakah mungkin untuk memiliki kendali atas cara Pool proses dimulai. - person WoJ; 15.09.2015
comment
Selain itu kode di atas akan memulai 100 proses secara paralel, sementara saya membatasi hingga 3 proses dan 20 pekerja digunakan oleh proses pertama yang tersedia. Namun hal ini dapat diperbaiki dengan Queue. - person WoJ; 16.09.2015

bisakah Anda mencoba mendefinisikan fungsi yang menghasilkan nilai Anda secara perlahan?

def get_numbers_on_delay(numbers, delay):
    for i in numbers:
        yield i
        time.sleep(delay)

kemudian:

results = pool.map(worker, get_numbers_on_delay(numbers, 5))

saya belum mengujinya, jadi saya tidak yakin, tapi cobalah.

person acushner    schedule 16.09.2015

Saya tidak bisa mendapatkan jawaban penguncian untuk beberapa alasan jadi saya menerapkannya dengan cara ini. Saya menyadari pertanyaannya sudah lama, tapi mungkin orang lain memiliki masalah yang sama.

Ini memunculkan semua proses yang mirip dengan solusi penguncian, tetapi tertidur sebelum bekerja berdasarkan nomor nama prosesnya.

from multiprocessing import current_process
from re import search
from time import sleep

def worker():
    process_number = search('\d+', current_process().name).group()
    time_between_workers = 5
    sleep(time_between_workers * int(process_number))
    #do your work here

Karena nama yang diberikan pada proses tampak unik dan bertahap, maka ini akan mengambil jumlah proses dan tidur berdasarkan nama tersebut. SpawnPoolWorker-1 tidur 1 * 5 detik, SpawnPoolWorker-2 tidur 2 * 5 detik, dll.

person raecer    schedule 04.05.2017