Jadikan Buffer Ekstensi Reaktif menunggu hingga operasi asinkron selesai

Saya menggunakan Ekstensi Reaktif (Rx) untuk buffer beberapa data. Saya mengalami masalah karena saya perlu melakukan sesuatu yang asinkron dengan data ini, namun saya tidak ingin buffer meneruskan grup berikutnya hingga operasi asinkron selesai.

Saya sudah mencoba menyusun kode dengan dua cara (contoh yang dibuat-buat):

public async Task processFiles<File>(IEnumerable<File> files)
{
    await files.ToObservable()
        .Buffer(10)
        .SelectMany(fi => fi.Select(f => upload(f)) //Now have an IObservable<Task>
        .Select(t => t.ToObservable())
        .Merge()
        .LastAsync();
}

public Task upload(File item)
{
    return Task.Run(() => { //Stuff });
}

or

public async Task processFiles<File>(IEnumerable<File> files)
{
    var buffered = files.ToObservable()
        .Buffer(10);

    buffered.Subscribe(async files => await Task.WhenAll(files.Select(f => upload(f)));

    await buffered.LastAsync();
}

public Task upload(File item)
{
    return Task.Run(() => { //Stuff });
}

Sayangnya, tidak satu pun dari metode ini yang berhasil karena buffer mendorong grup berikutnya sebelum operasi asinkron selesai. Tujuannya adalah agar setiap grup yang di-buffer dieksekusi secara asinkron dan hanya ketika operasi tersebut selesai, lanjutkan dengan grup yang di-buffer berikutnya.

Bantuan apa pun sangat dihargai.


person MgSam    schedule 12.06.2013    source sumber
comment
saluran9.msdn.com/Shows/Going+Deep/   -  person spender    schedule 13.06.2013
comment
Contoh kedua Anda tidak masuk akal, Anda tidak menggunakan upload() sama sekali. Dan Task.WhenAll() tidak akan berfungsi pada koleksi Files.   -  person svick    schedule 13.06.2013
comment
@svick salah ketik. Dikoreksi.   -  person MgSam    schedule 13.06.2013


Jawaban (2)


Pertama, menurut saya persyaratan Anda untuk mengeksekusi item dari setiap grup secara paralel, tetapi setiap grup secara seri sangat tidak biasa. Persyaratan yang lebih umum adalah mengeksekusi item secara paralel, tetapi paling banyak n item pada saat yang bersamaan. Dengan cara ini, tidak ada grup yang tetap, jadi jika satu item memakan waktu terlalu lama, item lainnya tidak perlu menunggu.

Untuk melakukan apa yang Anda minta, menurut saya TPL Dataflow lebih cocok daripada Rx (walaupun beberapa kode Rx masih berguna). TPL Dataflow berpusat pada "blok" yang mengeksekusi sesuatu, secara default secara seri, dan itulah yang Anda butuhkan.

Kode Anda akan terlihat seperti ini:

public static class Extensions
{
    public static Task ExecuteInGroupsAsync<T>(
         this IEnumerable<T> source, Func<T, Task> func, int groupSize)
     {
         var block = new ActionBlock<IEnumerable<T>>(
             g => Task.WhenAll(g.Select(func)));
         source.ToObservable()
               .Buffer(groupSize)
               .Subscribe(block.AsObserver());
         return block.Completion;
     }
}

public Task ProcessFiles(IEnumerable<File> files)
{
    return files.ExecuteInGroupsAsync(Upload, 10);
}

Hal ini menyisakan sebagian besar beban berat di ActionBlock (dan beberapa di Rx). Blok aliran data dapat bertindak sebagai pengamat Rx (dan dapat diamati), sehingga kita dapat memanfaatkannya untuk tetap menggunakan Buffer().

Kita ingin menangani seluruh grup sekaligus, jadi kita gunakan Task.WhenAll() untuk membuat Task yang selesai ketika seluruh grup selesai. Blok aliran data memahami fungsi pengembalian Task, sehingga grup berikutnya tidak akan mulai mengeksekusi hingga Task yang dikembalikan oleh grup sebelumnya selesai.

Hasil akhirnya adalah Completion Task, yang akan selesai setelah sumber observasi selesai dan semua pemrosesan selesai.

TPL Dataflow juga memiliki BatchBlock, yang berfungsi seperti Buffer() dan kita dapat langsung Post() setiap item dari koleksi (tanpa menggunakan ToObservable() dan AsObserver()), tetapi menurut saya menggunakan Rx untuk bagian kode ini membuatnya lebih sederhana.

EDIT: Sebenarnya Anda tidak memerlukan TPL Dataflow sama sekali di sini. Menggunakan ToEnumerable() seperti yang disarankan James World sudah cukup:

public static async Task ExecuteInGroupsAsync<T>(
     this IEnumerable<T> source, Func<T, Task> func, int groupSize)
{
    var groups = source.ToObservable().Buffer(groupSize).ToEnumerable();
    foreach (var g in groups)
    {
        await Task.WhenAll(g.Select(func));
    }
}

Atau bahkan lebih sederhana tanpa Rx menggunakan Batch() dari morelinq:

public static async Task ExecuteInGroupsAsync<T>(
    this IEnumerable<T> source, Func<T, Task> func, int groupSize)
{
    var groups = source.Batch(groupSize);
    foreach (var group in groups)
    {
        await Task.WhenAll(group.Select(func));
    }
}
person svick    schedule 13.06.2013
comment
Saya tidak yakin saya setuju bahwa apa yang saya coba lakukan bertentangan dengan sifat push Rx. Pemahaman saya adalah bahwa metode Batch dirancang untuk tidak diaktifkan sampai pemrosesan selesai (jendela geser, jika Anda mau). Saya hanya ingin memperluas perilaku itu ke konteks asinkron. Apapun itu, mengubahnya kembali menjadi enumerable sepertinya berhasil. Terima kasih. - person MgSam; 14.06.2013
comment
@MgSam Apakah itu dimaksudkan untuk saya atau James? Saya tidak mengatakan apa pun tentang “sifat mendorong”. - person svick; 14.06.2013
comment
Saya berasumsi Anda setuju dengan alasan tersebut ketika Anda mereferensikan postingannya dan kemudian menyarankan ToEnumerable. Maaf atas salah tafsir saya. - person MgSam; 14.06.2013

Untuk memastikan saya memahami Anda dengan benar, sepertinya Anda ingin memastikan Anda melanjutkan item buffering sambil hanya menampilkan setiap buffer ketika buffer sebelumnya telah diproses.

Anda juga perlu membuat pemrosesan setiap buffer tidak sinkron.

Mungkin ada gunanya mempertimbangkan beberapa poin teoretis, karena harus saya akui bahwa saya agak bingung dengan pendekatannya. IObservable sering dikatakan sebagai ganda dari IEnumerable karena mencerminkan yang terakhir dengan perbedaan utama adalah bahwa data dikirim ke konsumen daripada konsumen menarik sesuai pilihannya .

Anda mencoba menggunakan aliran buffered seperti IEnumerable dan bukan IObservable - Anda pada dasarnya ingin menarik buffer daripada mendorongnya ke Anda - jadi saya harus bertanya-tanya apakah Anda sudah memilih alat yang tepat untuk pekerjaan itu? Apakah Anda mencoba untuk menghentikan operasi buffering itu sendiri saat buffer sedang diproses? Sebagai konsumen yang memberikan data kepada Anda, ini bukanlah pendekatan yang tepat.

Anda dapat mempertimbangkan untuk menerapkan panggilan ToEnumerable() ke operasi buffer, sehingga Anda dapat menangani buffer tersebut ketika sudah siap. Itu tidak akan mencegah terjadinya buffering saat Anda menangani buffer saat ini.

Tidak banyak yang dapat Anda lakukan untuk mencegah hal ini - melakukan pemrosesan buffer secara sinkron di dalam operasi Select() yang diterapkan pada buffer akan memberikan jaminan bahwa tidak ada panggilan OnNext() berikutnya yang akan terjadi hingga proyeksi Select() selesai. Jaminan ini diberikan secara gratis karena operator perpustakaan Rx menerapkan tata bahasa Rx. Tapi itu hanya menjamin pemanggilan OnNext() yang tidak tumpang tindih - tidak ada yang mengatakan bahwa operator tertentu tidak bisa (dan memang tidak seharusnya) terus menyiapkan OnNext() berikutnya. Itulah sifat dari API berbasis push.

Sangat tidak jelas mengapa Anda merasa perlu proyeksi asinkron jika Anda juga ingin memblokir Buffer? Coba pikirkan - Saya menduga menggunakan Select() sinkron di pengamat Anda mungkin menyelesaikan masalah tetapi tidak sepenuhnya jelas dari pertanyaan Anda.

Mirip dengan Select() sinkron adalah OnNext() handler sinkron yang lebih mudah ditangani jika pemrosesan item Anda tidak membuahkan hasil - tetapi ini tidak sama karena (bergantung pada implementasi Observable) Anda hanya memblokir pengiriman OnNext() panggilan ke Pelanggan itu daripada semua Pelanggan. Namun, hanya dengan satu Pelanggan, hal ini setara sehingga Anda dapat melakukan sesuatu seperti:

void Main()
{
    var source = Observable.Range(1, 4);

    source.Buffer(2)
        .Subscribe(i =>
    {
        Console.WriteLine("Start Processing Buffer");
        Task.WhenAll(from n in i select DoUpload(n)).Wait();
        Console.WriteLine("Finished Processing Buffer");
    });
}

private Task DoUpload(int i)
{
    return Task.Factory.StartNew(
        () => {
            Thread.Sleep(1000);
            Console.WriteLine("Process File " + i);
        });
}

Output mana (*tanpa jaminan pada urutan File Proses x dalam Buffer):

Start Processing Buffer
Process File 2
Process File 1
Finished Processing Buffer
Start Processing Buffer
Process File 3
Process File 4
Finished Processing Buffer

Jika Anda lebih suka menggunakan Select() dan proyeksi Anda tidak membuahkan hasil, Anda dapat melakukannya seperti ini:

source.Buffer(2)
    .Select(i =>
{
    Console.WriteLine("Start Processing Buffer");
    Task.WhenAll(from n in i select DoUpload(n)).Wait();
    Console.WriteLine("Finished Processing Buffer");
    return Unit.Default;
}).Subscribe();

NB: Contoh kode yang ditulis di LINQPad dan termasuk paket Nuget Rx-Main. Kode ini hanya sebagai ilustrasi - jangan Thread.Sleep() dalam kode produksi!

person James World    schedule 13.06.2013