让线程在开始下一组任务之前等待所有任务完成

Posted

技术标签:

【中文标题】让线程在开始下一组任务之前等待所有任务完成【英文标题】:Let threads wait for all tasks to complete before starting on the next set of tasks 【发布时间】:2020-08-23 12:16:13 【问题描述】:

我有一个由多个阶段组成的管道。同一阶段的作业可以并行处理。但是必须先完成第 1 阶段的所有工作,然后才能开始从事第 2 阶段的工作,等等。

我正在考虑使用 CountDownEvent 同步这项工作。

我的基本结构是

this.WorkerCountdownEvent = new CountdownEvent(MaxJobsInStage);
this.WorkerCountdownEvent.Signal(MaxJobsInStage); // Starts all threads
// Each thread runs the following code

for (this.currentStage = 0; this.currentStage < this.PipelineStages.Count; this.currentStage++)

    this.WorkerCountdownEvent.Wait();
    var stage = this.PipelineStages[this.currentStage];
    if (stage.Systems.Count < threadIndex)
    
        var system = stage.Systems[threadIndex];
        system.Process();
    

    this.WorkerCountdownEvent.Signal(); // <--


这适用于处理一个阶段。但是到达this.WorkerCountdownEvent.Signal() 的第一个线程将导致应用程序崩溃,因为它试图将信号减小到零以下。

当然,如果我想阻止这种情况,并且让工作再次等待,我必须致电this.WorkerCountdownEvent.Reset()。但是我必须在所有线程开始工作之后调用它,但在一个线程完成它的工作之前。这似乎是一项不可能完成的任务?

我是否使用了错误的同步原语?或者我应该使用两个倒计时事件?还是我完全错过了什么?

(顺便说一句,工作通常需要不到一毫秒的时间,所以如果有人有更好的方法来使用像 ManualResetEventSlim 这样的“苗条”原语来做到这一点,则可以加分。线程池或任务不是我正在寻找的方向,因为这些线程将持续很长时间(数小时),并且需要每秒通过管道 60 次。因此,停止/启动任务的开销在这里相当大)。

编辑:这个问题被标记为两个问题的重复。其中一个问题的答案是“使用 thread.Join()”,另一个问题是“使用 TPL”,这两个答案(在我看来)显然不是关于流水线和线程原语的问题的答案,例如 CountDownEvent .

【问题讨论】:

如果你正在使用任务,为什么不直接使用Task.WhenAll。例如调用await Task.WhenAll(stage1Tasks),然后在完成后启动stage2任务。 我建议从您的问题中删除“任务”一词的所有实例,因为您指的不是Task 对象,而Tasks 目前作为解决相关问题的工具非常流行异步和并行。因此,在不提及Tasks 的情况下谈论“任务”会造成混乱。 我试图让它更清晰,并用删除所有提到的单词 task 替换免责声明,并解释为什么 Task 不合适。但不幸的是,在我看来,这个问题已经作为两个非常不同的问题的副本而被关闭了。 我通过创建自己的CountDownEvent 版本解决了这个问题,不幸的是我无法发布答案......所以我希望这个链接能保持一段时间:github.com/roy-t/EntitySystemTest/blob/master/Project/Threading/… 是的,我什至将它标记为版主注意,但没有结果。顺便说一句,这是一个很好的技巧:)。 【参考方案1】:

我认为最适合这种情况的同步原语是Barrier

使多个任务能够通过多个阶段并行协作处理算法。

使用示例:

private Barrier _barrier = new Barrier(this.WorkersCount);

// Each worker thread runs the following code
for (i = 0; i < this.StagesCount; i++)

    // Here goes the work of a single worker for a single stage...
    _barrier.SignalAndWait();


更新:如果您希望工作人员异步等待信号,可以使用 AsyncBarrier 实现 here。

【讨论】:

以上是关于让线程在开始下一组任务之前等待所有任务完成的主要内容,如果未能解决你的问题,请参考以下文章

面试官:如何让主线程等待所有的子线程结束之后再执行?我懵了

Python等待所有线程任务完成

Java笔记:多线程

让一个 AsyncTask 等待另一个完成?

如何等待java线程池中所有任务完成

Java之CyclicBarrier使用