Cara memecahkan time.sleep() dengan python concurrent.futures

Saya bermain-main dengan concurrent.futures.

Saat ini panggilan masa depan saya time.sleep(secs).

Tampaknya Future.cancel() hasilnya kurang dari yang saya kira.

Jika masa depan sudah dijalankan, maka time.sleep() tidak dibatalkan olehnya.

Hal yang sama juga terjadi pada parameter batas waktu untuk wait(). Itu tidak membatalkan time.sleep() saya.

Bagaimana cara membatalkan time.sleep() yang dieksekusi secara bersamaan.futures?

Untuk pengujian saya menggunakan ThreadPoolExecutor.


person guettli    schedule 19.07.2016    source sumber
comment
jawaban singkat - tidak mungkin, dan kemungkinan besar penggunaan sleep pada pekerja berarti masalah dengan desain, jawaban panjang - Anda selalu dapat menerapkan tidur khusus dengan kemungkinan untuk merusaknya, namun ini bukan pythonic atau benar. sebagai alternatif Anda dapat memeriksa penggunaan kunci.   -  person Reishin    schedule 01.08.2016


Jawaban (3)


Jika Anda mengirimkan fungsi ke ThreadPoolExecutor, pelaksana akan menjalankan fungsi tersebut di thread dan menyimpan nilai kembaliannya di objek Future. Karena jumlah thread bersamaan terbatas, Anda mempunyai pilihan untuk membatalkan eksekusi tertunda di masa depan, namun setelah kontrol dalam thread pekerja telah diteruskan ke callable, tidak ada cara untuk menghentikan eksekusi.

Pertimbangkan kode ini:

import concurrent.futures as f
import time

T = f.ThreadPoolExecutor(1) # Run at most one function concurrently
def block5():
    time.sleep(5)
    return 1
q = T.submit(block5)
m = T.submit(block5)

print q.cancel()  # Will fail, because q is already running
print m.cancel()  # Will work, because q is blocking the only thread, so m is still queued

Secara umum, kapan pun Anda menginginkan sesuatu yang dapat dibatalkan, Anda sendirilah yang bertanggung jawab untuk memastikan hal tersebut.

Namun ada beberapa opsi siap pakai yang tersedia. Misalnya, pertimbangkan untuk menggunakan asyncio, mereka juga memiliki contoh menggunakan sleep. Konsep ini menghindari masalah ini dengan, setiap kali operasi pemblokiran dipanggil, alih-alih mengembalikan kontrol ke loop kontrol yang berjalan dalam konteks terluar, bersama dengan catatan bahwa eksekusi harus dilanjutkan kapan pun hasilnya tersedia - atau, dalam kasus Anda, setelah n detik berlalu.

person Phillip    schedule 25.07.2016
comment
Oh asyiknya :-) Saya beralih dari multiprosesing ke concurrent.futures (karena alasan lain). Sekarang saya sedang berpikir untuk beralih dari concurrent.futures ke asyncio... :-). Meski begitu, Phillip, terima kasih atas jawaban Anda! - person guettli; 25.07.2016
comment
Sama-sama ???? Ngomong-ngomong, dengan multiprocessing, interupsi sleep bisa dilakukan, karena tentu saja Anda bisa kill proses lainnya. - person Phillip; 25.07.2016
comment
@ Saya pikir saya bisa menggunakan kill di concurrent.futures juga. Saya hanya perlu beralih dari ThreadPoolExecutor ke ProcessPoolExecutor. Atau apakah ini salah? - person guettli; 25.07.2016
comment
Secara teori, ya, tetapi (a), atribut _processes tidak didokumentasikan sehingga dapat berubah, dan (b), setelah Anda mendeteksi bahwa masa depan Anda sedang berjalan dan belum selesai, Anda akan berlomba antara masa depan selesai dan Anda menghentikannya -- jika kalah, maka Anda menghentikan tugas lain (yang tidak terkait) dan bukan tugas yang Anda inginkan. - person Phillip; 25.07.2016
comment
Linux AFAIK meningkatkan PID untuk setiap proses baru, bersepeda jika mencapai batas atas. Sangat kecil kemungkinannya hal ini akan terjadi. Tapi Anda benar: Ini adalah kondisi balapan. - person guettli; 25.07.2016
comment
Ini adalah kumpulan Proses, python tidak memunculkan proses baru untuk setiap tugas! - person Phillip; 25.07.2016
comment
Apakah multiprocessing dan concurrent.futures berbeda di area ini? AFAIK keduanya menggunakan kumpulan proses. - person guettli; 25.07.2016
comment
Dengan multiprosesing, Anda dapat menggunakan kelas Process secara langsung untuk memasukkan pengamanan Anda sendiri. Kemudian lagi, tentu saja Anda juga dapat membuat subkelas ProcessPoolExecutor dan menambahkan kemampuan penghentian aman. - person Phillip; 26.07.2016
comment
Saya tidak dapat menemukan dokumen tentang memasukkan perlindungan di dokumen resmi. Kasus1: Saya buta, Kasus2: dokumen resmi tidak membahas hal ini. Jika kasusnya 2: Apakah menurut Anda hal ini harus didokumentasikan? Saya tidak mempercayai postingan blog atau sumber pihak ketiga lainnya. Dalam jangka panjang, barang-barang tersebut sudah usang dan tidak dirawat dengan baik. - person guettli; 26.07.2016
comment
Kasus 2, dan menurut saya hal ini tidak perlu didokumentasikan karena tidak pernah merupakan praktik terbaik dan hanya berguna dalam kasus darurat, yaitu jika Anda memiliki banyak tugas jangka panjang (jika tidak, tidak ada perlu membatalkan apa pun, biarkan saja terus dan abaikan hasilnya) yang sangat jarang perlu Anda batalkan (jika tidak, killproses setiap kali akan berlebihan dan Anda benar-benar harus merancang tugas Anda agar dapat dibatalkan). - person Phillip; 26.07.2016
comment
@Phillip Saya memahami bahwa asyncio, pemrograman fungsional menjadi populer tetapi waktu istirahat. Tidur tidak ada hubungannya dengan asyncio. Saya bahkan akan mengatakan, jika dia menggunakan sleep di ThreadedPool, sepertinya dia memiliki masalah dengan desain aplikasi dan bahkan asyncio tidak akan mengamankannya. - person Reishin; 01.08.2016
comment
@Reishin Ingatlah bahwa contoh asyncio menggunakan asyncio.sleep, bukan time.sleep. time.sleep hanya dapat diinterupsi dengan mengirimkan sinyal ke proses tidur, terlepas dari antarmuka Python. Maksud saya adalah jika Anda ingin tidur dengan cara yang dapat diinterupsi, Anda tidak boleh menggunakan nanosleep(2) sama sekali, melainkan, misalnya, select(2) pada eventfd(2) menggunakan batas waktu. asyncio adalah salah satu kemungkinan pembungkus API ini (dan sejenisnya). - person Phillip; 01.08.2016
comment
@Phillip asyncio.sleep biasanya memberikan kesempatan kepada aliran lain untuk diproses sementara penelepon akan menunggu, dan tidak ada jaminan bahwa ini akan terjadi dalam 1 atau 2 detik (karena sifat asyncio). Implementasi lain seperti eventfd r bergantung pada platform dan berikut adalah pertanyaan sederhana - bagaimana cara menghentikan waktu.tidur, bukan bagaimana solusi untuk mewujudkannya. - person Reishin; 01.08.2016
comment
@Reishin Semua fungsi asyncio digunakan untuk memberikan kesempatan pada bagian lain dari program untuk diproses. Itulah inti dan alasan mengapa modul dapat berfungsi sebagai sarana bagi penulis untuk menulis algoritma interupsi. Jika Anda menggunakan asyncio dengan benar, asyncio.sleep memang menjamin bahwa Anda tidur selama jangka waktu tertentu (hingga resolusi loop peristiwa). Saya tidak setuju dengan sifat pertanyaannya. time.sleep digunakan oleh penulis sebagai contoh; yang dia tanyakan adalah bagaimana tugas (yang sedang berjalan) yang hasilnya dirangkum dalam Future dapat dibatalkan. - person Phillip; 02.08.2016

Saya tidak tahu banyak tentang concurrent.futures, tetapi Anda dapat menggunakan logika ini untuk membagi waktu. Gunakan loop alih-alih sleep.time() atau wait()

for i in range(sec):
    sleep(1)

interupsi atau break dapat digunakan untuk keluar dari loop.

person ketan khandagale    schedule 28.07.2016
comment
Ya, ini bisa berhasil. Rasanya seperti orang dari Finlandia yang hanya ingin membaca emailnya melalui koneksi dialup... hmmm Saya memerlukan event-loop.... Saya memerlukan penjadwal....dan yang terakhir adalah OS. - person guettli; 28.07.2016

Seperti yang tertulis di tautan, Anda dapat gunakan pernyataan with untuk memastikan thread segera dibersihkan, seperti contoh di bawah ini:

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))
person iraf    schedule 07.01.2021