如何并行处理项目然后合并结果?

Posted

技术标签:

【中文标题】如何并行处理项目然后合并结果?【英文标题】:How to process items in parallel and then merge the results? 【发布时间】:2012-11-09 22:41:07 【问题描述】:

我遇到以下问题:

我有一个Foo 对象的数据流,并将这些对象流式传输到几个并发的进程内任务/线程,这些任务/线程依次处理对象并输出FooResult 对象。每个FooResult 都包含与其他成员相同的Foo,该Foo 用于创建FooResult。然而,并不是每个Foo 都必须创建一个FooResult

我的问题是,我想从整个过程中传递一个包装对象,该对象包含原始 Foo 以及可能从并发中的 Foo 创建的所有 FooResult 对象(如果有)任务。

注意:我目前使用 TPL 数据流,而每个并发进程都发生在一个 ActionBlock<Foo> 中,该 ActionBlock<Foo> 链接到 BroadCastBlock<Foo>。它使用SendAsync() 到目标数据流块以发送可能创建的FooResult。显然,并发数据流块在不可预测的时间产生FooResult,这是我目前正在努力解决的问题。我似乎无法弄清楚在所有ActionBlock<Foo> 中创建了多少FooResult,以便我可以将它们与原始Foo 捆绑在一起并将其作为包装对象传递。

在伪代码中,它目前看起来如下:

BroadCastBlock<Foo> broadCastBlock;
ActionBlock<Foo> aBlock1;
ActionBlock<Foo> aBlock2; 
ActionBlock<FooResult> targetBlock;
broadCastBlock.LinkTo(aBlock1); broadCastBlock.LinkTo(aBlock2);

aBlock1 = new ActionBlock<Foo>(foo =>

    //do something here. Sometimes create a FooResult. If then
    targetBlock.SendAsync(fooResult);
);

//similar for aBlock2

但是,当前代码的问题在于,如果 Foo 没有在任何操作块中生成单个 FooResult,则 targetBlock 可能不会收到任何内容。此外,targetBlock 可能接收到 2 个FooResult 对象,因为每个操作块都生成了一个FooResult

我想要的是 targetBlock 接收一个包含每个 Foo 的包装对象,如果创建了 FooResult 对象,那么还有一个 FooResult 的集合。

有什么想法可以使解决方案按照描述的方式工作吗?它不必仔细阅读 TPL 数据流,但如果这样做会很整洁。

更新:以下是我通过 svick 建议的 JoinBlock 实现得到的。我不会使用它(除非它可以在性能方面进行调整),因为它运行起来非常慢,我每秒可以处理大约 89000 个项目(而且这只是 int 值类型)。

public class Test

    private BroadcastBlock<int> broadCastBlock;
    private TransformBlock<int, int> transformBlock1;
    private TransformBlock<int, int> transformBlock2;
    private JoinBlock<int, int, int> joinBlock;
    private ActionBlock<Tuple<int, int, int>> processorBlock;

    public Test()
    
        broadCastBlock = new BroadcastBlock<int>(i =>
            
                return i;
            );

        transformBlock1 = new TransformBlock<int, int>(i =>
            
                return i;
            , new ExecutionDataflowBlockOptions  MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded );

        transformBlock2 = new TransformBlock<int, int>(i =>
            
                return i;
            , new ExecutionDataflowBlockOptions  MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded );

        joinBlock = new JoinBlock<int, int, int>();

        processorBlock = new ActionBlock<Tuple<int, int, int>>(tuple =>
            
                //Console.WriteLine("original value: " + tuple.Item1 + "tfb1: " + tuple.Item2 + "tfb2: " + tuple.Item3);
            , new ExecutionDataflowBlockOptions  MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded );

        //Linking
        broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions  PropagateCompletion = true );
        broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions  PropagateCompletion = true );

        broadCastBlock.LinkTo(joinBlock.Target1, new DataflowLinkOptions  PropagateCompletion = true );
        transformBlock1.LinkTo(joinBlock.Target2, new DataflowLinkOptions  PropagateCompletion = true );
        transformBlock2.LinkTo(joinBlock.Target3, new DataflowLinkOptions  PropagateCompletion = true );

        joinBlock.LinkTo(processorBlock, new DataflowLinkOptions  PropagateCompletion = true );
    

    public void Start()
    
        Stopwatch watch = new Stopwatch();
        watch.Start();

        const int numElements = 1000000;

        for (int i = 1; i <= numElements; i++)
        
            broadCastBlock.Post(i);
        

        ////mark completion
        broadCastBlock.Complete();

        processorBlock.Completion.Wait();

        watch.Stop();

        Console.WriteLine("Time it took: " + watch.ElapsedMilliseconds + " - items processed per second: " + numElements / watch.ElapsedMilliseconds * 1000);
        Console.ReadLine();
    

更新代码以反映建议:

public Test()
    
        broadCastBlock = new BroadcastBlock<int>(i =>
            
                return i;
            );

        transformBlock1 = new TransformBlock<int, int>(i =>
            
                return i;
            );

        transformBlock2 = new TransformBlock<int, int>(i =>
            
                return i;
            );

        joinBlock = new JoinBlock<int, int>();

        processorBlock = new ActionBlock<Tuple<int, int>>(tuple =>
            
                //Console.WriteLine("tfb1: " + tuple.Item1 + "tfb2: " + tuple.Item2);
            );

        //Linking
        broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions  PropagateCompletion = true );
        broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions  PropagateCompletion = true );
        transformBlock1.LinkTo(joinBlock.Target1);
        transformBlock2.LinkTo(joinBlock.Target2);
        joinBlock.LinkTo(processorBlock, new DataflowLinkOptions  PropagateCompletion = true );
    

    public void Start()
    
        Stopwatch watch = new Stopwatch();
        watch.Start();

        const int numElements = 1000000;

        for (int i = 1; i <= numElements; i++)
        
            broadCastBlock.Post(i);
        

        ////mark completion
        broadCastBlock.Complete();
        Task.WhenAll(transformBlock1.Completion, transformBlock2.Completion).ContinueWith(_ => joinBlock.Complete());


        processorBlock.Completion.Wait();

        watch.Stop();

        Console.WriteLine("Time it took: " + watch.ElapsedMilliseconds + " - items processed per second: " + numElements / watch.ElapsedMilliseconds * 1000);
        Console.ReadLine();
    

【问题讨论】:

使用MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded 是粗略的。本质上,您命令 TPL 数据流同时处理所有发布的项目。这只能通过为每个项目创建一个Task 并将其安排在ThreadPool 上来完成。因此,您在分配空间方面造成了巨大的开销,并为ThreadPool 带来了巨大的调度负担。最终,ThreadPool 中的线程可用性会隐含地限制您,因为它不可能有一百万个可用线程。简而言之:不要这样做,这是个坏主意。 【参考方案1】:

我可以看到两种方法来解决这个问题:

    使用JoinBlock。您的广播块和两个工作块将分别发送到连接块的一个目标。如果一个工作块没有任何结果,它将给它null(或其他一些特殊值)。您的工作块需要更改为TranformBlock&lt;Foo, FooResult&gt;,因为使用ActionBlock 的方式并不能保证排序(至少在设置MaxDegreeOfParallelism 时不能保证),TransformBlock 可以。

    JoinBlock 的结果将是 Tuple&lt;Foo, FooResult, FooResult&gt;,其中任何一个或两个 FooResults 可以是 null

    虽然我不确定我是否喜欢这个解决方案在很大程度上依赖于正确的项目排序,但这对我来说似乎很脆弱。

    使用其他对象进行同步。当所有块都完成某个项目时,该对象将负责向前发送结果。这类似于马里奥在他的回答中建议的NotificationWrapper

    在这种情况下,您可以使用TaskCompletionSourceTask.WhenAll() 来处理同步问题。

【讨论】:

我在我的问题中添加了一个使用 JoinBlock 的实现(请参阅“更新”)。这看起来对吗?它似乎运行正确,但速度极慢,大约每秒 89k 项。如果我不能提高性能,那么我需要考虑一个不同的解决方案。有什么想法吗?或者您认为这是最好的? 在我的电脑上,你的代码给了我 ~350-400k/s。如果我删除JoinBlock,所以它是广播→变换1→动作1和广播→变换2→动作2,我得到大致相同的性能。所以我认为JoinBlock 不会让你慢很多。 谢谢,上面的代码让我每秒可以处理 580k 个元素。不幸的是,我的目的还不够。我需要仔细考虑这一切,可能我会在项目的这个特定部分中删除并发,基本上不使用 TPL 数据流,没有广播块,而是以循环类型的方式将消息提供给不同的实例并收集结果,然后将它们传递下去。但需要根据性能概况做出最终决定。谢谢指点。 我此时正在学习 TPL 数据流,并且在使用 JoinBlocks 合并结果时遇到了同样的问题。所以我设想一个目标 BufferBlock 来存储所有结果,然后将它有条件地链接(LinkTo 带有谓词)到将进行合并的特定块。这可能是可接受的第三种方式吗?【参考方案2】:

据我了解的问题:

lock foo
work on foo
if foo has not triggered sending a result
and fooResult exists
   send fooResult
   remember in foo that result has already been sent
unlock foo

OP 评论后更新

所以将 foo 推入您的广播块

BroadCastBlock<Foo> bcb = new BroadCastBlock<Foo>(foo);
...

if ( aBlock1.HasResult ) 

    bcb.Add( aBlock1.Result );


if ( aBlock2.HasResult ) 

    bcb.Add( aBlock2.Result );

现在您可以查询 bcb 以了解存在的内容并发送所需的内容(或仅发送 bcb)。

更新(在 cmets 进行更多讨论后)

class NotificationWrapper<TSource, TResult>

   private readonly TSource originalSource;

   private Queue<TResult> resultsGenerated = new Queue<TResult>()

   private int workerCount = 0;

   public NotificationWrapper<TSource, TResult>( TSource originalSource, int workerCount )
   
       this.originalSource = originalSource;
       this.workerCount = workerCount;
   

   public void NotifyActionDone()
   
       lock( this )
       
          --workerCount;
          if ( 0 == workerCount )
          
             //do my sending
             send( originalSource, resultsGenerated );
          
       
   

    public void NotifyActionDone( TResult result )
    
        lock ( this )
        
            resultsGenerated.push( result );
            NotifyActionDone();
        
    

在调用代码中:

NotificationWrapper<Foo, Fooresult> notificationWrapper = new NotificationWrapper<Foo, Fooresult>( foo, 2 );
ActionBlock<Foo> ab1 = new ActionBlock<Foo>( foo, notificationWrapper );
ActionBlock<Foo> ab2 = new ActionBlock<Foo>( foo, notificationWrapper );

一旦完成计算,ActionBlock 需要更改为调用NotifyActionDone()NotifyActoinDone( Fooresult )

【讨论】:

嗯,不是绝对正确(如果我理解正确的话):将 foo 流式传输到每个动作块 -> 让动作块产生 fooResult 或什么都不产生(尽管我总是可以让它传递一个 foo 并且如果一个 fooResult已创建然后附加它)-> 等到我知道在 actionBlocks 中处理了每个 foo -> 发送一个包含原始 foo 和可能创建的 fooResults 的包装对象。 恐怕并不能真正解决问题。首先,在 hasResult 查询时,操作块可能尚未生成结果,因为处理时间可能需要更长的时间,而不是因为没有生成 fooResult。其次,您的解决方案不保证每个结果都将引用流入块中的相同 foo 。我只想捆绑引用相同 foo 对象的结果。 如果您为每个新的 foo 创建一个新的 BroadCastBLock,那么它会。你知道有多少个动作块吗? 是的,我确实知道有多少个动作块,正如我的问题中提到的,我考虑总是输出一个包装对象作为每个动作块的结果,这是流式传输的 foo 加上一个 fooResult如果它是生成的。我认为为每个 foo 创建新的 BroadcastBlocks 没有任何意义。它已经在这会产生的开销上失败了。我每秒处理数百万个 foo 对象。必须有更好的解决方案,至少创建多个广播块对我来说是不行的。 感谢您的代码,但您的代码似乎仍然不能确保相同 Foo 实例的操作块的结果最终出现在相同的 NotificationWrapper 对象中。甚至可能发生不同 Foo 的相同动作块的 2 个结果最终出现在同一个包装对象中。另外,我猜您上面的代码与 NotificationWrapper 类对象有关,对吗?班级装饰不见了。此外,您的 ActionBlock 实例化不正确,甚至无法编译。抱歉,恐怕答案会更复杂。

以上是关于如何并行处理项目然后合并结果?的主要内容,如果未能解决你的问题,请参考以下文章

如何使用多处理来并行化收集与给定条件匹配的项目的过滤功能?

如何通过并行处理数据库结果来提高性能?

使用分支/合并框架执行并行求和

如何使用 Python Ray 在不耗尽内存的情况下并行处理大量数据?

JAVA并行框架Fork/Join:简介和代码示例

如何并行处理数据但将结果写入 Spark 中的单个文件