Чтобы убедиться, что я вас правильно понял, похоже, вы хотите убедиться, что вы продолжаете буферизацию элементов, представляя каждый буфер только после обработки предыдущего буфера.
Также нужно сделать обработку каждого буфера асинхронной.
Вероятно, полезно рассмотреть некоторые теоретические моменты, потому что я должен признаться, что немного запутался в подходе. Часто говорят, что IObservable является двойником IEnumerable, потому что он отражает последний с ключевым отличием в том, что данные передаются потребителю, а не потребитель извлекает их по своему выбору. .
Вы пытаетесь использовать буферизованный поток, например IEnumerable, вместо IObservable — вы, по сути, хотите извлекать буферы, а не нажимать их на вас — поэтому я должен задаться вопросом, правильно ли вы выбрали инструмент для работы? Вы пытаетесь задержать саму операцию буферизации во время обработки буфера? Как потребитель, передающий вам данные, это не совсем правильный подход.
Вы можете рассмотреть возможность применения вызова ToEnumerable()
к операции с буфером, чтобы вы могли работать с буферами, когда они будут готовы. Однако это не предотвратит текущую буферизацию, когда вы имеете дело с текущим буфером.
Вы мало что можете сделать, чтобы предотвратить это — выполнение обработки буфера синхронно внутри операции Select()
, примененной к буферу, гарантирует, что последующий вызов OnNext()
не произойдет до завершения проекции Select()
. Гарантия предоставляется бесплатно, поскольку операторы библиотеки Rx обеспечивают соблюдение грамматики Rx. Но это только гарантирует неперекрывающиеся вызовы OnNext()
- нет ничего, что говорило бы о том, что данный оператор не может (и действительно не должен) продолжить подготовку следующего OnNext()
к работе. Такова природа API, основанного на push-уведомлениях.
Очень неясно, почему вы думаете, что проекция должна быть асинхронной, если вы также хотите заблокировать буферы? Подумайте об этом - я подозреваю, что использование синхронного Select()
в вашем наблюдателе может решить проблему, но из вашего вопроса это не совсем ясно.
Подобно синхронному Select()
, это синхронный обработчик OnNext()
, который легче обрабатывать, если ваша обработка элементов не имеет результатов, но это не то же самое, потому что (в зависимости от реализации Observable) вы блокируете только доставку вызовов OnNext()
этому подписчику. а не все подписчики. Однако с одним подписчиком это эквивалентно, поэтому вы можете сделать что-то вроде:
void Main()
{
var source = Observable.Range(1, 4);
source.Buffer(2)
.Subscribe(i =>
{
Console.WriteLine("Start Processing Buffer");
Task.WhenAll(from n in i select DoUpload(n)).Wait();
Console.WriteLine("Finished Processing Buffer");
});
}
private Task DoUpload(int i)
{
return Task.Factory.StartNew(
() => {
Thread.Sleep(1000);
Console.WriteLine("Process File " + i);
});
}
Что выводит (*без гарантии порядка Process File x внутри буфера):
Start Processing Buffer
Process File 2
Process File 1
Finished Processing Buffer
Start Processing Buffer
Process File 3
Process File 4
Finished Processing Buffer
Если вы предпочитаете использовать Select()
и ваша проекция не дает результатов, вы можете сделать это следующим образом:
source.Buffer(2)
.Select(i =>
{
Console.WriteLine("Start Processing Buffer");
Task.WhenAll(from n in i select DoUpload(n)).Wait();
Console.WriteLine("Finished Processing Buffer");
return Unit.Default;
}).Subscribe();
Примечание. Пример кода, написанного на LINQPad и включающего пакет Nuget Rx-Main. Этот код служит иллюстративным целям — не используйте Thread.Sleep()
в рабочем коде!
person
James World
schedule
13.06.2013
upload()
. ИTask.WhenAll()
не будет работать с коллекциейFile
. - person svick   schedule 13.06.2013