เพื่อให้แน่ใจว่าฉันเข้าใจคุณถูกต้อง ดูเหมือนว่าคุณต้องการให้แน่ใจว่าคุณดำเนินการบัฟเฟอร์รายการต่างๆ ในขณะที่แสดงเฉพาะแต่ละบัฟเฟอร์เมื่อบัฟเฟอร์ก่อนหน้าได้รับการประมวลผลเท่านั้น
คุณต้องทำการประมวลผลบัฟเฟอร์แต่ละตัวแบบอะซิงโครนัสด้วย
การพิจารณาประเด็นทางทฤษฎีบางประเด็นอาจเป็นประโยชน์ เพราะฉันต้องยอมรับว่าฉันสับสนเล็กน้อยเกี่ยวกับแนวทางนี้ IObservable มักถูกกล่าวว่าเป็นสองเท่าของ IEnumerable เพราะมันสะท้อนสิ่งหลังโดยมีความแตกต่างที่สำคัญคือข้อมูลถูก ส่ง ไปยังผู้บริโภคมากกว่าที่ผู้บริโภคจะ ดึง ตามที่เลือก .
คุณกำลังพยายามใช้สตรีมแบบบัฟเฟอร์เช่น IEnumerable แทนที่จะเป็น IObservable โดยพื้นฐานแล้วคุณต้องการดึงบัฟเฟอร์แทนที่จะปล่อยให้พวกมันผลักคุณ - ดังนั้นฉันต้องสงสัยว่าคุณได้เลือกเครื่องมือที่เหมาะสมสำหรับงานหรือไม่ คุณกำลังพยายามระงับการดำเนินการบัฟเฟอร์ ตัวมันเอง ในขณะที่บัฟเฟอร์ถูกประมวลผลหรือไม่ เนื่องจากผู้บริโภคให้ข้อมูลกับคุณ นี่ไม่ใช่แนวทางที่ถูกต้องจริงๆ
คุณสามารถลองใช้การเรียก ToEnumerable()
กับการดำเนินการบัฟเฟอร์ เพื่อที่คุณจะได้จัดการบัฟเฟอร์ให้เราได้เมื่อพร้อม นั่นจะไม่ป้องกันการบัฟเฟอร์ที่เกิดขึ้นอย่างต่อเนื่องในขณะที่คุณจัดการกับบัฟเฟอร์ปัจจุบัน
คุณไม่สามารถป้องกันสิ่งนี้ได้มากนัก - การประมวลผลบัฟเฟอร์ ซิงโครนัส ภายในการดำเนินการ Select()
ที่ใช้กับบัฟเฟอร์จะรับประกันว่าจะไม่มีการเรียก OnNext()
ตามมาเกิดขึ้นจนกว่าการฉายภาพ Select()
จะเสร็จสิ้น การรับประกันนี้ให้ฟรี เนื่องจากผู้ดำเนินการไลบรารี Rx บังคับใช้ไวยากรณ์ของ Rx แต่รับประกันเฉพาะการเรียกใช้ OnNext()
ที่ไม่ทับซ้อนกัน - ไม่มีอะไรจะบอกว่าโอเปอเรเตอร์ที่ระบุไม่สามารถ (และไม่ควร) ดำเนินการต่อเพื่อเตรียม OnNext()
ถัดไปให้พร้อมใช้งาน นั่นคือธรรมชาติของ API แบบพุช
ยังไม่ชัดเจนว่าทำไมคุณถึงคิดว่าคุณต้องให้การฉายภาพเป็นแบบอะซิงโครนัสหากคุณต้องการบล็อกบัฟเฟอร์ด้วย ลองคิดดูสิ - ฉันสงสัยว่าการใช้ซิงโครนัส Select()
ในตัวผู้สังเกตการณ์ของคุณอาจช่วยแก้ปัญหาได้ แต่คำถามของคุณยังไม่ชัดเจนนัก
คล้ายกับซิงโครนัส Select()
คือตัวจัดการ OnNext()
แบบซิงโครนัสซึ่งจัดการได้ง่ายกว่าหากการประมวลผลรายการของคุณไม่มีผลลัพธ์ - แต่ก็ไม่เหมือนกันเพราะ (ขึ้นอยู่กับการใช้งาน Observable) คุณบล็อกการส่ง OnNext()
โทรไปยังสมาชิกนั้นเท่านั้น มากกว่าสมาชิกทั้งหมด อย่างไรก็ตาม การมีสมาชิกเพียงรายเดียวก็เทียบเท่ากัน ดังนั้นคุณจึงสามารถดำเนินการบางอย่าง เช่น:
void Main()
{
var source = Observable.Range(1, 4);
source.Buffer(2)
.Subscribe(i =>
{
Console.WriteLine("Start Processing Buffer");
Task.WhenAll(from n in i select DoUpload(n)).Wait();
Console.WriteLine("Finished Processing Buffer");
});
}
private Task DoUpload(int i)
{
return Task.Factory.StartNew(
() => {
Thread.Sleep(1000);
Console.WriteLine("Process File " + i);
});
}
เอาต์พุตใด (*ไม่มีการรับประกันตามลำดับของไฟล์กระบวนการ x ภายใน บัฟเฟอร์):
Start Processing Buffer
Process File 2
Process File 1
Finished Processing Buffer
Start Processing Buffer
Process File 3
Process File 4
Finished Processing Buffer
หากคุณต้องการใช้ Select()
และการฉายภาพของคุณไม่แสดงผลลัพธ์ คุณสามารถทำได้ดังนี้:
source.Buffer(2)
.Select(i =>
{
Console.WriteLine("Start Processing Buffer");
Task.WhenAll(from n in i select DoUpload(n)).Wait();
Console.WriteLine("Finished Processing Buffer");
return Unit.Default;
}).Subscribe();
หมายเหตุ: โค้ดตัวอย่างที่เขียนใน LINQPad และรวมถึงแพ็คเกจ Nuget Rx-Main รหัสนี้ใช้เพื่อเป็นตัวอย่าง - ห้าม Thread.Sleep()
ในรหัสที่ใช้งานจริง!
person
James World
schedule
13.06.2013
upload()
เลย และTask.WhenAll()
จะไม่ทำงานในคอลเลกชันFile
s - person svick   schedule 13.06.2013