Для потока данных TPL: как мне получить все выходные данные, созданные TransformBlock, при блокировке, пока все входные данные не будут обработаны?

Я отправляю серию select операторов (запросов - тысячи из них) в одну базу данных синхронно и получаю один DataTable на запрос (Примечание: эта программа такова, что она знает схему БД, которую она сканирует, только во время выполнения , отсюда и использование DataTables). Программа запускается на клиентской машине и подключается к БД на удаленной машине. Выполнение такого количества запросов занимает много времени. Итак, предполагая, что их асинхронное или параллельное выполнение ускорит процесс, я изучаю TPL Dataflow (TDF). Я хочу использовать библиотеку TDF, потому что она решает все проблемы, связанные с написанием многопоточного кода, который в противном случае пришлось бы делать вручную.

Показанный код основан на http://blog.i3arnon.com/2016/05/23/tpl-dataflow/. Он минимален и предназначен только для того, чтобы помочь мне понять основные операции TDF. Пожалуйста, знайте, что я прочитал много блогов и закодировал много итераций, пытаясь взломать этот орех.

Тем не менее, с этой текущей итерацией у меня есть одна проблема и вопрос:

Проблема

Код находится внутри метода button click (используя пользовательский интерфейс, пользователь выбирает машину, экземпляр sql и базу данных, а затем запускает сканирование). Две строки с оператором await возвращают ошибку во время сборки: The 'await' operator can only be used within an async method. Consider marking this method with the 'async' modifier and changing its return type to 'Task'. Я не могу изменить возвращаемый тип метода нажатия кнопки. Нужно ли как-то изолировать метод button click от кода async-await?

Вопрос

Несмотря на то, что я нашел статьи, описывающие основы TDF, я не могу найти пример того, как получить в свои руки вывод, который производит каждый вызов TransformBlock (т. е. DataTable). Хотя я хочу отправить запросы async, мне нужно заблокировать их до тех пор, пока все запросы, отправленные в TransformBlock, не будут выполнены. Как мне получить серию DataTable, созданных TransformBlock, и заблокировать, пока все запросы не будут выполнены?

Примечание. Я признаю, что сейчас у меня есть только один блок. Как минимум, я добавлю блокировку отмены, поэтому мне нужно/хочется использовать TPL.

private async Task ToolStripButtonStart_Click(object sender, EventArgs e)
{

    UserInput userInput = new UserInput
    {
        MachineName = "gat-admin",
        InstanceName = "",
        DbName = "AdventureWorks2014",
    };

    DataAccessLayer dataAccessLayer = new DataAccessLayer(userInput.MachineName, userInput.InstanceName);

    //CreateTableQueryList gets a list of all tables from the DB and returns a list of 
    // select statements, one per table, e.g., SELECT * from [schemaname].[tablename]
    IList<String> tableQueryList = CreateTableQueryList(userInput);

    // Define a block that accepts a select statement and returns a DataTable of results
    // where each returned record is: schemaname + tablename + columnname + column datatype + field data
    // e.g., if the select query returns one record with 5 columns, then a datatable with 5 
    // records (one per field) will come back 

    var transformBlock_SubmitTableQuery = new TransformBlock<String, Task<DataTable>>(
        async tableQuery => await dataAccessLayer._SubmitSelectStatement(tableQuery),
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 2,
        });

    // Add items to the block and start processing
    foreach (String tableQuery in tableQueryList)
    {
        await transformBlock_SubmitTableQuery.SendAsync(tableQuery);
    }

    // Enable the Cancel button and disable the Start button.
    toolStripButtonStart.Enabled = false;
    toolStripButtonStop.Enabled = true;

    //shut down the block (no more inputs or outputs)
    transformBlock_SubmitTableQuery.Complete();

    //await the completion of the task that procduces the output DataTable
    await transformBlock_SubmitTableQuery.Completion;
}

public async Task<DataTable> _SubmitSelectStatement(string queryString )
{
    try
    {

        .
        .
        await Task.Run(() => sqlDataAdapter.Fill(dt));

        // process dt into the output DataTable I need

        return outputDt;
    }
    catch
    {
        throw;
    }

}

person William Charlton    schedule 20.03.2018    source источник
comment
Плохое предположение. Если запрос медленный, исправьте это. Выполнение более медленных запросов на том же сервере с тем же процессором и тем же диском в той же сети только замедлит работу. Загрузка результатов в DataTable добавит еще больше задержек.   -  person Panagiotis Kanavos    schedule 20.03.2018
comment
Запросы, которые загружают все в клиент для обработки, приводят к гораздо большим задержкам. У клиента меньше памяти, меньше ЦП, меньше дисковых операций ввода-вывода, чем у сервера, и нет индексов для ускорения работы. Загружать все в клиент для обработки — плохая идея.   -  person Panagiotis Kanavos    schedule 20.03.2018
comment
Кстати, если вы пытаетесь обрабатывать данные, например, для создания отчетов или заполнения схемы отчетов, создайте соответствующую базу данных отчетов или хранилище данных и заполните ее с помощью инструментов ETL, таких как SSIS. Обновите только те строки, которые изменились. Производительность запросов будет на много порядков выше, чем при обработке на клиенте. Работать только с изменениями будет на порядки быстрее, чем тянуть тоже все подряд.   -  person Panagiotis Kanavos    schedule 20.03.2018
comment
@PanagiotisKanavos: Все хорошие и действительные моменты. Однако это то, что есть. Требуется, чтобы при использовании этой программы не вносились изменения в (производственную) машину, база данных которой сканируется. Мне просто нужно, чтобы запросы выполнялись как можно быстрее, не обязательно молниеносно.   -  person William Charlton    schedule 20.03.2018


Ответы (2)


Правильный способ получить выходные данные TransformBlock — выполнить вложенный цикл с использованием методов OutputAvailableAsync и TryReceive. Это немного запутанно, поэтому вы можете скрыть эту сложность из кода вашего приложения, скопировав метод расширения ниже в какой-нибудь статический класс вашего проекта:

public static async Task<List<T>> ToListAsync<T>(this IReceivableSourceBlock<T> block,
    CancellationToken cancellationToken = default)
{
    var list = new List<T>();
    while (await block.OutputAvailableAsync(cancellationToken).ConfigureAwait(false))
    {
        while (block.TryReceive(out var item))
        {
            list.Add(item);
        }
    }
    await block.Completion.ConfigureAwait(false); // Propagate possible exception
    return list;
}

Затем вы можете использовать метод ToListAsync следующим образом:

private async Task ToolStripButtonStart_Click(object sender, EventArgs e)
{
    var transformBlock = new TransformBlock<string, DataTable>(async query => //...
    //...
    transformBlock.Complete();

    foreach (DataTable dataTable in await transformBlock.ToListAsync())
    {
        // Do something with the dataTable
    }
}

Если вы обновили свой проект до C# 8, у вас также есть возможность получить выходные данные в потоковом режиме в виде IAsyncEnumerable:

public static async IAsyncEnumerable<T> ToAsyncEnumerable<T>(
    this IReceivableSourceBlock<T> block,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    while (await block.OutputAvailableAsync(cancellationToken).ConfigureAwait(false))
    {
        while (block.TryReceive(out var item))
        {
            yield return item;
        }
    }
    await block.Completion.ConfigureAwait(false); // Propagate possible exception
}

Таким образом, вы сможете получить доступ к каждому DataTable сразу после его приготовления, не дожидаясь обработки всех запросов. Чтобы использовать IAsyncEnumerable, просто переместите await перед foreach:

await foreach (DataTable dataTable in transformBlock.ToAsyncEnumerable())
{
    // Do something with the dataTable
}
person Theodor Zoulias    schedule 16.06.2020

Как оказалось, для удовлетворения моих требований TPL Dataflow немного излишне. Я смог удовлетворить свои требования, используя async/await и Task.WhenAll. Я использовал инструкции Microsoft Практическое руководство. Расширение асинхронного пошагового руководства с использованием Task.WhenAll (C#) в качестве модели.

Относительно моей "Проблемы"

Моя "проблема" - не проблема. Сигнатуру метода события (в моем случае это метод нажатия кнопки «Пуск», который инициирует мой поиск) можно изменить на async. В решении Microsoft How-To GetURLContentsAsync см. сигнатуру метода startButton_Click:

private async void startButton_Click(object sender, RoutedEventArgs e)  
{  
    .
    .
}  

Относительно моего вопроса

Используя Task.WhenAll, я могу дождаться завершения всех своих запросов, а затем обработать все выходные данные для использования в своем пользовательском интерфейсе. В решении Microsoft How-To GetURLContentsAsync см. метод SumPageSizesAsync, т. е. массив int с именем lengths представляет собой сумму всех выходных данных.

private async Task SumPageSizesAsync()  
{  
    .
    .
    // Create a query.   
    IEnumerable<Task<int>> downloadTasksQuery = from url in urlList select ProcessURLAsync(url);  

    // Use ToArray to execute the query and start the download tasks.  
    Task<int>[] downloadTasks = downloadTasksQuery.ToArray();  

    // Await the completion of all the running tasks.  
    Task<int[]> whenAllTask = Task.WhenAll(downloadTasks);  

    int[] lengths = await whenAllTask;  
    .
    .
}    
person William Charlton    schedule 25.03.2018