Untuk Aliran Data TPL: Bagaimana cara mendapatkan semua keluaran yang dihasilkan oleh TransformBlock sambil memblokir hingga semua masukan telah diproses?

Saya mengirimkan serangkaian pernyataan select (kueri - ribuan di antaranya) ke satu database secara sinkron dan mendapatkan kembali satu DataTable per kueri (Catatan: Program ini sedemikian rupa sehingga memiliki pengetahuan tentang skema DB yang dipindai hanya pada saat dijalankan. , maka penggunaan DataTables). Program ini berjalan pada mesin klien dan terhubung ke DB pada mesin jarak jauh. Dibutuhkan waktu lama untuk menjalankan begitu banyak pertanyaan. Jadi, dengan asumsi bahwa menjalankannya secara asinkron atau paralel akan mempercepat, saya sedang menjajaki TPL Dataflow (TDF). Saya ingin menggunakan perpustakaan TDF karena tampaknya menangani semua kekhawatiran terkait penulisan kode multi-utas yang seharusnya dilakukan dengan tangan.

Kode yang ditampilkan didasarkan pada http://blog.i3arnon.com/2016/05/23/tpl-dataflow/. Minimal dan hanya untuk membantu saya memahami pengoperasian dasar TDF. Perlu diketahui bahwa saya telah membaca banyak blog dan mengkodekan banyak iterasi untuk mencoba memecahkan masalah ini.

Meskipun demikian, dengan iterasi saat ini, saya memiliki satu masalah dan pertanyaan:

Masalah

Kode ini berada di dalam metode button click (Menggunakan UI, pengguna memilih mesin, instance sql, dan database, lalu memulai pemindaian). Dua baris dengan operator await mengembalikan kesalahan pada waktu pembuatan: The 'await' operator can only be used within an async method. Consider marking this method with the 'async' modifier and changing its return type to 'Task'. Saya tidak dapat mengubah tipe pengembalian metode klik tombol. Apakah saya perlu mengisolasi metode button click dari kode async-await?

Pertanyaan

Meskipun saya telah menemukan tulisan beau-coup yang menjelaskan dasar-dasar TDF, saya tidak dapat menemukan contoh bagaimana mendapatkan output yang dihasilkan oleh setiap pemanggilan TransformBlock (yaitu, DataTable). Meskipun saya ingin mengirimkan pertanyaan async, saya perlu memblokir sampai semua pertanyaan yang dikirimkan ke TransformBlock selesai. Bagaimana cara mendapatkan rangkaian DataTables yang dihasilkan oleh TransformBlock dan memblokir hingga semua kueri selesai?

Catatan: Saya mengakui bahwa saya hanya memiliki satu blok sekarang. Minimal, saya akan menambahkan blok pembatalan dan saya juga perlu/ingin menggunakan TPL.

private async Task ToolStripButtonStart_Click(object sender, EventArgs e)
{

    UserInput userInput = new UserInput
    {
        MachineName = "gat-admin",
        InstanceName = "",
        DbName = "AdventureWorks2014",
    };

    DataAccessLayer dataAccessLayer = new DataAccessLayer(userInput.MachineName, userInput.InstanceName);

    //CreateTableQueryList gets a list of all tables from the DB and returns a list of 
    // select statements, one per table, e.g., SELECT * from [schemaname].[tablename]
    IList<String> tableQueryList = CreateTableQueryList(userInput);

    // Define a block that accepts a select statement and returns a DataTable of results
    // where each returned record is: schemaname + tablename + columnname + column datatype + field data
    // e.g., if the select query returns one record with 5 columns, then a datatable with 5 
    // records (one per field) will come back 

    var transformBlock_SubmitTableQuery = new TransformBlock<String, Task<DataTable>>(
        async tableQuery => await dataAccessLayer._SubmitSelectStatement(tableQuery),
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 2,
        });

    // Add items to the block and start processing
    foreach (String tableQuery in tableQueryList)
    {
        await transformBlock_SubmitTableQuery.SendAsync(tableQuery);
    }

    // Enable the Cancel button and disable the Start button.
    toolStripButtonStart.Enabled = false;
    toolStripButtonStop.Enabled = true;

    //shut down the block (no more inputs or outputs)
    transformBlock_SubmitTableQuery.Complete();

    //await the completion of the task that procduces the output DataTable
    await transformBlock_SubmitTableQuery.Completion;
}

public async Task<DataTable> _SubmitSelectStatement(string queryString )
{
    try
    {

        .
        .
        await Task.Run(() => sqlDataAdapter.Fill(dt));

        // process dt into the output DataTable I need

        return outputDt;
    }
    catch
    {
        throw;
    }

}

person William Charlton    schedule 20.03.2018    source sumber
comment
Asumsi yang buruk. Jika kueri lambat perbaiki. Menjalankan kueri yang lebih lambat di server yang sama dengan CPU yang sama dan disk yang sama melalui jaringan yang sama hanya akan memperlambat. Memuat hasilnya ke dalam DataTable akan menambah lebih banyak penundaan.   -  person Panagiotis Kanavos    schedule 20.03.2018
comment
Kueri yang memuat semuanya ke dalam klien untuk diproses menghasilkan penundaan yang jauh lebih buruk. Klien memiliki lebih sedikit memori, lebih sedikit CPU, lebih sedikit IO disk dibandingkan server dan tidak indeks untuk mempercepat segalanya. Memuat semuanya ke klien untuk diproses adalah ide yang buruk.   -  person Panagiotis Kanavos    schedule 20.03.2018
comment
BTW jika Anda mencoba memproses data untuk misalnya menghasilkan laporan atau mengisi skema pelaporan, membuat database pelaporan atau gudang data yang tepat dan mengisinya menggunakan alat ETL seperti SSIS. Perbarui hanya baris yang telah diubah. Performa kueri akan jauh lebih baik daripada pemrosesan di klien. Bekerja hanya dengan perubahan akan menjadi lipat lebih cepat daripada melakukan semuanya juga.   -  person Panagiotis Kanavos    schedule 20.03.2018
comment
@PanagiotisKanavos: Semua poin bagus dan valid. Namun, itulah adanya. Merupakan persyaratan bahwa penggunaan program ini tidak melibatkan perubahan pada mesin (produksi) yang databasenya sedang dipindai. Saya hanya perlu menjalankan kueri secepat mungkin, tidak harus secepat kilat   -  person William Charlton    schedule 20.03.2018


Jawaban (2)


Cara yang benar untuk mengambil output TransformBlock adalah dengan melakukan loop bersarang menggunakan metode OutputAvailableAsync dan TryReceive. Ini agak berantakan, jadi Anda bisa mempertimbangkan untuk menyembunyikan kompleksitas ini dari kode aplikasi Anda dengan menyalin-menempelkan metode ekstensi di bawah ini di beberapa kelas statis proyek Anda:

public static async Task<List<T>> ToListAsync<T>(this IReceivableSourceBlock<T> block,
    CancellationToken cancellationToken = default)
{
    var list = new List<T>();
    while (await block.OutputAvailableAsync(cancellationToken).ConfigureAwait(false))
    {
        while (block.TryReceive(out var item))
        {
            list.Add(item);
        }
    }
    await block.Completion.ConfigureAwait(false); // Propagate possible exception
    return list;
}

Kemudian Anda bisa menggunakan metode ToListAsync seperti ini:

private async Task ToolStripButtonStart_Click(object sender, EventArgs e)
{
    var transformBlock = new TransformBlock<string, DataTable>(async query => //...
    //...
    transformBlock.Complete();

    foreach (DataTable dataTable in await transformBlock.ToListAsync())
    {
        // Do something with the dataTable
    }
}

Jika Anda telah mengupgrade proyek Anda ke C# 8 maka Anda juga memiliki opsi untuk mengambil output secara streaming, sebagai IAsyncEnumerable:

public static async IAsyncEnumerable<T> ToAsyncEnumerable<T>(
    this IReceivableSourceBlock<T> block,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    while (await block.OutputAvailableAsync(cancellationToken).ConfigureAwait(false))
    {
        while (block.TryReceive(out var item))
        {
            yield return item;
        }
    }
    await block.Completion.ConfigureAwait(false); // Propagate possible exception
}

Dengan cara ini Anda akan bisa mendapatkan setiap DataTable segera setelah dimasak, tanpa harus menunggu semua pertanyaan diproses. Untuk menggunakan IAsyncEnumerable Anda cukup memindahkan await sebelum foreach:

await foreach (DataTable dataTable in transformBlock.ToAsyncEnumerable())
{
    // Do something with the dataTable
}
person Theodor Zoulias    schedule 16.06.2020

Ternyata, untuk memenuhi persyaratan saya, TPL Dataflow agak berlebihan. Saya dapat memenuhi persyaratan saya menggunakan async/await dan Task.WhenAll. Saya menggunakan Microsoft How-To Cara: Memperluas Panduan async dengan Menggunakan Task.WhenAll (C#) sebagai model.

Mengenai "Masalah" saya

"Masalah" saya bukanlah masalah. Tanda tangan metode peristiwa (dalam kasus saya, metode klik tombol "Mulai" yang memulai pencarian saya) dapat dimodifikasi menjadi async. Dalam solusi Microsoft How-To GetURLContentsAsync, lihat tanda tangan metode startButton_Click:

private async void startButton_Click(object sender, RoutedEventArgs e)  
{  
    .
    .
}  

Mengenai pertanyaan saya

Menggunakan Task.WhenAll, saya bisa menunggu semua pertanyaan saya selesai lalu memproses semua output untuk digunakan di UI saya. Dalam solusi Microsoft How-To GetURLContentsAsync, lihat metode SumPageSizesAsync, yaitu array int bernama lengths adalah jumlah dari semua output.

private async Task SumPageSizesAsync()  
{  
    .
    .
    // Create a query.   
    IEnumerable<Task<int>> downloadTasksQuery = from url in urlList select ProcessURLAsync(url);  

    // Use ToArray to execute the query and start the download tasks.  
    Task<int>[] downloadTasks = downloadTasksQuery.ToArray();  

    // Await the completion of all the running tasks.  
    Task<int[]> whenAllTask = Task.WhenAll(downloadTasks);  

    int[] lengths = await whenAllTask;  
    .
    .
}    
person William Charlton    schedule 25.03.2018