如何同步三个相互依赖的任务的循环执行?
Posted
技术标签:
【中文标题】如何同步三个相互依赖的任务的循环执行?【英文标题】:How to synchronize the recurrent execution of three tasks that depend on each other? 【发布时间】:2021-11-12 01:21:44 【问题描述】:我想请教 C# 方面的专家开发人员。我的程序需要执行三项经常性任务。任务 2 依赖于任务 1,任务 3 依赖于任务 2,但任务 1 无需等待其他两个任务完成即可重新启动(程序一直在运行)。由于每个任务都需要一些时间,我想在一个线程或 C#Task
中运行每个任务。任务 1 完成后,任务 2 启动,任务 1 再次启动……等等。
我不确定实现这一点的最佳方法是什么。我希望有人可以指导我。
【问题讨论】:
你可以结帐等待异步 在 SO 上有很多与 c# 多线程概念相关的好问答,其中一个与您的 Q 相关的问答有一个 here 【参考方案1】:实现此目的的一种方法是使用称为Task Parallel Library 的东西。这提供了一组类,允许您将任务安排到“块”中。您创建一个按顺序执行 A、B 和 C 的方法,然后 TPL 将负责同时运行该方法的多个调用。这是一个小例子:
async Task Main()
var actionBlock = new ActionBlock<int>(DoTasksAsync, new ExecutionDataflowBlockOptions
MaxDegreeOfParallelism = 2 // This is the number of simultaneous executions of DoTasksAsync that will be run
;
await actionBlock.SendAsync(1);
await actionBlock.SendAsync(2);
actionBlock.Complete();
await actionBlock.Completion;
async Task DoTasksAsync(int input)
await DoTaskAAsync();
await DoTaskBAsync();
await DoTaskCAsync();
【讨论】:
【参考方案2】:我可能会使用某种队列模式。
我不确定任务 1 是否是线程安全的要求,所以我会保持简单:
任务 1 始终在执行。完成后,它会在某个队列上发布一条消息并重新开始。 任务 2 正在侦听队列。只要有消息可用,它就会开始处理它。 只要任务 2 完成工作,它就会调用任务 3,以便它可以完成它的工作。作为提到的 cmets 之一,您应该能够在代码中成功使用 async/await。尤其是在任务 2 和 3 之间。请注意,任务 1 可以与任务 2 和 3 并行运行,因为它不依赖于任何其他任务。
【讨论】:
【参考方案3】:您可以使用下面的ParallelLoop
方法。此方法启动一个异步工作流,其中三个任务彼此并行调用,但它们本身按顺序调用。因此,您无需在每个任务中添加同步,除非某些任务会产生对其他任务可见的全局副作用。
使用Task.Run
方法在ThreadPool
上调用任务。
/// <summary>
/// Invokes three actions repeatedly in parallel on the ThreadPool, with the
/// action2 depending on the action1, and the action3 depending on the action2.
/// Each action is invoked sequentially to itself.
/// </summary>
public static async Task ParallelLoop<TResult1, TResult2>(
Func<TResult1> action1,
Func<TResult1, TResult2> action2,
Action<TResult2> action3,
CancellationToken cancellationToken = default)
// Arguments validation omitted
var task1 = Task.FromResult<TResult1>(default);
var task2 = Task.FromResult<TResult2>(default);
var task3 = Task.CompletedTask;
try
int counter = 0;
while (true)
counter++;
var result1 = await task1.ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested();
task1 = Task.Run(action1); // Restart the task1
if (counter <= 1) continue; // In the first loop result1 is undefined
var result2 = await task2.ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested();
task2 = Task.Run(() => action2(result1)); // Restart the task2
if (counter <= 2) continue; // In the second loop result2 is undefined
await task3.ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested();
task3 = Task.Run(() => action3(result2)); // Restart the task3
finally
// Prevent fire-and-forget
Task allTasks = Task.WhenAll(task1, task2, task3);
try await allTasks.ConfigureAwait(false); catch allTasks.Wait();
// Propagate all errors in an AggregateException
在实现中有一个明显的模式,这使得添加具有三个以上动作的重载变得微不足道。每个添加的操作都需要自己的泛型类型参数(TResult3
、TResult4
等)。
使用示例:
var cts = new CancellationTokenSource();
Task loopTask = ParallelLoop(() =>
// First task
Thread.Sleep(1000); // Simulates synchronous work
return "OK"; // The result that is passed to the second task
, result =>
// Second task
Thread.Sleep(1000); // Simulates synchronous work
return result + "!"; // The result that is passed to the third task
, result =>
// Third task
Thread.Sleep(1000); // Simulates synchronous work
, cts.Token);
如果任何任务失败,整个循环将停止(loopTask.Exception
包含错误)。由于任务相互依赖,不可能从单个失败的任务中恢复¹。您可以做的是通过 Polly Retry
策略执行整个循环,以确保在失败的情况下循环将被重生。如果您对Polly library 不熟悉,可以使用下面简单且无特色的RetryUntilCanceled
方法:
public static async Task RetryUntilCanceled(Func<Task> action,
CancellationToken cancellationToken)
while (true)
cancellationToken.ThrowIfCancellationRequested();
try await action().ConfigureAwait(false);
catch if (cancellationToken.IsCancellationRequested) throw;
用法:
Task loopTask = RetryUntilCanceled(() => ParallelLoop(() =>
//...
, cts.Token), cts.Token);
在退出进程之前,建议您Cancel()
CancellationTokenSource
和Wait()
(或await
)loopTask
,以便循环正常终止。否则,某些任务可能会在工作过程中中止。
¹ 通过 Polly Retry
策略执行每个单独的任务实际上是可能的,并且可能更可取。并行循环将暂停,直到成功重试失败的任务。
【讨论】:
注意:取消cancellationToken
会取消并行循环,而所有操作都没有执行相同的次数。 action1
比action2
多执行一次,action2
比action3
多执行一次。
我已经在this GitHub 存储库上上传了上述想法的完善实现。以上是关于如何同步三个相互依赖的任务的循环执行?的主要内容,如果未能解决你的问题,请参考以下文章