Pertama, menurut saya persyaratan Anda untuk mengeksekusi item dari setiap grup secara paralel, tetapi setiap grup secara seri sangat tidak biasa. Persyaratan yang lebih umum adalah mengeksekusi item secara paralel, tetapi paling banyak n item pada saat yang bersamaan. Dengan cara ini, tidak ada grup yang tetap, jadi jika satu item memakan waktu terlalu lama, item lainnya tidak perlu menunggu.
Untuk melakukan apa yang Anda minta, menurut saya TPL Dataflow lebih cocok daripada Rx (walaupun beberapa kode Rx masih berguna). TPL Dataflow berpusat pada "blok" yang mengeksekusi sesuatu, secara default secara seri, dan itulah yang Anda butuhkan.
Kode Anda akan terlihat seperti ini:
public static class Extensions
{
public static Task ExecuteInGroupsAsync<T>(
this IEnumerable<T> source, Func<T, Task> func, int groupSize)
{
var block = new ActionBlock<IEnumerable<T>>(
g => Task.WhenAll(g.Select(func)));
source.ToObservable()
.Buffer(groupSize)
.Subscribe(block.AsObserver());
return block.Completion;
}
}
public Task ProcessFiles(IEnumerable<File> files)
{
return files.ExecuteInGroupsAsync(Upload, 10);
}
Hal ini menyisakan sebagian besar beban berat di ActionBlock
(dan beberapa di Rx). Blok aliran data dapat bertindak sebagai pengamat Rx (dan dapat diamati), sehingga kita dapat memanfaatkannya untuk tetap menggunakan Buffer()
.
Kita ingin menangani seluruh grup sekaligus, jadi kita gunakan Task.WhenAll()
untuk membuat Task
yang selesai ketika seluruh grup selesai. Blok aliran data memahami fungsi pengembalian Task
, sehingga grup berikutnya tidak akan mulai mengeksekusi hingga Task
yang dikembalikan oleh grup sebelumnya selesai.
Hasil akhirnya adalah Completion
Task
, yang akan selesai setelah sumber observasi selesai dan semua pemrosesan selesai.
TPL Dataflow juga memiliki BatchBlock
, yang berfungsi seperti Buffer()
dan kita dapat langsung Post()
setiap item dari koleksi (tanpa menggunakan ToObservable()
dan AsObserver()
), tetapi menurut saya menggunakan Rx untuk bagian kode ini membuatnya lebih sederhana.
EDIT: Sebenarnya Anda tidak memerlukan TPL Dataflow sama sekali di sini. Menggunakan ToEnumerable()
seperti yang disarankan James World sudah cukup:
public static async Task ExecuteInGroupsAsync<T>(
this IEnumerable<T> source, Func<T, Task> func, int groupSize)
{
var groups = source.ToObservable().Buffer(groupSize).ToEnumerable();
foreach (var g in groups)
{
await Task.WhenAll(g.Select(func));
}
}
Atau bahkan lebih sederhana tanpa Rx menggunakan Batch()
dari morelinq:
public static async Task ExecuteInGroupsAsync<T>(
this IEnumerable<T> source, Func<T, Task> func, int groupSize)
{
var groups = source.Batch(groupSize);
foreach (var group in groups)
{
await Task.WhenAll(group.Select(func));
}
}
person
svick
schedule
13.06.2013
upload()
sama sekali. DanTask.WhenAll()
tidak akan berfungsi pada koleksiFile
s. - person svick   schedule 13.06.2013