เป็นไปได้ไหมที่จะเริ่มกระบวนการ Pool ตามลำดับ?

รหัสต่อไปนี้เริ่มกระบวนการสามกระบวนการ โดยอยู่ในกลุ่มเพื่อรองรับการเรียกของผู้ปฏิบัติงาน 20 ครั้ง:

import multiprocessing

def worker(nr):
    print(nr)

numbers = [i for i in range(20)]

if __name__ == '__main__':
    multiprocessing.freeze_support()
    pool = multiprocessing.Pool(processes=3)
    results = pool.map(worker, numbers)
    pool.close()
    pool.join()

มีวิธีในการเริ่มต้นกระบวนการตามลำดับหรือไม่ (แทนที่จะให้กระบวนการเริ่มต้นทั้งหมดพร้อมกัน) โดยมีการหน่วงเวลาแทรกอยู่ระหว่างการเริ่มต้นแต่ละกระบวนการ

หากไม่ใช้ Pool ฉันจะใช้ multiprocessing.Process(target=worker, args=(nr,)).start() ในการวนซ้ำ โดยเริ่มต้นทีละรายการและแทรกการหน่วงเวลาตามต้องการ ฉันพบว่า Pool มีประโยชน์อย่างยิ่ง (ร่วมกับการโทร map) ดังนั้นฉันจึงยินดีที่จะเก็บไว้หากเป็นไปได้


person WoJ    schedule 14.09.2015    source แหล่งที่มา
comment
ทำไมคุณถึงอยากทำเช่นนี้?   -  person acushner    schedule 15.09.2015
comment
@acushner: ตัวอย่าง ฉันมี API ที่ต้องเรียกใช้ 200 ครั้ง ข้อจำกัดของ API คือการโทรแต่ละครั้งใช้เวลาประมาณ 5 นาที (ซึ่งจะเปลี่ยนแปลงระหว่างการโทร) ฉันสามารถสนทนาได้ 10 สายพร้อมกัน และการโทรแต่ละครั้งจะต้องเริ่มต้นด้วยการหน่วงเวลาอย่างน้อย 5 วินาทีหลังจากการโทรครั้งก่อนสิ้นสุดลง ฉันสามารถเพิ่มเวลาสลีป 5 วินาทีที่จุดเริ่มต้นของพนักงานได้ - ซึ่งใช้ได้ดีตลอดทางยกเว้นสำหรับการเริ่มต้นเมื่อมีการเรียกใช้การโทรแบบขนาน 10 สายในเวลาเดียวกัน นี่คือสาเหตุที่การสร้าง Pool ตามลำดับ (โดยมีการหน่วงเวลา 5 วินาที) จะช่วยแก้ปัญหาได้   -  person WoJ    schedule 16.09.2015
comment
ไม่สามารถแก้ไขความคิดเห็นก่อนหน้าของฉันได้ ประโยคที่สองควรเป็น: ข้อจำกัดของ API คือการโทรแต่ละครั้งใช้เวลาประมาณ 5 นาที (การเปลี่ยนแปลงระหว่างการโทร) , ฉันสามารถโทรได้ 10 ครั้ง (.. .). สังเกตเครื่องหมายจุลภาคแทนจุดเต็ม (มีข้อจำกัดสามประการ)   -  person WoJ    schedule 16.09.2015
comment
คุณใช้ python 2 หรือ python 3 หรือไม่?   -  person acushner    schedule 16.09.2015
comment
@acushner: หลาม 3.4.3   -  person WoJ    schedule 16.09.2015


คำตอบ (4)


ตามเอกสารประกอบ ไม่มีการควบคุมกระบวนการที่รวบรวมไว้ดังกล่าว อย่างไรก็ตาม คุณสามารถจำลองมันด้วยการล็อคได้:

import multiprocessing
import time

lock = multiprocessing.Lock()

def worker(nr):
    lock.acquire()
    time.sleep(0.100)
    lock.release()
    print(nr)

numbers = [i for i in range(20)]

if __name__ == '__main__':
    multiprocessing.freeze_support()
    pool = multiprocessing.Pool(processes=3)
    results = pool.map(worker, numbers)
    pool.close()
    pool.join()

กระบวนการทั้ง 3 ของคุณจะยังคงเริ่มต้นพร้อมกัน สิ่งที่ฉันหมายถึงคือคุณไม่สามารถควบคุมได้ว่ากระบวนการใดที่จะเริ่มดำเนินการโทรกลับก่อน แต่อย่างน้อยคุณก็ได้รับความล่าช้า สิ่งนี้ทำให้ผู้ปฏิบัติงานแต่ละคน "เริ่มต้น" อย่างมีประสิทธิภาพ (แต่จริงๆ แล้ว ดำเนินต่อไป) ในช่วงเวลาที่กำหนด

การแก้ไขที่เกิดจากการสนทนาด้านล่าง:

โปรดทราบว่าใน Windows ไม่สามารถสืบทอดการล็อกจากกระบวนการหลักได้ แต่คุณสามารถใช้ multiprocessing.Manager().Lock() เพื่อสื่อสารอ็อบเจ็กต์ล็อกโกลบอลระหว่างกระบวนการต่างๆ แทน (แน่นอนว่ามีค่าใช้จ่าย IPC เพิ่มเติม) จำเป็นต้องเริ่มต้นวัตถุล็อคส่วนกลางในแต่ละกระบวนการเช่นกัน สิ่งนี้จะมีลักษณะดังนี้:

from multiprocessing import Process, freeze_support
import multiprocessing
import time
from datetime import datetime as dt

def worker(nr):
    glock.acquire()
    print('started job: {} at {}'.format(nr, dt.now()))
    time.sleep(1)
    glock.release()
    print('ended   job: {} at {}'.format(nr, dt.now()))

numbers = [i for i in range(6)]

def init(lock):
    global glock
    glock = lock

if __name__ == '__main__':
    multiprocessing.freeze_support()
    lock = multiprocessing.Manager().Lock()
    pool = multiprocessing.Pool(processes=3, initializer=init, initargs=(lock,))
    results = pool.map(worker, numbers)
    pool.close()
    pool.join()
person Velimir Mlaker    schedule 15.09.2015
comment
ฉันลองใช้โค้ดของคุณและเพิ่มช่วงเวลา (pastebin.com/aFfQgBSD) ผลลัพธ์จะเหมือนกันไม่ว่าจะมี lock.acquire() และ lock.release() ปรากฏอยู่หรือใส่เครื่องหมายความคิดเห็นไว้ (เอาต์พุตที่มีการล็อคอยู่: pastebin.com/3gu70D6q และมีสองบรรทัดที่ใส่เครื่องหมายความคิดเห็นไว้: pastebin.com/MgB6D8E5) - person WoJ; 16.09.2015
comment
รหัสของฉันทำงานตามที่ตั้งใจไว้ คุณต้องเปลี่ยนตัวอย่าง pastebin ของคุณเพื่อที่จะสังเกตพฤติกรรมที่ถูกต้อง: พนักงานของคุณจำเป็นต้องได้รับการล็อคก่อนจะพิมพ์ข้อความแรก มิฉะนั้น กระบวนการทั้งหมดจะต้องพิมพ์การประทับเวลาก่อนที่กระบวนการใดกระบวนการหนึ่งจะได้รับการล็อค ในที่สุด หนึ่งในนั้นก็ได้มันมา และอื่นๆ... สิ่งที่คุณต้องการทำจริงๆ คือ pastebin.com/a2WR0C3N< /ก>. - person Velimir Mlaker; 16.09.2015
comment
งานสามงานแรกยังคงพิมพ์ 'ended...' พร้อมกันในโค้ด pastebin ของคุณ ซึ่งหมายความว่าพวกเขาไม่ได้ทำบรรทัดนั้นตามลำดับ พวกเขาเริ่มต้นพร้อมกัน [เวทมนตร์บางอย่างกับการล็อคและการรอคอย] และพิมพ์ 'สิ้นสุด ... ' ทั้งหมดเข้าด้วยกัน สิ่งที่อยากได้คือ 'ตอนจบ ..;' คนงานส่วนหนึ่งจะทำทีละคน โปรดดูตัวอย่างของฉันตามความคิดเห็นหลังคำถาม - สำหรับแอปพลิเคชันในชีวิตจริง - person WoJ; 16.09.2015
comment
มันใช้งานได้ดี นี่คือผลลัพธ์ของโค้ดของฉัน: pastebin.com/rV7w731i คุณสามารถวางเอาต์พุตของคุณได้หรือไม่? - person Velimir Mlaker; 16.09.2015
comment
ขอขอบคุณที่ติดตาม :) ฉันเพิ่มการนอนหลับเป็น 5 วินาทีเพื่อให้มองเห็นการเปลี่ยนแปลงได้มากขึ้น (pastebin.com/9QvkZ8fQ) ดังที่คุณเห็นคนงาน 3 คนเริ่มต้นที่ 8 ทุกคนสิ้นสุดที่ 13 เริ่มต้นใหม่ที่ 13 และสิ้นสุดที่ 18 ซึ่งหมายความว่าพวกเขาไม่ได้เรียงลำดับกันโดยคั่นด้วยความล่าช้า 5 วินาที - person WoJ; 16.09.2015
comment
มาลองเข้าเรื่องกันดีกว่า :) สิ่งนี้ทำงานอย่างไรสำหรับคุณ pastebin.com/kiWpADQG ? ตอนนี้เรากำลังใช้ตัวสร้างพูลเพื่อตั้งค่าวัตถุล็อคส่วนกลางที่สร้างผ่าน multiprocessing.Manager (คล้ายกับคำตอบนี้: stackoverflow.com/a/8277123 /1510289) คำอธิบายเกี่ยวกับความแตกต่างระหว่าง Windows และ Linux ที่เกี่ยวข้องสามารถดูได้ที่ stackoverflow.com/a/24787346/1510289 - person Velimir Mlaker; 16.09.2015
comment
aaannd .. อันนี้ใช้ได้ - ขอบคุณ คุณช่วยย้ายไปที่คำตอบของคุณได้ไหม? ฉันจะลบความคิดเห็นของฉันเพื่อรักษาคำตอบที่สะอาด (ยอมรับแน่นอน) ขอบคุณอีกครั้ง. - person WoJ; 16.09.2015
comment
เยี่ยมมาก ดีใจที่ได้ผลสำหรับคุณ! ตอนแรกฉันไม่รู้ว่าคุณอยู่ในกล่อง Windows - person Velimir Mlaker; 16.09.2015
comment
กรุณาอย่าใช้ lock.acquire/release ใช้กับล็อค. ในรหัสของคุณ หากมีข้อยกเว้นเกิดขึ้นหลังจากการได้มา แสดงว่าคุณล้มเหลวในการสัมภาษณ์งาน - person user48956; 27.09.2017
comment
ในทำนองเดียวกัน ปิดและเข้าร่วมควรเป็น a ในที่สุด: หรือคุณควรใช้กับ Pool() เป็นพูล: - person user48956; 27.09.2017

คุณทำอะไรง่ายๆ แบบนี้ไม่ได้:

from multiprocessing import Process
from time import sleep

def f(n):
    print 'started job: '+str(n)
    sleep(3)
    print 'ended job: '+str(n)

if __name__ == '__main__':
    for i in range(0,100):
        p = Process(target=f, args=(i,))
        p.start()
        sleep(1)

ผลลัพธ์

started job: 0
started job: 1
started job: 2
ended job: 0
started job: 3
ended job: 1
started job: 4
ended job: 2
started job: 5
person taesu    schedule 14.09.2015
comment
นี่คือวิธีแก้ปัญหาที่ฉันพูดถึงในส่วนสุดท้ายของคำถาม ฉันต้องการเข้าใจว่าเป็นไปได้หรือไม่ที่จะควบคุมวิธีที่ Pool กระบวนการเริ่มต้น - person WoJ; 15.09.2015
comment
นอกจากนั้นโค้ดด้านบนจะเริ่ม 100 กระบวนการพร้อมกัน ในขณะที่ฉันจำกัดไว้ที่ 3 กระบวนการและ 20 คนที่ใช้โดยกระบวนการแรกที่มีอยู่ สิ่งนี้สามารถแก้ไขได้ด้วย Queue - person WoJ; 16.09.2015

คุณลองกำหนดฟังก์ชันที่ให้ค่าของคุณช้าๆ ได้ไหม

def get_numbers_on_delay(numbers, delay):
    for i in numbers:
        yield i
        time.sleep(delay)

แล้ว:

results = pool.map(worker, get_numbers_on_delay(numbers, 5))

ฉันยังไม่ได้ทดสอบ ดังนั้นฉันจึงไม่แน่ใจ แต่ลองดูก่อน

person acushner    schedule 16.09.2015

ฉันไม่สามารถรับคำตอบการล็อคให้ทำงานได้ด้วยเหตุผลบางประการ ดังนั้นฉันจึงใช้วิธีนี้ ฉันรู้ว่าคำถามนี้เก่า แต่อาจมีคนอื่นประสบปัญหาเดียวกัน

โดยจะวางกระบวนการทั้งหมดที่คล้ายกับโซลูชันการล็อค แต่จะเข้าสู่โหมดสลีปก่อนทำงานตามหมายเลขชื่อกระบวนการ

from multiprocessing import current_process
from re import search
from time import sleep

def worker():
    process_number = search('\d+', current_process().name).group()
    time_between_workers = 5
    sleep(time_between_workers * int(process_number))
    #do your work here

เนื่องจากชื่อที่กำหนดให้กับกระบวนการดูเหมือนจะไม่ซ้ำกันและเพิ่มขึ้น สิ่งนี้จะดึงจำนวนของกระบวนการและเข้าสู่โหมดสลีปตามนั้น SpawnPoolWorker-1 นอน 1 * 5 วินาที SpawnPoolWorker-2 นอน 2 * 5 วินาที เป็นต้น

person raecer    schedule 04.05.2017