如何同步三个相互依赖的任务的循环执行?

Posted

技术标签:

【中文标题】如何同步三个相互依赖的任务的循环执行?【英文标题】:How to synchronize the recurrent execution of three tasks that depend on each other? 【发布时间】:2021-11-12 01:21:44 【问题描述】:

我想请教 C# 方面的专家开发人员。我的程序需要执行三项经常性任务。任务 2 依赖于任务 1,任务 3 依赖于任务 2,但任务 1 无需等待其他两个任务完成即可重新启动(程序一直在运行)。由于每个任务都需要一些时间,我想在一个线程或 C#Task 中运行每个任务。任务 1 完成后,任务 2 启动,任务 1 再次启动……等等。

我不确定实现这一点的最佳方法是什么。我希望有人可以指导我。

【问题讨论】:

你可以结帐等待异步 在 SO 上有很多与 c# 多线程概念相关的好问答,其中一个与您的 Q 相关的问答有一个 here 【参考方案1】:

实现此目的的一种方法是使用称为Task Parallel Library 的东西。这提供了一组类,允许您将任务安排到“块”中。您创建一个按顺序执行 A、B 和 C 的方法,然后 TPL 将负责同时运行该方法的多个调用。这是一个小例子:

async Task Main()

    var actionBlock = new ActionBlock<int>(DoTasksAsync, new ExecutionDataflowBlockOptions
    
        MaxDegreeOfParallelism = 2 // This is the number of simultaneous executions of DoTasksAsync that will be run
    ;
    
    await actionBlock.SendAsync(1);
    await actionBlock.SendAsync(2);
    
    actionBlock.Complete();
    await actionBlock.Completion;


async Task DoTasksAsync(int input)

    await DoTaskAAsync();
    await DoTaskBAsync();
    await DoTaskCAsync();

【讨论】:

【参考方案2】:

我可能会使用某种队列模式。

我不确定任务 1 是否是线程安全的要求,所以我会保持简单:

任务 1 始终在执行。完成后,它会在某个队列上发布一条消息并重新开始。 任务 2 正在侦听队列。只要有消息可用,它就会开始处理它。 只要任务 2 完成工作,它就会调用任务 3,以便它可以完成它的工作。

作为提到的 cmets 之一,您应该能够在代码中成功使用 async/await。尤其是在任务 2 和 3 之间。请注意,任务 1 可以与任务 2 和 3 并行运行,因为它不依赖于任何其他任务。

【讨论】:

【参考方案3】:

您可以使用下面的ParallelLoop 方法。此方法启动一个异步工作流,其中三个任务彼此并行调用,但它们本身按顺序调用。因此,您无需在每个任务中添加同步,除非某些任务会产生对其他任务可见的全局副作用。

使用Task.Run 方法在ThreadPool 上调用任务。

/// <summary>
/// Invokes three actions repeatedly in parallel on the ThreadPool, with the
/// action2 depending on the action1, and the action3 depending on the action2.
/// Each action is invoked sequentially to itself.
/// </summary>
public static async Task ParallelLoop<TResult1, TResult2>(
    Func<TResult1> action1,
    Func<TResult1, TResult2> action2,
    Action<TResult2> action3,
    CancellationToken cancellationToken = default)

    // Arguments validation omitted
    var task1 = Task.FromResult<TResult1>(default);
    var task2 = Task.FromResult<TResult2>(default);
    var task3 = Task.CompletedTask;
    try
    
        int counter = 0;
        while (true)
        
            counter++;

            var result1 = await task1.ConfigureAwait(false);
            cancellationToken.ThrowIfCancellationRequested();
            task1 = Task.Run(action1); // Restart the task1
            if (counter <= 1) continue; // In the first loop result1 is undefined

            var result2 = await task2.ConfigureAwait(false);
            cancellationToken.ThrowIfCancellationRequested();
            task2 = Task.Run(() => action2(result1)); // Restart the task2
            if (counter <= 2) continue; // In the second loop result2 is undefined

            await task3.ConfigureAwait(false);
            cancellationToken.ThrowIfCancellationRequested();
            task3 = Task.Run(() => action3(result2)); // Restart the task3
        
    
    finally
    
        // Prevent fire-and-forget
        Task allTasks = Task.WhenAll(task1, task2, task3);
        try  await allTasks.ConfigureAwait(false);  catch  allTasks.Wait(); 
        // Propagate all errors in an AggregateException
    

在实现中有一个明显的模式,这使得添加具有三个以上动作的重载变得微不足道。每个添加的操作都需要自己的泛型类型参数(TResult3TResult4 等)。

使用示例:

var cts = new CancellationTokenSource();
Task loopTask = ParallelLoop(() =>

    // First task
    Thread.Sleep(1000); // Simulates synchronous work
    return "OK"; // The result that is passed to the second task
, result =>

    // Second task
    Thread.Sleep(1000); // Simulates synchronous work
    return result + "!"; // The result that is passed to the third task
, result =>

    // Third task
    Thread.Sleep(1000); // Simulates synchronous work
, cts.Token);

如果任何任务失败,整个循环将停止(loopTask.Exception 包含错误)。由于任务相互依赖,不可能从单个失败的任务中恢复¹。您可以做的是通过 Polly Retry 策略执行整个循环,以确保在失败的情况下循环将被重生。如果您对Polly library 不熟悉,可以使用下面简单且无特色的RetryUntilCanceled 方法:

public static async Task RetryUntilCanceled(Func<Task> action,
    CancellationToken cancellationToken)

    while (true)
    
        cancellationToken.ThrowIfCancellationRequested();
        try  await action().ConfigureAwait(false); 
        catch  if (cancellationToken.IsCancellationRequested) throw; 
    

用法:

Task loopTask = RetryUntilCanceled(() => ParallelLoop(() =>

   //...
, cts.Token), cts.Token);

在退出进程之前,建议您Cancel()CancellationTokenSourceWait()(或awaitloopTask,以便循环正常终止。否则,某些任务可能会在工作过程中中止。

¹ 通过 Polly Retry 策略执行每个单独的任务实际上是可能的,并且可能更可取。并行循环将暂停,直到成功重试失败的任务。

【讨论】:

注意:取消cancellationToken 会取消并行循环,而所有操作都没有执行相同的次数。 action1action2 多执行一次,action2action3 多执行一次。 我已经在this GitHub 存储库上上传了上述想法的完善实现。

以上是关于如何同步三个相互依赖的任务的循环执行?的主要内容,如果未能解决你的问题,请参考以下文章

0182 JavaScript执行机制:单线程,同步任务和异步任务,执行栈,消息队列,事件循环

js事件循环运行机制

js事件循环运行机制

事件循环(event loop)

浏览器的线程

异步任务