Заставьте Reactive Extensions Buffer ожидать завершения асинхронной операции

Я использую Reactive Extensions (Rx) для буферизации некоторых данных. Однако у меня возникла проблема в том, что мне нужно сделать что-то асинхронное с этими данными, но я не хочу, чтобы буфер пропускал следующую группу, пока асинхронная операция не будет завершена.

Я пытался структурировать код двумя способами (надуманный пример):

public async Task processFiles<File>(IEnumerable<File> files)
{
    await files.ToObservable()
        .Buffer(10)
        .SelectMany(fi => fi.Select(f => upload(f)) //Now have an IObservable<Task>
        .Select(t => t.ToObservable())
        .Merge()
        .LastAsync();
}

public Task upload(File item)
{
    return Task.Run(() => { //Stuff });
}

or

public async Task processFiles<File>(IEnumerable<File> files)
{
    var buffered = files.ToObservable()
        .Buffer(10);

    buffered.Subscribe(async files => await Task.WhenAll(files.Select(f => upload(f)));

    await buffered.LastAsync();
}

public Task upload(File item)
{
    return Task.Run(() => { //Stuff });
}

К сожалению, ни один из этих методов не работает, так как буфер помещает следующую группу до завершения асинхронных операций. Цель состоит в том, чтобы каждая буферизованная группа выполнялась асинхронно и только после завершения этой операции переходила к следующей буферизованной группе.

Любая помощь приветствуется.


person MgSam    schedule 12.06.2013    source источник
comment
channel9.msdn.com/Shows/Going+Deep/   -  person spender    schedule 13.06.2013
comment
Ваш второй пример не имеет смысла, вы вообще не используете upload(). И Task.WhenAll() не будет работать с коллекцией File.   -  person svick    schedule 13.06.2013
comment
@svick Опечатка. Исправленный.   -  person MgSam    schedule 13.06.2013


Ответы (2)


Во-первых, я думаю, что ваше требование выполнять элементы из каждой группы параллельно, но последовательное выполнение каждой группы довольно необычно. Более распространенным требованием было бы выполнение элементов параллельно, но не более n из них одновременно. Таким образом, нет фиксированных групп, поэтому, если один элемент занимает слишком много времени, другие элементы не должны его ждать.

Чтобы сделать то, о чем вы просите, я думаю, что поток данных TPL больше подходит, чем Rx (хотя некоторый код Rx все же будет полезен). Поток данных TPL сосредоточен на «блоках», которые по умолчанию выполняют что-то последовательно, а это именно то, что вам нужно.

Ваш код может выглядеть так:

public static class Extensions
{
    public static Task ExecuteInGroupsAsync<T>(
         this IEnumerable<T> source, Func<T, Task> func, int groupSize)
     {
         var block = new ActionBlock<IEnumerable<T>>(
             g => Task.WhenAll(g.Select(func)));
         source.ToObservable()
               .Buffer(groupSize)
               .Subscribe(block.AsObserver());
         return block.Completion;
     }
}

public Task ProcessFiles(IEnumerable<File> files)
{
    return files.ExecuteInGroupsAsync(Upload, 10);
}

Это оставляет большую часть тяжелой работы на ActionBlock (и часть на Rx). Блоки потока данных могут действовать как наблюдатели Rx (и наблюдаемые объекты), поэтому мы можем воспользоваться этим, чтобы продолжать использовать Buffer().

Мы хотим обработать всю группу сразу, поэтому мы используем Task.WhenAll() для создания Task, которое завершается, когда завершается вся группа. Блоки потока данных понимают функции, возвращающие Task, поэтому следующая группа не начнет выполнение до тех пор, пока не завершится Task, возвращенная предыдущей группой.

Конечным результатом является Completion Task, который завершится после завершения исходного наблюдаемого объекта и завершения всей обработки.

TPL Dataflow также имеет BatchBlock, который работает как Buffer(), и мы могли бы напрямую Post() каждый элемент из коллекции (без использования ToObservable() и AsObserver()), но я думаю, что использование Rx для этой части кода делает его проще.

EDIT: На самом деле вам вообще не нужен поток данных TPL. Достаточно использовать ToEnumerable(), как предложил James World:

public static async Task ExecuteInGroupsAsync<T>(
     this IEnumerable<T> source, Func<T, Task> func, int groupSize)
{
    var groups = source.ToObservable().Buffer(groupSize).ToEnumerable();
    foreach (var g in groups)
    {
        await Task.WhenAll(g.Select(func));
    }
}

Или еще проще без Rx с использованием Batch() из morelinq:

public static async Task ExecuteInGroupsAsync<T>(
    this IEnumerable<T> source, Func<T, Task> func, int groupSize)
{
    var groups = source.Batch(groupSize);
    foreach (var group in groups)
    {
        await Task.WhenAll(group.Select(func));
    }
}
person svick    schedule 13.06.2013
comment
Я не уверен, что согласен с тем, что то, что я пытаюсь сделать, противоречит характеру толчка Rx. Насколько я понимаю, метод Batch предназначен для того, чтобы не срабатывать до тех пор, пока не будет завершена обработка (скользящее окно, если хотите). Я просто хотел распространить это поведение на асинхронный контекст. Несмотря на это, изменение его обратно в перечисляемое, похоже, сработало. Спасибо. - person MgSam; 14.06.2013
comment
@MgSam Это было для меня или для Джеймса? Я ничего не говорил о «напорном характере». - person svick; 14.06.2013
comment
Я предположил, что вы согласны с этим рассуждением, когда сослались на его пост, а затем предложили ToEnumerable. Извиняюсь за неправильное толкование. - person MgSam; 14.06.2013

Чтобы убедиться, что я вас правильно понял, похоже, вы хотите убедиться, что вы продолжаете буферизацию элементов, представляя каждый буфер только после обработки предыдущего буфера.

Также нужно сделать обработку каждого буфера асинхронной.

Вероятно, полезно рассмотреть некоторые теоретические моменты, потому что я должен признаться, что немного запутался в подходе. Часто говорят, что IObservable является двойником IEnumerable, потому что он отражает последний с ключевым отличием в том, что данные передаются потребителю, а не потребитель извлекает их по своему выбору. .

Вы пытаетесь использовать буферизованный поток, например IEnumerable, вместо IObservable — вы, по сути, хотите извлекать буферы, а не нажимать их на вас — поэтому я должен задаться вопросом, правильно ли вы выбрали инструмент для работы? Вы пытаетесь задержать саму операцию буферизации во время обработки буфера? Как потребитель, передающий вам данные, это не совсем правильный подход.

Вы можете рассмотреть возможность применения вызова ToEnumerable() к операции с буфером, чтобы вы могли работать с буферами, когда они будут готовы. Однако это не предотвратит текущую буферизацию, когда вы имеете дело с текущим буфером.

Вы мало что можете сделать, чтобы предотвратить это — выполнение обработки буфера синхронно внутри операции Select(), примененной к буферу, гарантирует, что последующий вызов OnNext() не произойдет до завершения проекции Select(). Гарантия предоставляется бесплатно, поскольку операторы библиотеки Rx обеспечивают соблюдение грамматики Rx. Но это только гарантирует неперекрывающиеся вызовы OnNext() - нет ничего, что говорило бы о том, что данный оператор не может (и действительно не должен) продолжить подготовку следующего OnNext() к работе. Такова природа API, основанного на push-уведомлениях.

Очень неясно, почему вы думаете, что проекция должна быть асинхронной, если вы также хотите заблокировать буферы? Подумайте об этом - я подозреваю, что использование синхронного Select() в вашем наблюдателе может решить проблему, но из вашего вопроса это не совсем ясно.

Подобно синхронному Select(), это синхронный обработчик OnNext(), который легче обрабатывать, если ваша обработка элементов не имеет результатов, но это не то же самое, потому что (в зависимости от реализации Observable) вы блокируете только доставку вызовов OnNext() этому подписчику. а не все подписчики. Однако с одним подписчиком это эквивалентно, поэтому вы можете сделать что-то вроде:

void Main()
{
    var source = Observable.Range(1, 4);

    source.Buffer(2)
        .Subscribe(i =>
    {
        Console.WriteLine("Start Processing Buffer");
        Task.WhenAll(from n in i select DoUpload(n)).Wait();
        Console.WriteLine("Finished Processing Buffer");
    });
}

private Task DoUpload(int i)
{
    return Task.Factory.StartNew(
        () => {
            Thread.Sleep(1000);
            Console.WriteLine("Process File " + i);
        });
}

Что выводит (*без гарантии порядка Process File x внутри буфера):

Start Processing Buffer
Process File 2
Process File 1
Finished Processing Buffer
Start Processing Buffer
Process File 3
Process File 4
Finished Processing Buffer

Если вы предпочитаете использовать Select() и ваша проекция не дает результатов, вы можете сделать это следующим образом:

source.Buffer(2)
    .Select(i =>
{
    Console.WriteLine("Start Processing Buffer");
    Task.WhenAll(from n in i select DoUpload(n)).Wait();
    Console.WriteLine("Finished Processing Buffer");
    return Unit.Default;
}).Subscribe();

Примечание. Пример кода, написанного на LINQPad и включающего пакет Nuget Rx-Main. Этот код служит иллюстративным целям — не используйте Thread.Sleep() в рабочем коде!

person James World    schedule 13.06.2013