สำหรับ TPL Dataflow: ฉันจะจัดการกับเอาต์พุตทั้งหมดที่ผลิตโดย TransformBlock ในขณะที่บล็อกจนกว่าอินพุตทั้งหมดจะได้รับการประมวลผลได้อย่างไร

ฉันกำลังส่งชุดคำสั่ง select (แบบสอบถาม - หลายพันชุด) ไปยังฐานข้อมูลเดียวพร้อมกันและรับกลับหนึ่ง DataTable ต่อการสืบค้น (หมายเหตุ: โปรแกรมนี้มีความรู้เกี่ยวกับสคีมา DB ที่กำลังสแกนเฉพาะในเวลารันไทม์เท่านั้น ดังนั้นการใช้ DataTables) โปรแกรมทำงานบนเครื่องไคลเอนต์และเชื่อมต่อกับฐานข้อมูลบนเครื่องระยะไกล ใช้เวลานานในการเรียกใช้แบบสอบถามจำนวนมาก ดังนั้น สมมติว่าการดำเนินการแบบอะซิงก์หรือแบบขนานจะทำให้สิ่งต่าง ๆ เร็วขึ้น ฉันกำลังสำรวจ TPL Dataflow (TDF) ฉันต้องการใช้ไลบรารี TDF เพราะดูเหมือนว่าจะจัดการกับข้อกังวลทั้งหมดที่เกี่ยวข้องกับการเขียนโค้ดแบบมัลติเธรดที่อาจจำเป็นต้องทำด้วยมือ

รหัสที่แสดงขึ้นอยู่กับ http://blog.i3arnon.com/2016/05/23/tpl-dataflow/. มันน้อยที่สุดและเป็นเพียงเพื่อช่วยให้ฉันเข้าใจการทำงานพื้นฐานของ TDF โปรดทราบว่าฉันได้อ่านบล็อกมากมายและเขียนโค้ดซ้ำหลายครั้งเพื่อพยายามถอดรหัสถั่วนี้

ไม่น้อยไปกว่านั้น ด้วยการวนซ้ำปัจจุบันนี้ ฉันมีปัญหาหนึ่งข้อและคำถาม:

ปัญหา

โค้ดอยู่ในเมธอด button click (เมื่อใช้ UI ผู้ใช้จะเลือกเครื่อง อินสแตนซ์ sql และฐานข้อมูล จากนั้นจึงเริ่มต้นการสแกน) สองบรรทัดที่มีตัวดำเนินการ await ส่งคืนข้อผิดพลาด ณ เวลาสร้าง: The 'await' operator can only be used within an async method. Consider marking this method with the 'async' modifier and changing its return type to 'Task' ฉันไม่สามารถเปลี่ยนประเภทการส่งคืนวิธีการคลิกปุ่มได้ ฉันจำเป็นต้องแยกวิธี button click ออกจากรหัส async-await หรือไม่

คำถาม

แม้ว่าฉันจะพบบทความเขียนแบบ beau-coup ที่อธิบายพื้นฐานของ TDF แต่ฉันไม่พบตัวอย่างวิธีจัดการกับผลลัพธ์ที่การเรียกใช้ TransformBlock แต่ละครั้งสร้างขึ้น (เช่น DataTable) แม้ว่าฉันต้องการส่งข้อความค้นหา async แต่ฉันจำเป็นต้องบล็อกจนกว่าข้อความค้นหาทั้งหมดที่ส่งไปยัง TransformBlock จะเสร็จสมบูรณ์ ฉันจะได้เป็นเจ้าของซีรีส์ DataTables ที่สร้างโดย TransformBlock และบล็อกจนกว่าการสืบค้นทั้งหมดจะเสร็จสมบูรณ์ได้อย่างไร

หมายเหตุ: ฉันรับทราบว่าขณะนี้ฉันมีบล็อกเดียวเท่านั้น อย่างน้อยที่สุด ฉันจะเพิ่มบล็อกการยกเลิก และจำเป็นต้อง/ต้องการใช้ TPL ด้วย

private async Task ToolStripButtonStart_Click(object sender, EventArgs e)
{

    UserInput userInput = new UserInput
    {
        MachineName = "gat-admin",
        InstanceName = "",
        DbName = "AdventureWorks2014",
    };

    DataAccessLayer dataAccessLayer = new DataAccessLayer(userInput.MachineName, userInput.InstanceName);

    //CreateTableQueryList gets a list of all tables from the DB and returns a list of 
    // select statements, one per table, e.g., SELECT * from [schemaname].[tablename]
    IList<String> tableQueryList = CreateTableQueryList(userInput);

    // Define a block that accepts a select statement and returns a DataTable of results
    // where each returned record is: schemaname + tablename + columnname + column datatype + field data
    // e.g., if the select query returns one record with 5 columns, then a datatable with 5 
    // records (one per field) will come back 

    var transformBlock_SubmitTableQuery = new TransformBlock<String, Task<DataTable>>(
        async tableQuery => await dataAccessLayer._SubmitSelectStatement(tableQuery),
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 2,
        });

    // Add items to the block and start processing
    foreach (String tableQuery in tableQueryList)
    {
        await transformBlock_SubmitTableQuery.SendAsync(tableQuery);
    }

    // Enable the Cancel button and disable the Start button.
    toolStripButtonStart.Enabled = false;
    toolStripButtonStop.Enabled = true;

    //shut down the block (no more inputs or outputs)
    transformBlock_SubmitTableQuery.Complete();

    //await the completion of the task that procduces the output DataTable
    await transformBlock_SubmitTableQuery.Completion;
}

public async Task<DataTable> _SubmitSelectStatement(string queryString )
{
    try
    {

        .
        .
        await Task.Run(() => sqlDataAdapter.Fill(dt));

        // process dt into the output DataTable I need

        return outputDt;
    }
    catch
    {
        throw;
    }

}

person William Charlton    schedule 20.03.2018    source แหล่งที่มา
comment
สมมติฐานที่ไม่ดี หากข้อความค้นหาช้า แก้ไข การเรียกใช้คำสั่งที่ช้ากว่าบนเซิร์ฟเวอร์เดียวกันกับ CPU เดียวกันและดิสก์เดียวกันบนเครือข่ายเดียวกันจะมีผล ช้า เท่านั้น การโหลดผลลัพธ์ลงใน DataTable จะเพิ่มความล่าช้ามากยิ่งขึ้น   -  person Panagiotis Kanavos    schedule 20.03.2018
comment
ข้อความค้นหาที่โหลดทุกอย่างลงใน ไคลเอนต์ เพื่อการประมวลผลส่งผลให้เกิดความล่าช้าที่เลวร้ายยิ่งกว่านั้นมาก ไคลเอนต์ มีหน่วยความจำน้อยกว่า CPU น้อยกว่า IO ของดิสก์น้อยกว่าเซิร์ฟเวอร์ และ ไม่มี ดัชนีเพื่อเพิ่มความเร็ว การโหลดทุกอย่างลงในไคลเอนต์เพื่อการประมวลผลถือเป็นความคิดที่ไม่ดี   -  person Panagiotis Kanavos    schedule 20.03.2018
comment
BTW หากคุณพยายามประมวลผลข้อมูล เช่น สร้างรายงานหรือกรอกสคีมาการรายงาน ให้สร้างฐานข้อมูลการรายงานหรือคลังข้อมูลที่เหมาะสม และกรอกข้อมูลโดยใช้เครื่องมือ ETL เช่น SSIS อัพเดตเฉพาะแถวที่มีการเปลี่ยนแปลง ประสิทธิภาพการสืบค้นจะมีขนาดที่ดีกว่าการประมวลผลบนไคลเอนต์มาก การทำงานเฉพาะกับการเปลี่ยนแปลงจะเป็นลำดับความสำคัญเร็วกว่าการดึงทุกอย่างเช่นกัน   -  person Panagiotis Kanavos    schedule 20.03.2018
comment
@PanagiotisKanavos: คะแนนที่ดีและถูกต้องทั้งหมด อย่างไรก็ตาม มันก็เป็นสิ่งที่มันเป็น เป็นข้อกำหนดว่าการใช้โปรแกรมนี้ไม่เกี่ยวข้องกับการเปลี่ยนแปลงในเครื่อง (การผลิต) ที่กำลังสแกนฐานข้อมูล ฉันแค่ต้องทำให้การสืบค้นดำเนินการให้เร็วที่สุดเท่าที่จะเป็นไปได้ ไม่จำเป็นต้องเร็วจนเกินไป   -  person William Charlton    schedule 20.03.2018


คำตอบ (2)


วิธีที่ถูกต้องในการดึงข้อมูลเอาต์พุตของ TransformBlock คือดำเนินการวนซ้ำแบบซ้อนโดยใช้เมธอด OutputAvailableAsync และ TryReceive. มันยุ่งนิดหน่อย ดังนั้นคุณจึงสามารถซ่อนความซับซ้อนนี้จากโค้ดแอปพลิเคชันของคุณได้โดยการคัดลอกและวางวิธีการขยายด้านล่างในคลาสคงที่บางคลาสของโปรเจ็กต์ของคุณ:

public static async Task<List<T>> ToListAsync<T>(this IReceivableSourceBlock<T> block,
    CancellationToken cancellationToken = default)
{
    var list = new List<T>();
    while (await block.OutputAvailableAsync(cancellationToken).ConfigureAwait(false))
    {
        while (block.TryReceive(out var item))
        {
            list.Add(item);
        }
    }
    await block.Completion.ConfigureAwait(false); // Propagate possible exception
    return list;
}

จากนั้นคุณสามารถใช้วิธี ToListAsync ดังนี้:

private async Task ToolStripButtonStart_Click(object sender, EventArgs e)
{
    var transformBlock = new TransformBlock<string, DataTable>(async query => //...
    //...
    transformBlock.Complete();

    foreach (DataTable dataTable in await transformBlock.ToListAsync())
    {
        // Do something with the dataTable
    }
}

หากคุณอัปเกรดโปรเจ็กต์ของคุณเป็น C# 8 คุณยังมีตัวเลือกในการดึงข้อมูลเอาต์พุตในรูปแบบการสตรีม เช่น IAsyncEnumerable:

public static async IAsyncEnumerable<T> ToAsyncEnumerable<T>(
    this IReceivableSourceBlock<T> block,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    while (await block.OutputAvailableAsync(cancellationToken).ConfigureAwait(false))
    {
        while (block.TryReceive(out var item))
        {
            yield return item;
        }
    }
    await block.Completion.ConfigureAwait(false); // Propagate possible exception
}

ด้วยวิธีนี้คุณจะสามารถไปถึง DataTable แต่ละรายการได้ทันทีหลังจากที่ปรุงเสร็จแล้ว โดยไม่ต้องรอการประมวลผลคำถามทั้งหมด หากต้องการใช้ IAsyncEnumerable คุณเพียงเลื่อน await ก่อน foreach:

await foreach (DataTable dataTable in transformBlock.ToAsyncEnumerable())
{
    // Do something with the dataTable
}
person Theodor Zoulias    schedule 16.06.2020

ปรากฎว่าเพื่อให้เป็นไปตามข้อกำหนดของฉัน TPL Dataflow จึงต้องใช้กำลังมากเกินไปเล็กน้อย ฉันสามารถตอบสนองความต้องการของฉันได้โดยใช้ async/await และ Task.WhenAll ฉันใช้ Microsoft How-To วิธีการ: ขยายบทสรุปแบบอะซิงก์โดยใช้ Task.WhenAll (C#) เป็นแบบจำลอง

เกี่ยวกับ "ปัญหา" ของฉัน

“ปัญหา” ของฉันไม่ใช่ปัญหา ลายเซ็นวิธีการเหตุการณ์ (ในกรณีของฉัน วิธีการคลิกปุ่ม "เริ่ม" ที่เริ่มต้นการค้นหาของฉัน) สามารถแก้ไขได้ให้เป็น async ในโซลูชัน Microsoft How-To GetURLContentsAsync ดูลายเซ็นวิธีการ startButton_Click:

private async void startButton_Click(object sender, RoutedEventArgs e)  
{  
    .
    .
}  

เกี่ยวกับคำถามของฉัน

เมื่อใช้ Task.WhenAll ฉันสามารถรอให้การสืบค้นทั้งหมดเสร็จสิ้น จากนั้นจึงประมวลผลเอาต์พุตทั้งหมดเพื่อใช้กับ UI ของฉัน ในโซลูชัน Microsoft How-To GetURLContentsAsync โปรดดูเมธอด SumPageSizesAsync กล่าวคือ อาร์เรย์ของ int ชื่อ lengths คือผลรวมของเอาต์พุตทั้งหมด

private async Task SumPageSizesAsync()  
{  
    .
    .
    // Create a query.   
    IEnumerable<Task<int>> downloadTasksQuery = from url in urlList select ProcessURLAsync(url);  

    // Use ToArray to execute the query and start the download tasks.  
    Task<int>[] downloadTasks = downloadTasksQuery.ToArray();  

    // Await the completion of all the running tasks.  
    Task<int[]> whenAllTask = Task.WhenAll(downloadTasks);  

    int[] lengths = await whenAllTask;  
    .
    .
}    
person William Charlton    schedule 25.03.2018