确保线程池中的任务执行顺序

Posted

技术标签:

【中文标题】确保线程池中的任务执行顺序【英文标题】:Ensuring task execution order in ThreadPool 【发布时间】:2011-11-03 18:25:45 【问题描述】:

我一直在阅读有关线程池模式的信息,但似乎找不到以下问题的常用解决方案。

我有时希望任务按顺序执行。例如,我从文件中读取文本块,出于某种原因,我需要按该顺序处理这些块。所以基本上我想消除某些任务的并发

考虑这种情况,* 的任务需要按照它们被推入的顺序处理。其他任务可以按任何顺序处理。

push task1
push task2
push task3   *
push task4   *
push task5
push task6   *
....
and so on

在线程池的上下文中,如果没有这个限制,单个待处理任务队列可以正常工作,但显然这里不行。

我想过让一些线程在特定于线程的队列上运行,而其他线程在“全局”队列上运行。然后,为了串行执行一些任务,我只需将它们推送到一个队列中,其中单个线程看起来。 确实听起来有点笨拙。

那么,这个长篇故事中的真正问题是:您将如何解决这个问题? 您将如何确保这些任务按顺序进行

编辑

作为一个更普遍的问题,假设上面的场景变成了

push task1
push task2   **
push task3   *
push task4   *
push task5
push task6   *
push task7   **
push task8   *
push task9
....
and so on

我的意思是组内的任务应该按顺序执行,但组本身可以混合。例如,您可以使用3-2-5-4-7

要注意的另一件事是,我无法预先访问组中的所有任务(而且我不能等待所有任务都到达才能开始组)。

【问题讨论】:

您需要组和每个组的单独队列,以及主队列。如果一个任务被分组,检查它的组状态是否空闲/忙碌(+-销毁)(需要 CAS 或锁定)并入队到组队列,然后询问所述队列的处理。任务完成后,检查组队列并发布到主执行队列,如果可以轮询任何任务。我有一个 java 类(ExecutorService impl)可以做到这一点。 让我提供一份我在研究期间收集的问题列表,以供未来访问者参考:1 - 2 - 3 - 4 + 继续 续:5 - 6 - 7 - 主要主题是有一种机制可以让某些任务组串行运行,或者在池中的某个线程上运行。 (如果您无法轻松阅读 URL,现在可以在右侧栏的“已链接”部分中找到问题标题) 【参考方案1】:

您将如何确保这些任务是有序的?

push task1
push task2
push task346
push task5

响应编辑:

push task1
push task27   **
push task3468   *
push task5
push task9

【讨论】:

那么如果任务 3 花费了很长时间并且另一个线程尝试执行任务 4 会发生什么? 没有task4。有task346。 我明白你的意思,但我无法预先访问所有这些内容。 嗯,我需要知道什么时候你知道你知道什么,以便推荐不同的方法。你知道task3第一次推送的时候会有一个依赖任务吗?如果 task4 发生在一段时间后,你怎么知道它依赖于 task3? 我知道 task3 稍后​​会有一个依赖任务。但是我等不及该任务的到来了。我知道 task4 依赖于 task3 因为客户是这么说的。它们是用“键”推动的:具有相同键的任务应该按顺序执行。【参考方案2】:

我认为你在混淆概念。当您想在线程之间分配一些工作时,线程池是可以的,但是如果您开始在线程之间混合依赖关系,那么这不是一个好主意。

我的建议是,不要将线程池用于这些任务。只需创建一个专用线程并保留必须由该线程单独处理的顺序项目的简单队列。然后,您可以在没有顺序要求时继续将任务推送到线程池,并在有时使用专用线程。

澄清:按照常识,串行任务队列应由单个线程执行,一个接一个地处理每个任务:)

【讨论】:

【参考方案3】:

线程池适用于任务的相对顺序无关紧要的情况,只要它们都完成了。特别是,它们必须并行完成。

如果您的任务必须按特定顺序完成,那么它们不适合并行,因此不适合使用线程池。

如果您想将这些串行任务移出主线程,那么带有任务队列的单个后台线程将适合这些任务。您可以继续将线程池用于适合并行的剩余任务。

是的,这意味着您必须根据它是有序任务还是“可能并行化”的任务来决定将任务提交到哪里,但这没什么大不了的。

如果您有必须序列化的组,但可以与其他任务并行运行,那么您有多种选择:

    为每个组创建一个任务,按顺序执行相关的组任务,并将此任务发布到线程池。 让组中的每个任务显式等待组中的前一个任务,并将它们发布到线程池。这要求您的线程池可以处理线程正在等待尚未计划的任务而不会死锁的情况。 为每个组设置一个专用线程,并将组任务发布到相应的消息队列中。

【讨论】:

问题是我不知道客户端将推送多少组任务。我也不知道这些组的大小和“正常”任务的数量。所以我不能只为每个组创建一个新线程。 仅仅因为你不知道你有多少个组并不意味着你不能每个组有一个线程 --- 当你有一个新的时候你就开始一个团体。如果您的线程池可以处理它,我会选择我的选项 2,因为它让池处理线程数。 @AnthonyWilliams - 可能还有另一个例子,其中有传入事件并且您必须生成一条消息,通过为它们独立检索数据,但您仍然需要在事件的到达顺序。如何解决? @100pipers 这就是期货的用途。您将任务传递给线程池并将 Future 给另一个线程,该线程将按顺序评估它们。【参考方案4】:

如果我正确理解了这个问题,jdk 执行器不具备此功能,但很容易自行推出。你基本上需要

工作线程池,每个线程都有一个专用队列 对您提供工作的那些队列进行一些抽象(参见ExecutorService) 某种算法可以为每件工作确定性地选择特定队列 然后,每件工作都会被提供到正确的队列,从而以正确的顺序进行处理

jdk 执行器的不同之处在于它们有 1 个队列和 n 个线程,但您需要 n 个队列和 m 个线程(其中 n 可能等于或不等于 m)

* 阅读后编辑每个任务都有一个键 *

详细一点

编写一些代码,将键转换为给定范围内的索引(int)(0-n,其中 n 是您想要的线程数),这可以像key.hashCode() % n 一样简单,也可以是一些将已知键值静态映射到线程或您想要的任何东西 在启动时 创建 n 个队列,将它们放入索引结构(数组、列表中) 启动 n 个线程,每个线程只是从队列中阻塞获取 当它收到一些工作时,它知道如何执行特定于该任务/事件的工作(如果您有异构事件,您显然可以将任务映射到操作) 将其存储在某个接受工作项的外观后面 当一个任务到达时,把它交给门面 门面根据键为任务找到正确的队列,并将其提供给该队列

将自动重启的工作线程添加到这个方案中很容易,你只需要工作线程向某个管理器注册以声明“我拥有这个队列”,然后围绕它进行一些内务处理 + 检测线程中的错误(这意味着它取消注册该队列的所有权,将队列返回到空闲队列池,这是启动新线程的触发器)

【讨论】:

我喜欢你的想法。然而,它变得很棘手,因为客户端可能同时发送“简单任务”和“带密钥的任务”。 什么是“简单任务”?你如何确定一个简单的任务应该执行哪个线程?它们是否需要与键控任务一起被串行处理?例如,您可以轻松地通过 AtomicLong.getAndIncrement() % poolSize 在线程上循环它们 简单任务是没有密钥的任务;他们不依赖任何人(其他人也不依赖他们)。我知道如何安排他们;将它们与键控混合是问题。 简单的任务随便编个key就行了,没意义没关系,只要任务到达你想要的线程就行了 这取决于你;一个简单的循环(例如,由 poolsize 修改的全局计数器)是最简单的,稍微复杂一点的是循环,它解释了键任务的存在(例如,将作业分配给当前不忙的线程带有密钥的任务),另一种选择是保持特定线程热以帮助更快地处理作业的算法。它非常灵活,无需以任何方式明确地将任务链接在一起(您让队列这样做)【参考方案5】:

选项 1 - 复杂的选项

由于您有连续的作业,您可以将这些作业收集在一个链中,并让作业本身在完成后重新提交到线程池。假设我们有一个工作列表:

 [Task1, ..., Task6]

就像你的例子一样。我们有一个顺序依赖,这样[Task3, Task4, Task6] 就是一个依赖链。我们现在做一个工作(Erlang 伪代码):

 Task4Job = fun() ->
               Task4(), % Exec the Task4 job
               push_job(Task6Job)
            end.
 Task3Job = fun() ->
               Task3(), % Execute the Task3 Job
               push_job(Task4Job)
            end.
 push_job(Task3Job).

也就是说,我们通过将Task3 作业包装到一个作业中来改变它,作为延续将队列中的下一个作业推送到线程池。与这里的通用延续传递风格有很强的相似之处,也见于Node.js 或Pythons Twisted 框架等系统。

概括地说,您创建了一个系统,您可以在其中定义可以defer 进一步工作并重新提交进一步工作的工作链。

选项 2 - 简单的一个

我们为什么还要分工呢?我的意思是,由于它们是顺序相关的,因此在同一个线程上执行所有这些不会比采用该链并将其分散到多个线程上更快或更慢。假设“足够”的工作负载,任何线程总是有工作要做,所以将工作捆绑在一起可能是最简单的:

  Task = fun() ->
            Task3(),
            Task4(), 
            Task6()  % Just build a new job, executing them in the order desired
         end,
  push_job(Task).

如果你有作为一等公民的函数,那么做这样的事情就相当容易了,这样你就可以随心所欲地用你的语言构建它们,就像你可以在任何函数式编程语言、Python、Ruby 块中一样 -等等。

我并不特别喜欢构建队列或延续堆栈的想法,就像在“选项 1”中那样,但我肯定会选择第二个选项。在 Erlang 中,我们甚至有一个名为 jobs 的程序由 Erlang Solutions 编写并作为开源发布。 jobs 旨在执行和负载调节此类作业执行。如果我要解决这个问题,我可能会将选项 2 与工作结合起来。

【讨论】:

我需要拆分工作,因为我没有预先安排好工作,也等不及所有工作都来了。 啊,您可能想将其添加到问题中。仅在进行过程中了解新任务确实很重要。【参考方案6】:

建议不使用线程池的答案就像硬编码任务依赖/执行顺序的知识。相反,我会创建一个CompositeTask 来管理两个任务之间的开始/结束依赖关系。通过在任务接口后面封装依赖,可以统一对待所有任务,并加入到池中。这隐藏了执行细节并允许更改任务依赖关系,而不会影响您是否使用线程池。

问题没有指定语言 - 我将使用 Java,我希望它对大多数人来说都是可读的。

class CompositeTask implements Task

    Task firstTask;
    Task secondTask;

    public void run() 
         firstTask.run();
         secondTask.run();
    

这会在同一个线程上按顺序执行任务。您可以将多个CompositeTasks 链接在一起,以根据需要创建一个包含尽可能多的顺序任务的序列。

这里的缺点是,这会在所有顺序执行的任务期间占用线程。在第一个任务和第二个任务之间,您可能希望执行其他任务。因此,与其直接执行第二个任务,不如让复合任务调度执行第二个任务:

class CompositeTask implements Runnable

    Task firstTask;
    Task secondTask;
    ExecutorService executor;

    public void run() 
         firstTask.run();
         executor.submit(secondTask);
    

这确保了第二个任务在第一个任务完成后才会运行,并且还允许池执行其他(可能更紧急)​​任务。请注意,第一个和第二个任务可能在不同的线程上执行,因此尽管它们不会同时执行,但任务使用的任何共享数据都必须对其他线程可见(例如,通过设置变量 volatile。)

这是一种简单但功能强大且灵活的方法,允许任务自己定义执行约束,而不是通过使用不同的线程池来完成。

【讨论】:

【参考方案7】:

要使用线程池执行您想要执行的操作,您可能需要创建某种调度程序。

类似的东西:

任务队列 -> 调度器 -> 队列 -> 线程池

调度程序在自己的线程中运行,跟踪作业之间的依赖关系。当一个作业准备好完成时,调度程序只是将它推入线程池的队列中。

线程池可能必须向调度程序发送信号以指示作业何时完成,以便调度程序可以根据该作业将作业放入队列中。

在您的情况下,依赖项可能存储在链表中。

假设您有以下依赖项: 3 -> 4 -> 6 -> 8

作业 3 正在线程池上运行,您仍然不知道作业 8 存在。

作业 3 结束。您从链表中删除 3,将作业 4 放在队列中的线程池中。

作业 8 到了。你把它放在链表的末尾。

唯一需要完全同步的结构是调度器前后的队列。

【讨论】:

【参考方案8】:

这是可以实现的,嗯,据我了解您的情况。基本上你需要做一些聪明的事情来协调你在主线程中的任务。您需要的 Java API 是 ExecutorCompletionService 和 Callable

首先,实现你的可调用任务:

public interface MyAsyncTask extends Callable<MyAsyncTask> 
  // tells if I am a normal or dependent task
  private boolean isDependent;

  public MyAsyncTask call() 
    // do your job here.
    return this;
  

然后在你的主线程中,使用 CompletionService 协调依赖的任务执行(即等待机制):

ExecutorCompletionService<MyAsyncTask> completionExecutor = new 
  ExecutorCompletionService<MyAsyncTask>(Executors.newFixedThreadPool(5));
Future<MyAsyncTask> dependentFutureTask = null;
for (MyAsyncTask task : tasks) 
  if (task.isNormal()) 
    // if it is a normal task, submit it immediately.
    completionExecutor.submit(task);
   else 
    if (dependentFutureTask == null) 
      // submit the first dependent task, get a reference 
      // of this dependent task for later use.
      dependentFutureTask = completionExecutor.submit(task);
     else 
      // wait for last one completed, before submit a new one.
      dependentFutureTask.get();
      dependentFutureTask = completionExecutor.submit(task);
    
  

通过这样做,您使用单个执行器(线程池大小 5)执行正常和依赖任务,正常任务一提交立即执行,依赖任务一个接一个执行(在主线程中执行等待通过在提交新的依赖任务之前在 Future 上调用 get()),因此在任何时间点,您总是有许多正常任务和单个依赖任务(如果存在)在单个线程池中运行。

这只是一个开始,通过使用 ExecutorCompletionService、FutureTask 和 Semaphore,您可以实现更复杂的线程协调场景。

【讨论】:

+1 用于提及 ExecutorCompletionService。我会为每个任务类型添加队列,而不是 isDependent 布尔值,您的 Callable 可以引用任务的队列。 CompletionService 可以从该任务的队列中获取,如果那里有任何等待,请将其提交到线程池。【参考方案9】:

使用两个Active Objects。简而言之:活动对象模式由优先级队列和 1 个或多个工作线程组成,这些线程可以从队列中获取任务并对其进行处理。

因此,使用一个活动对象和一个工作线程:所有将要排队的任务都将按顺序处理。使用工作线程数大于 1 的第二个活动对象。在这种情况下,工作线程将以任何顺序从队列中获取和处理任务。

运气。

【讨论】:

【参考方案10】:

您有两种不同的任务。将它们混合在一个队列中感觉很奇怪。而不是一个队列有两个。为了简单起见,您甚至可以对两者都使用 ThreadPoolExecutor。对于串行任务,只需给它一个固定大小 1,对于可以并发执行的任务,给它更多。我不明白为什么这会很笨拙。保持简单和愚蠢。您有两个不同的任务,因此请相应地对待它们。

【讨论】:

【参考方案11】:

类似下面的内容将允许串行和并行任务排队,串行任务将一个接一个地执行,并行任务将按任意顺序执行,但并行执行。这使您能够在必要时序列化任务,也有并行任务,但在收到任务时执行此操作,即您无需预先了解整个序列,执行顺序是动态维护的。

internal class TaskQueue

    private readonly object _syncObj = new object();
    private readonly Queue<QTask> _tasks = new Queue<QTask>();
    private int _runningTaskCount;

    public void Queue(bool isParallel, Action task)
    
        lock (_syncObj)
        
            _tasks.Enqueue(new QTask  IsParallel = isParallel, Task = task );
        

        ProcessTaskQueue();
    

    public int Count
    
        getlock (_syncObj)return _tasks.Count;
    

    private void ProcessTaskQueue()
    
        lock (_syncObj)
        
            if (_runningTaskCount != 0) return;

            while (_tasks.Count > 0 && _tasks.Peek().IsParallel)
            
                QTask parallelTask = _tasks.Dequeue();

                QueueUserWorkItem(parallelTask);
            

            if (_tasks.Count > 0 && _runningTaskCount == 0)
            
                QTask serialTask = _tasks.Dequeue();

                QueueUserWorkItem(serialTask);
            
        
    

    private void QueueUserWorkItem(QTask qTask)
    
        Action completionTask = () =>
        
            qTask.Task();

            OnTaskCompleted();
        ;

        _runningTaskCount++;

        ThreadPool.QueueUserWorkItem(_ => completionTask());
    

    private void OnTaskCompleted()
    
        lock (_syncObj)
        
            if (--_runningTaskCount == 0)
            
                ProcessTaskQueue();
            
        
    

    private class QTask
    
        public Action Task  get; set; 
        public bool IsParallel  get; set; 
    

更新

要处理具有串行和并行任务混合的任务组,GroupedTaskQueue 可以为每个组管理一个TaskQueue。同样,您不需要预先了解组,它们都是在收到任务时动态管理的。

internal class GroupedTaskQueue

    private readonly object _syncObj = new object();
    private readonly Dictionary<string, TaskQueue> _queues = new Dictionary<string, TaskQueue>();
    private readonly string _defaultGroup = Guid.NewGuid().ToString();

    public void Queue(bool isParallel, Action task)
    
        Queue(_defaultGroup, isParallel, task);
    

    public void Queue(string group, bool isParallel, Action task)
    
        TaskQueue queue;

        lock (_syncObj)
        
            if (!_queues.TryGetValue(group, out queue))
            
                queue = new TaskQueue();

                _queues.Add(group, queue);
            
        

        Action completionTask = () =>
        
            task();

            OnTaskCompleted(group, queue);
        ;

        queue.Queue(isParallel, completionTask);
    

    private void OnTaskCompleted(string group, TaskQueue queue)
    
        lock (_syncObj)
        
            if (queue.Count == 0)
            
                _queues.Remove(group);
            
        
    

【讨论】:

【参考方案12】:

由于在启动依赖任务之前只需要等待单个任务完成,如果你能在第一个任务中调度依赖任务就可以轻松完成。所以在你的第二个例子中: 在任务 2 结束时,安排任务 7 和 在任务 3 结束时,为 4->6 和 6->8 安排任务 4,依此类推。

一开始,只安排任务 1,2,5,9...,其余的应该遵循。

一个更普遍的问题是,您必须等待多个任务才能启动相关任务。有效地处理这件事并非易事。

【讨论】:

【参考方案13】:

基本上,有许多待处理的任务。某些任务只有在一个或多个其他待处理任务完成执行后才能执行。

挂起的任务可以在依赖图中建模:

“task 1 -> task2”表示“task 2只有在task 1完成后才能执行”。箭头指向执行顺序的方向。 任务的入度(指向它的任务数)决定了该任务是否已准备好执行。如果入度为0,则可以执行。 有时一个任务必须等待多个任务完成,然后入度 >1。 如果一个任务不再需要等待其他任务完成(它的入度为零),它可以被提交到带有工作线程的线程池,或者带有等待被工作线程提取的任务的队列线。你知道提交的任务不会导致死锁,因为任务没有等待任何东西。作为优化,您可以使用优先级队列,例如依赖图中更多任务所依赖的任务将首先执行。这也不会引发死锁,因为线程池中的所有任务都可以执行。然而,它会引起饥饿。 如果一个任务完成执行,它可以从依赖图中移除,可能会降低其他任务的入度,这些任务可以依次提交到工作线程池中。

所以(至少)有一个线程用于添加/删除待处理的任务,并且有一个工作线程的线程池。

将任务添加到依赖图中时,必须检查:

任务在依赖图中是如何连接的:它必须等待什么任务完成,什么任务必须等待它完成?相应地在新任务之间建立联系。 一旦连接被绘制:新连接是否会导致依赖图中的任何循环?如果是这样,就会出现死锁情况。

性能

如果实际上很少可能并行执行,则此模式比顺序执行要慢,因为您需要额外的管理才能几乎顺序地执行所有操作。 如果在实践中可以同时执行许多任务,这种模式会很快。

假设

正如您可能已经在字里行间看到的那样,您必须设计任务,使它们不会干扰其他任务。此外,必须有一种方法来确定任务的优先级。任务优先级应该包括每个任务处理的数据。两个任务不能同时改变同一个对象;其中一项任务应优先于另一项,否则对对象执行的操作必须是线程安全的。

【讨论】:

【参考方案14】:

我认为在这种情况下可以有效地使用线程池。这个想法是为每组相关任务使用单独的strand 对象。您可以使用或不使用 strand 对象将任务添加到队列中。您将相同的 strand 对象与相关任务一起使用。您的调度程序检查下一个任务是否有 strand 以及此 strand 是否已锁定。如果不是 - 锁定此 strand 并运行此任务。如果strand 已被锁定 - 将此任务保留在队列中,直到下一次调度事件。任务完成后解锁其strand

结果你需要一个队列,你不需要任何额外的线程,没有复杂的组等。strand 对象可以很简单,有两个方法lockunlock

我经常遇到相同的设计问题,例如对于处理多个同时会话的异步网络服务器。当会话内的任务是依赖的(这会将会话内部任务映射到组内的从属任务)时,会话是独立的(这会将它们映射到您的独立任务和相关任务组)。使用所描述的方法,我完全避免了会话内的显式同步。每个会话都有自己的strand 对象。

更重要的是,我使用了这个想法的现有(伟大)实现:Boost Asio library(C++)。我只是使用了他们的术语strand。实现很优雅:我将异步任务包装到相应的strand 对象中,然后再安排它们。

【讨论】:

【参考方案15】:

有一个专门用于此目的的 java 框架,称为 dexecutor(免责声明:我是所有者)

DefaultDependentTasksExecutor<String, String> executor = newTaskExecutor();

    executor.addDependency("task1", "task2");
    executor.addDependency("task4", "task6");
    executor.addDependency("task6", "task8");

    executor.addIndependent("task3");
    executor.addIndependent("task5");
    executor.addIndependent("task7");

    executor.execute(ExecutionBehavior.RETRY_ONCE_TERMINATING);

task1、task3、task5、task7 并行运行(取决于线程池大小),一旦 task1 完成,task2 运行,一旦 task2 完成 task4 运行,一旦 task4 完成 task6 运行,最后一旦 task6 完成 task8 运行。

【讨论】:

【参考方案16】:

已经有很多答案了,很明显一个已经被接受了。但是为什么不使用延续呢?

如果你有一个已知的“串行”条件,那么当你用这个条件将第一个任务排入队列时,保持任务;并为进一步的任务调用 Task.ContinueWith()。

public class PoolsTasks

    private readonly object syncLock = new object();
    private Task serialTask = Task.CompletedTask;


    private bool isSerialTask(Action task) 
        // However you determine what is serial ...
        return true;
    

    public void RunMyTask(Action myTask) 
        if (isSerialTask(myTask)) 
            lock (syncLock)
                serialTask = serialTask.ContinueWith(_ => myTask());
         else
            Task.Run(myTask);
    

【讨论】:

【参考方案17】:

具有有序和无序执行方法的线程池:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class OrderedExecutor 
    private ExecutorService multiThreadExecutor;
    // for single Thread Executor
    private ThreadLocal<ExecutorService> threadLocal = new ThreadLocal<>();

    public OrderedExecutor(int nThreads) 
        this.multiThreadExecutor = Executors.newFixedThreadPool(nThreads);
    

    public void executeUnordered(Runnable task) 
        multiThreadExecutor.submit(task);
    

    public void executeOrdered(Runnable task) 
        multiThreadExecutor.submit(() -> 
            ExecutorService singleThreadExecutor = threadLocal.get();
            if (singleThreadExecutor == null) 
                singleThreadExecutor = Executors.newSingleThreadExecutor();
                threadLocal.set(singleThreadExecutor);
            
            singleThreadExecutor.submit(task);
        );
    

    public void clearThreadLocal() 
        threadLocal.remove();
    


填满所有队列后,应该清除 threadLocal。 唯一的缺点是每次方法都会创建singleThreadExecutor

executeOrdered(可运行任务)

在单独的线程中调用

【讨论】:

【参考方案18】:

通过使用ConcurrentExclusiveSchedulerPair 实例的ExclusiveScheduler 属性,并在每次启动任务时将其用作TaskScheduler,在ThreadPool 上连续执行任务非常简单:

var taskFactory = new TaskFactory(
    new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler);

Task task1 = taskFactory.StartNew(() => DoSomething());
Task task2 = taskFactory.StartNew(() => DoSomethingElse());

DoSomething()DoSomethingElse 都将在ThreadPool 上运行,一个接一个。保证两个调用不会重叠,并且它们将按照最初安排的顺序被调用。

但是如果这些调用中的任何一个失败会发生什么?这就是问题所在:DoSomething()DoSomethingElse 引发的任何异常都将被困在各自的 Tasktask1task2)内。这意味着我们不能只是开始任务而忘记它们。我们有责任将任务存储在某个地方,并最终await 他们并处理他们的异常。这可能正是我们想要的。

但是,如果我们只想安排任务并“忘记”它们,并且在不太可能的情况下,它们中的任何一个都无法将异常作为未处理的异常传播并终止进程,该怎么办?这并不像听起来那么疯狂。有些任务可能对应用程序的生命周期至关重要,而且它们不太可能失败,而且很难设计一种手动观察异常的策略,以至于它们的异常升级为即时应用程序终止(在引发AppDomain.UnhandledException 事件)可能是可用选项中较小的邪恶。那么有可能做到这一点吗?是的,但令人惊讶的困难和棘手:

using System.Runtime.ExceptionServices;

var taskFactory = new TaskFactory(
    new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler);

void RunOnThreadPoolExclusive(Action action)

    _ = taskFactory.StartNew(() =>
    
        try
        
            action();
        
        catch (Exception ex)
        
            var edi = ExceptionDispatchInfo.Capture(ex);
            ThreadPool.QueueUserWorkItem(_ => edi.Throw());
        
    );


RunOnThreadPoolExclusive(() => DoSomething());
RunOnThreadPoolExclusive(() => DoSomethingElse());

action 在 try/catch 块中调用。如果失败,异常会在ExceptionDispatchInfo 实例中捕获,以保留其堆栈跟踪,然后在ThreadPool 上重新抛出。请注意,taskFactory.StartNew 仍然返回 Task,使用 discard (_) 将其丢弃,因为现在任务失败的可能性很小。但我们真的取得了任何进展吗?我们从DoSomething 不太可能失败的前提开始,我们最终放弃了我们评估为极不可能失败的Task。确实不是很满意!我们能做得更好吗?是的!进入async void的infamous世界:

var taskFactory = new TaskFactory(
    new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler);

async void RunOnThreadPoolExclusive(Action action)

    await taskFactory.StartNew(action);


RunOnThreadPoolExclusive(() => DoSomething());
RunOnThreadPoolExclusive(() => DoSomethingElse());

async void 方法有一个有趣的特性,即在它们内部抛出的任何异常都会在 SynchronizationContext 上引发,该异常是在 async void 方法启动时捕获的,或者(作为后备)在 ThreadPool 上引发。因此,例如,如果在 WinForms 应用程序的 UI 线程上调用 RunOnThreadPoolExclusive,并且操作失败,则会弹出一个消息框,询问用户是否要继续或退出应用程序 (screenshot)。所以错误不一定是致命的,因为用户可能会选择忽略错误并继续。这可能正是我们想要的。也可能不会。

为了澄清,错误将在 UI 线程上抛出,但 DoSomething()/DoSomethingElse() 仍将在 ThreadPool 上调用。这没有改变。

那么,我们究竟如何确保错误将在ThreadPool 上抛出,而其他任何地方,无论如何,不​​管当前上下文如何,并且不允许任何 任务触发-然后忘记?方法如下:

var taskFactory = new TaskFactory(
    new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler);

void RunOnThreadPoolExclusive(Action action)

    Task task = taskFactory.StartNew(action);
    ThreadPool.QueueUserWorkItem(async state => await (Task)state, task);


RunOnThreadPoolExclusive(() => DoSomething());
RunOnThreadPoolExclusive(() => DoSomethingElse());

ThreadPool 上以正确的顺序序列化执行,在ThreadPool 上抛出错误,并且没有泄露的即发即弃任务。完美!

【讨论】:

以上是关于确保线程池中的任务执行顺序的主要内容,如果未能解决你的问题,请参考以下文章

Java如何判断线程池所有任务是不是执行完毕

Java如何判断线程池所有任务是不是执行完毕

Java如何判断线程池所有任务是不是执行完毕

使用 ExecutorService 控制任务执行顺序

如何确保三个线程顺序执行

JAVA 线程池 其中一个线程执行失败 则线程重新执行或者重新提交任务 急