ทำให้ Reactive Extensions Buffer รอให้การดำเนินการแบบอะซิงโครนัสเสร็จสิ้น

ฉันใช้ Reactive Extensions (Rx) เพื่อบัฟเฟอร์ข้อมูลบางส่วน ฉันมีปัญหาแม้ว่าฉันต้องทำอะไรบางอย่างแบบอะซิงโครนัสกับข้อมูลนี้ แต่ฉันไม่ต้องการให้บัฟเฟอร์ส่งผ่านกลุ่มถัดไปจนกว่าการดำเนินการแบบอะซิงโครนัสจะเสร็จสมบูรณ์

ฉันพยายามจัดโครงสร้างโค้ดสองวิธี (ตัวอย่างที่วางแผนไว้):

public async Task processFiles<File>(IEnumerable<File> files)
{
    await files.ToObservable()
        .Buffer(10)
        .SelectMany(fi => fi.Select(f => upload(f)) //Now have an IObservable<Task>
        .Select(t => t.ToObservable())
        .Merge()
        .LastAsync();
}

public Task upload(File item)
{
    return Task.Run(() => { //Stuff });
}

or

public async Task processFiles<File>(IEnumerable<File> files)
{
    var buffered = files.ToObservable()
        .Buffer(10);

    buffered.Subscribe(async files => await Task.WhenAll(files.Select(f => upload(f)));

    await buffered.LastAsync();
}

public Task upload(File item)
{
    return Task.Run(() => { //Stuff });
}

น่าเสียดายที่วิธีการเหล่านี้ทั้งสองวิธีไม่ทำงานเนื่องจากบัฟเฟอร์ส่งกลุ่มถัดไปก่อนที่การดำเนินการอะซิงก์จะเสร็จสมบูรณ์ จุดประสงค์คือเพื่อให้แต่ละกลุ่มที่บัฟเฟอร์ดำเนินการแบบอะซิงโครนัส และเมื่อการดำเนินการนั้นเสร็จสมบูรณ์เท่านั้น ให้ดำเนินการต่อด้วยกลุ่มที่บัฟเฟอร์ถัดไป

ความช่วยเหลือใด ๆ ที่ชื่นชมอย่างมาก


person MgSam    schedule 12.06.2013    source แหล่งที่มา
comment
channel9.msdn.com/Shows/Going+Deep/   -  person spender    schedule 13.06.2013
comment
ตัวอย่างที่สองของคุณไม่สมเหตุสมผล คุณไม่ได้ใช้ upload() เลย และ Task.WhenAll() จะไม่ทำงานในคอลเลกชัน Files   -  person svick    schedule 13.06.2013
comment
@svick พิมพ์ผิด แก้ไขแล้ว   -  person MgSam    schedule 13.06.2013


คำตอบ (2)


อันดับแรก ฉันคิดว่าความต้องการของคุณในการดำเนินการรายการจากแต่ละกลุ่มแบบขนาน แต่แต่ละกลุ่มในซีรีส์นั้นค่อนข้างผิดปกติ ข้อกำหนดทั่วไปที่มากกว่าคือการดำเนินการรายการต่างๆ แบบขนาน แต่ส่วนใหญ่ n รายการในเวลาเดียวกัน ด้วยวิธีนี้ ไม่มีกลุ่มคงที่ ดังนั้นหากรายการเดียวใช้เวลานานเกินไป รายการอื่นๆ ก็ไม่ต้องรอ

ในการทำสิ่งที่คุณต้องการ ฉันคิดว่า TPL Dataflow เหมาะสมกว่า Rx (แม้ว่าโค้ด Rx บางส่วนจะยังคงมีประโยชน์อยู่ก็ตาม) TPL Dataflow มุ่งเน้นไปที่ “บล็อก” ที่ดำเนินการสิ่งต่าง ๆ ตามค่าเริ่มต้นเป็นซีรีส์ ซึ่งเป็นสิ่งที่คุณต้องการอย่างแท้จริง

รหัสของคุณอาจมีลักษณะดังนี้:

public static class Extensions
{
    public static Task ExecuteInGroupsAsync<T>(
         this IEnumerable<T> source, Func<T, Task> func, int groupSize)
     {
         var block = new ActionBlock<IEnumerable<T>>(
             g => Task.WhenAll(g.Select(func)));
         source.ToObservable()
               .Buffer(groupSize)
               .Subscribe(block.AsObserver());
         return block.Completion;
     }
}

public Task ProcessFiles(IEnumerable<File> files)
{
    return files.ExecuteInGroupsAsync(Upload, 10);
}

นี่ทำให้การยกของหนักส่วนใหญ่บน ActionBlock (และบางส่วนบน Rx) บล็อกกระแสข้อมูลสามารถทำหน้าที่เป็นผู้สังเกตการณ์ Rx (และสิ่งที่สังเกตได้) ดังนั้นเราจึงสามารถใช้ประโยชน์จากสิ่งนั้นเพื่อใช้ Buffer() ต่อไป

เราต้องการจัดการทั้งกลุ่มในคราวเดียว ดังนั้นเราจึงใช้ Task.WhenAll() เพื่อสร้าง Task ซึ่งจะเสร็จสมบูรณ์เมื่อทั้งกลุ่มเสร็จสมบูรณ์ บล็อก Dataflow เข้าใจฟังก์ชัน Task ที่ส่งคืน ดังนั้นกลุ่มถัดไปจะไม่เริ่มทำงานจนกว่า Task ที่กลุ่มก่อนหน้านี้ส่งคืนจะเสร็จสมบูรณ์

ผลลัพธ์สุดท้ายคือ Completion Task ซึ่งจะเสร็จสมบูรณ์หลังจากแหล่งที่มาที่สังเกตได้เสร็จสิ้นและการประมวลผลทั้งหมดเสร็จสิ้น

TPL Dataflow ยังมี BatchBlock ซึ่งทำงานเหมือนกับ Buffer() และเราสามารถ Post() แต่ละรายการจากคอลเลกชันได้โดยตรง (โดยไม่ต้องใช้ ToObservable() และ AsObserver()) แต่ฉันคิดว่าการใช้ Rx สำหรับโค้ดส่วนนี้จะทำให้ง่ายขึ้น

แก้ไข: จริงๆ แล้วคุณไม่จำเป็นต้องมี TPL Dataflow ที่นี่เลย การใช้ ToEnumerable() ตามที่ James World แนะนำก็เพียงพอแล้ว:

public static async Task ExecuteInGroupsAsync<T>(
     this IEnumerable<T> source, Func<T, Task> func, int groupSize)
{
    var groups = source.ToObservable().Buffer(groupSize).ToEnumerable();
    foreach (var g in groups)
    {
        await Task.WhenAll(g.Select(func));
    }
}

หรือง่ายกว่านั้นโดยไม่ต้องใช้ Rx โดยใช้ Batch() จาก morelinq:

public static async Task ExecuteInGroupsAsync<T>(
    this IEnumerable<T> source, Func<T, Task> func, int groupSize)
{
    var groups = source.Batch(groupSize);
    foreach (var group in groups)
    {
        await Task.WhenAll(group.Select(func));
    }
}
person svick    schedule 13.06.2013
comment
ฉันไม่แน่ใจว่าฉันยอมรับว่าสิ่งที่ฉันพยายามทำนั้นตรงกันข้ามกับลักษณะการผลักดันของ Rx ความเข้าใจของฉันคือ Batch วิธีการได้รับการออกแบบไม่ให้เริ่มทำงานจนกว่าจะประมวลผลเสร็จ (หน้าต่างบานเลื่อนถ้าคุณต้องการ) ฉันแค่ต้องการขยายพฤติกรรมนั้นไปสู่บริบทแบบอะซิงโครนัส โดยไม่คำนึงว่าการเปลี่ยนกลับเป็นจำนวนนับดูเหมือนว่าจะได้ผล ขอบคุณ. - person MgSam; 14.06.2013
comment
@MgSam นั่นหมายถึงฉันหรือเจมส์หรือเปล่า? ฉันไม่ได้พูดอะไรเกี่ยวกับ "ธรรมชาติที่ผลักดัน" - person svick; 14.06.2013
comment
ฉันคิดว่าคุณเห็นด้วยกับเหตุผลนั้นเมื่อคุณอ้างอิงโพสต์ของเขาแล้วแนะนำ ToEnumerable ขออภัยสำหรับการตีความของฉันผิด - person MgSam; 14.06.2013

เพื่อให้แน่ใจว่าฉันเข้าใจคุณถูกต้อง ดูเหมือนว่าคุณต้องการให้แน่ใจว่าคุณดำเนินการบัฟเฟอร์รายการต่างๆ ในขณะที่แสดงเฉพาะแต่ละบัฟเฟอร์เมื่อบัฟเฟอร์ก่อนหน้าได้รับการประมวลผลเท่านั้น

คุณต้องทำการประมวลผลบัฟเฟอร์แต่ละตัวแบบอะซิงโครนัสด้วย

การพิจารณาประเด็นทางทฤษฎีบางประเด็นอาจเป็นประโยชน์ เพราะฉันต้องยอมรับว่าฉันสับสนเล็กน้อยเกี่ยวกับแนวทางนี้ IObservable มักถูกกล่าวว่าเป็นสองเท่าของ IEnumerable เพราะมันสะท้อนสิ่งหลังโดยมีความแตกต่างที่สำคัญคือข้อมูลถูก ส่ง ไปยังผู้บริโภคมากกว่าที่ผู้บริโภคจะ ดึง ตามที่เลือก .

คุณกำลังพยายามใช้สตรีมแบบบัฟเฟอร์เช่น IEnumerable แทนที่จะเป็น IObservable โดยพื้นฐานแล้วคุณต้องการดึงบัฟเฟอร์แทนที่จะปล่อยให้พวกมันผลักคุณ - ดังนั้นฉันต้องสงสัยว่าคุณได้เลือกเครื่องมือที่เหมาะสมสำหรับงานหรือไม่ คุณกำลังพยายามระงับการดำเนินการบัฟเฟอร์ ตัวมันเอง ในขณะที่บัฟเฟอร์ถูกประมวลผลหรือไม่ เนื่องจากผู้บริโภคให้ข้อมูลกับคุณ นี่ไม่ใช่แนวทางที่ถูกต้องจริงๆ

คุณสามารถลองใช้การเรียก ToEnumerable() กับการดำเนินการบัฟเฟอร์ เพื่อที่คุณจะได้จัดการบัฟเฟอร์ให้เราได้เมื่อพร้อม นั่นจะไม่ป้องกันการบัฟเฟอร์ที่เกิดขึ้นอย่างต่อเนื่องในขณะที่คุณจัดการกับบัฟเฟอร์ปัจจุบัน

คุณไม่สามารถป้องกันสิ่งนี้ได้มากนัก - การประมวลผลบัฟเฟอร์ ซิงโครนัส ภายในการดำเนินการ Select() ที่ใช้กับบัฟเฟอร์จะรับประกันว่าจะไม่มีการเรียก OnNext() ตามมาเกิดขึ้นจนกว่าการฉายภาพ Select() จะเสร็จสิ้น การรับประกันนี้ให้ฟรี เนื่องจากผู้ดำเนินการไลบรารี Rx บังคับใช้ไวยากรณ์ของ Rx แต่รับประกันเฉพาะการเรียกใช้ OnNext() ที่ไม่ทับซ้อนกัน - ไม่มีอะไรจะบอกว่าโอเปอเรเตอร์ที่ระบุไม่สามารถ (และไม่ควร) ดำเนินการต่อเพื่อเตรียม OnNext() ถัดไปให้พร้อมใช้งาน นั่นคือธรรมชาติของ API แบบพุช

ยังไม่ชัดเจนว่าทำไมคุณถึงคิดว่าคุณต้องให้การฉายภาพเป็นแบบอะซิงโครนัสหากคุณต้องการบล็อกบัฟเฟอร์ด้วย ลองคิดดูสิ - ฉันสงสัยว่าการใช้ซิงโครนัส Select() ในตัวผู้สังเกตการณ์ของคุณอาจช่วยแก้ปัญหาได้ แต่คำถามของคุณยังไม่ชัดเจนนัก

คล้ายกับซิงโครนัส Select() คือตัวจัดการ OnNext() แบบซิงโครนัสซึ่งจัดการได้ง่ายกว่าหากการประมวลผลรายการของคุณไม่มีผลลัพธ์ - แต่ก็ไม่เหมือนกันเพราะ (ขึ้นอยู่กับการใช้งาน Observable) คุณบล็อกการส่ง OnNext() โทรไปยังสมาชิกนั้นเท่านั้น มากกว่าสมาชิกทั้งหมด อย่างไรก็ตาม การมีสมาชิกเพียงรายเดียวก็เทียบเท่ากัน ดังนั้นคุณจึงสามารถดำเนินการบางอย่าง เช่น:

void Main()
{
    var source = Observable.Range(1, 4);

    source.Buffer(2)
        .Subscribe(i =>
    {
        Console.WriteLine("Start Processing Buffer");
        Task.WhenAll(from n in i select DoUpload(n)).Wait();
        Console.WriteLine("Finished Processing Buffer");
    });
}

private Task DoUpload(int i)
{
    return Task.Factory.StartNew(
        () => {
            Thread.Sleep(1000);
            Console.WriteLine("Process File " + i);
        });
}

เอาต์พุตใด (*ไม่มีการรับประกันตามลำดับของไฟล์กระบวนการ x ภายใน บัฟเฟอร์):

Start Processing Buffer
Process File 2
Process File 1
Finished Processing Buffer
Start Processing Buffer
Process File 3
Process File 4
Finished Processing Buffer

หากคุณต้องการใช้ Select() และการฉายภาพของคุณไม่แสดงผลลัพธ์ คุณสามารถทำได้ดังนี้:

source.Buffer(2)
    .Select(i =>
{
    Console.WriteLine("Start Processing Buffer");
    Task.WhenAll(from n in i select DoUpload(n)).Wait();
    Console.WriteLine("Finished Processing Buffer");
    return Unit.Default;
}).Subscribe();

หมายเหตุ: โค้ดตัวอย่างที่เขียนใน LINQPad และรวมถึงแพ็คเกจ Nuget Rx-Main รหัสนี้ใช้เพื่อเป็นตัวอย่าง - ห้าม Thread.Sleep() ในรหัสที่ใช้งานจริง!

person James World    schedule 13.06.2013