对于 TPL 数据流:如何在阻塞直到处理完所有输入之前获得 TransformBlock 产生的所有输出?

Posted

技术标签:

【中文标题】对于 TPL 数据流:如何在阻塞直到处理完所有输入之前获得 TransformBlock 产生的所有输出?【英文标题】:For a TPL Dataflow: How do I get my hands on all the output produced by a TransformBlock while blocking until all inputs have been processed? 【发布时间】:2018-08-29 13:35:00 【问题描述】:

我正在将一系列 select 语句(查询 - 数以千计)同步提交到单个数据库,并在每个查询中返回一个 DataTable(注意:该程序具有数据库模式的知识它仅在运行时扫描,因此使用DataTables)。该程序在客户端机器上运行并连接到远程机器上的数据库。运行这么多查询需要很长时间。因此,假设异步或并行执行它们会加快速度,我正在探索TPL Dataflow (TDF)。我想使用TDF 库,因为它似乎可以处理与编写多线程代码相关的所有问题,否则这些问题需要手动完成。

显示的代码基于http://blog.i3arnon.com/2016/05/23/tpl-dataflow/。它是最小的,只是为了帮助我理解TDF的基本操作。请知道我已经阅读了许多博客并编写了许多迭代来试图破解这个坚果。

不过,在当前的迭代中,我有一个问题和一个问题:

问题

代码位于button click 方法中(使用 UI,用户选择一台机器、一个 sql 实例和一个数据库,然后开始扫描)。带有await 运算符的两行在构建时返回错误:The 'await' operator can only be used within an async method. Consider marking this method with the 'async' modifier and changing its return type to 'Task'。我无法更改按钮单击方法的返回类型。我是否需要以某种方式将button click 方法与async-await 代码隔离开来?

问题

虽然我找到了描述TDF 基础知识的漂亮文章,但我找不到一个示例来说明如何掌握每次调用TransformBlock 产生的输出(即, DataTable)。虽然我想提交查询async,但我确实需要阻止,直到提交给TransformBlock 的所有查询都完成。如何获得由TransformBlock 生成的一系列DataTables 并阻止直到所有查询完成?

注意:我承认我现在只有一个街区。至少,我将添加一个取消块,因此需要/想要使用 TPL。

private async Task ToolStripButtonStart_Click(object sender, EventArgs e)


    UserInput userInput = new UserInput
    
        MachineName = "gat-admin",
        InstanceName = "",
        DbName = "AdventureWorks2014",
    ;

    DataAccessLayer dataAccessLayer = new DataAccessLayer(userInput.MachineName, userInput.InstanceName);

    //CreateTableQueryList gets a list of all tables from the DB and returns a list of 
    // select statements, one per table, e.g., SELECT * from [schemaname].[tablename]
    IList<String> tableQueryList = CreateTableQueryList(userInput);

    // Define a block that accepts a select statement and returns a DataTable of results
    // where each returned record is: schemaname + tablename + columnname + column datatype + field data
    // e.g., if the select query returns one record with 5 columns, then a datatable with 5 
    // records (one per field) will come back 

    var transformBlock_SubmitTableQuery = new TransformBlock<String, Task<DataTable>>(
        async tableQuery => await dataAccessLayer._SubmitSelectStatement(tableQuery),
        new ExecutionDataflowBlockOptions
        
            MaxDegreeOfParallelism = 2,
        );

    // Add items to the block and start processing
    foreach (String tableQuery in tableQueryList)
    
        await transformBlock_SubmitTableQuery.SendAsync(tableQuery);
    

    // Enable the Cancel button and disable the Start button.
    toolStripButtonStart.Enabled = false;
    toolStripButtonStop.Enabled = true;

    //shut down the block (no more inputs or outputs)
    transformBlock_SubmitTableQuery.Complete();

    //await the completion of the task that procduces the output DataTable
    await transformBlock_SubmitTableQuery.Completion;


public async Task<DataTable> _SubmitSelectStatement(string queryString )

    try
    

        .
        .
        await Task.Run(() => sqlDataAdapter.Fill(dt));

        // process dt into the output DataTable I need

        return outputDt;
    
    catch
    
        throw;
    


【问题讨论】:

错误的假设。如果查询很慢修复它。在同一网络上使用同一 CPU 和同一磁盘的同一服务器上运行更慢的查询只会 事情。将结果加载到 DataTable 会增加更多延迟。 将所有内容加载到 client 进行处理的查询会导致更严重的延迟。 client 比服务器具有更少的内存、更少的 CPU、更少的磁盘 IO 和 no 索引以加快速度。将所有内容加载到客户端进行处理是个坏主意。 顺便说一句,如果您尝试处理数据以生成报告或填充报告模式,请创建适当的报告数据库或数据仓库并使用 ETL 工具(如 SSIS)填充它。仅更新已更改的行。查询性能将比在客户端处理好很多数量级。仅处理更改将比拉动所有内容快几个数量级。 @PanagiotisKanavos:所有好的和有效的点。然而,事实就是如此。要求使用该程序不涉及对正在扫描其数据库的(生产)机器进行任何更改。我只需要尽可能快地运行查询,而不一定非常快 【参考方案1】:

检索TransformBlock 的输出的最安全¹ 方法是使用OutputAvailableAsyncTryReceive 方法执行嵌套循环。这有点混乱,因此您可以考虑通过将下面的扩展方法复制粘贴到项目的某个静态类中来从应用程序代码中隐藏这种复杂性:

public static async Task<List<T>> ToListAsync<T>(this IReceivableSourceBlock<T> block,
    CancellationToken cancellationToken = default)

    var list = new List<T>();
    while (await block.OutputAvailableAsync(cancellationToken).ConfigureAwait(false))
    
        while (block.TryReceive(out var item))
        
            list.Add(item);
        
    
    await block.Completion.ConfigureAwait(false); // Propagate possible exception
    return list;

然后你可以像这样使用ToListAsync 方法:

private async Task ToolStripButtonStart_Click(object sender, EventArgs e)

    var transformBlock = new TransformBlock<string, DataTable>(async query => //...
    //...
    transformBlock.Complete();

    foreach (DataTable dataTable in await transformBlock.ToListAsync())
    
        // Do something with each dataTable
    

如果您已将项目升级到 C# 8,那么您还可以选择以流方式检索输出,例如 IAsyncEnumerable&lt;T&gt;

public static async IAsyncEnumerable<T> ToAsyncEnumerable<T>(
    this IReceivableSourceBlock<T> block,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)

    while (await block.OutputAvailableAsync(cancellationToken).ConfigureAwait(false))
    
        while (block.TryReceive(out var item))
        
            yield return item;
            cancellationToken.ThrowIfCancellationRequested();
        
    
    await block.Completion.ConfigureAwait(false); // Propagate possible exception

这样您就可以在每个DataTable 煮熟后立即获得它,而无需等待所有查询的处理。要使用 IAsyncEnumerable&lt;T&gt;,您只需将 await 移动到 foreach 之前:

await foreach (DataTable dataTable in transformBlock.ToAsyncEnumerable())

    // Do something with each dataTable

¹ 另一种方法是使用链接的BufferBlock&lt;T&gt;TryReceiveAll 方法,如dcastro 的this answer 所示。根据我的实验,该方法效果很好,甚至比我建议的 OutputAvailableAsync+TryReceive 方法效果略好,但我个人并不 100% 相信它。为了确信不会丢失任何消息,必须深入研究BufferBlock&lt;T&gt; 类的source code,并说服自己,当第一个块发出完成信号时,链接的BufferBlock&lt;T&gt; 已经占用了所有消息,并且没有消息在瞬态异步传输状态中悬空。


更新:下面是ToListAsync 方法的更复杂版本,它传播底层块的所有错误。最初的简单ToListAsync 方法只传播第一个错误。

public static Task<List<T>> ToListAsync<T>(this IReceivableSourceBlock<T> block,
    CancellationToken cancellationToken = default)

    return Implementation().ContinueWith(t =>
    
        if (t.IsCanceled) return t;
        Debug.Assert(block.Completion.IsCompleted);
        if (block.Completion.IsFaulted)
        
            var tcs = new TaskCompletionSource<List<T>>();
            tcs.SetException(block.Completion.Exception.InnerExceptions);
            return tcs.Task;
        
        if (block.Completion.IsCanceled) block.Completion.GetAwaiter().GetResult();
        return t;
    , default, TaskContinuationOptions.DenyChildAttach, TaskScheduler.Default).Unwrap();

    async Task<List<T>> Implementation()
    
        var list = new List<T>();
        while (await block.OutputAvailableAsync(cancellationToken).ConfigureAwait(false))
            while (block.TryReceive(out var item))
                list.Add(item);
        await block.Completion.ConfigureAwait(false);
        return list;
    

【讨论】:

【参考方案2】:

事实证明,为了满足我的要求,TPL Dataflow 有点矫枉过正。我能够使用async/awaitTask.WhenAll 来满足我的要求。我使用了 Microsoft How-To How to: Extend the async Walkthrough by Using Task.WhenAll (C#) 作为模型。

关于我的“问题”

我的“问题”不是问题。一个事件方法签名(在我的例子中,一个启动我的搜索的“开始”按钮点击方法)可以修改为async。在 Microsoft How-To GetURLContentsAsync 解决方案中,请参阅 startButton_Click 方法签名:

private async void startButton_Click(object sender, RoutedEventArgs e)  
  
    .
    .
  

关于我的问题

使用 Task.WhenAll,我可以等待所有查询完成,然后处理所有输出以在我的 UI 上使用。在 Microsoft How-To GetURLContentsAsync 解决方案中,请参阅 SumPageSizesAsync 方法,即名为 lengths 的 int 数组是所有输出的总和。

private async Task SumPageSizesAsync()  
  
    .
    .
    // Create a query.   
    IEnumerable<Task<int>> downloadTasksQuery = from url in urlList select ProcessURLAsync(url);  

    // Use ToArray to execute the query and start the download tasks.  
    Task<int>[] downloadTasks = downloadTasksQuery.ToArray();  

    // Await the completion of all the running tasks.  
    Task<int[]> whenAllTask = Task.WhenAll(downloadTasks);  

    int[] lengths = await whenAllTask;  
    .
    .
    

【讨论】:

以上是关于对于 TPL 数据流:如何在阻塞直到处理完所有输入之前获得 TransformBlock 产生的所有输出?的主要内容,如果未能解决你的问题,请参考以下文章

TPL Dataflow 如何与“全局”数据同步

阻塞队列

Socket Accept() 阻塞,直到我按下一个键

如何从标准输入读取一行,阻塞直到找到换行符?

如何使用 C# 4 中的 TPL 创建一个常量处理“流”

如何对连接的 TPL 数据流块进行错误处理?