c# Task.WhenAll(tasks) 和 SemaphoreSlim - 如何知道所有任务何时已完全完成

Posted

技术标签:

【中文标题】c# Task.WhenAll(tasks) 和 SemaphoreSlim - 如何知道所有任务何时已完全完成【英文标题】:c# Task.WhenAll(tasks) and SemaphoreSlim - how to know when all tasks have fully completed 【发布时间】:2014-11-03 16:19:40 【问题描述】:

我遇到了 C# 中的任务/线程管理问题,我想知道是否有一个简单的解决方案可以解决我的问题。

在我的 Windows Phone 应用程序中,我创建了一堆“上传”任务。每个任务都有一个进度/完成处理程序,我在其中检查任务的状态,如果成功完成,我需要执行一些线程安全的文件写入操作。

伪代码:

这个 sn-p 来自设置我的任务并启动它们运行的​​方法。我希望此方法仅在所有任务完全完成后返回给调用者:

var progressCallback = new Progress<UploadOperation>(UploadProgress);
for (var i = 0; i < uploads.Count; i++)

    uploadTasks[i] = uploads[i].StartAsync().AsTask(ct, progressCallback);

await Task.WhenAll(uploadTasks);
// all uploads complete!
return;

我的进度/任务完成处理程序检查状态,如果正常,我会引发一个事件,触发对需要执行一些文件写入的线程安全方法的调用:

private void UploadProgress(UploadOperation upload)
  ....
  if(upload == successful)
  
      //raise an event which results in a call to a thread-safe method
      //to perform post-upload tasks          
  
  ....

这是我的线程安全方法(由上述事件触发),其中我使用 SemaphoreSlim 对象来确保一次只有一个线程可以访问它:

private static readonly SemaphoreSlim _SemaphoreSlim = new SemaphoreSlim(1);
private async void OnItemUploadOperationCompleted(...)

    await _SemaphoreSlim.WaitAsync();
    try
    
       //perform some await-able file updates / writes
    
    catch(..)

    finally
    
        //release lock
        _SemaphoreSlim.Release();
    

我遇到的困难是主要的“任务设置”方法在所有上传任务完成线程安全方法之前返回并退出,即我们在几个任务仍然存在时点击了下面的返回语句还没有轮到他们使用 OnItemUploadOperationCompleted 方法。

await Task.WhenAll(uploadTasks);
// all uploads complete!
return;

我正在尝试确定是否有更好的方法来做到这一点。有没有办法确定所有任务都“完全”完成,它们还没有挂起并在队列中等待进入线程安全操作?基本上我需要知道所有处理何时完成,包括每个任务的所有线程安全后处理。

似乎“Task.WhenAll”返回得太早了,也许是在初始任务本身完成时而不是在它的子任务/衍生任务完成时?

编辑:

我按照Servy的建议(下面第一个回答),如下:

foreach (var upload in uploads)

    uploadTasks.Add(upload.StartAsync().AsTask(ct, progressCallback).ContinueWith(task => ProcessUploadResult(task.Result), ct));


await Task.WhenAll(uploadTasks);

// all uploads complete?
return;

我的 ProcessUploadResult 方法如下:

private void ProcessUploadResult(UploadOperation uploadResult)

 ....
 //pseudo code
 if(uploadResult == success)

     //RAISE an Event!
     //The listener of this event processes the result - 
     // - performs some file writes / updates
     // - therefore the handler for this eventy MUST be thread safe.
     OnItemUploadOperationCompleted(this, ...);

 

所以,即使使用这种方法,我的困难是事件处理程序在“Task.WhenAll(...)”返回时仍未完成处理所有上传。仍有线程等待访问该事件处理程序。

所以,我想我已经找到了一个解决方案,我想知道它是否是一个好的解决方案,使用 ManualResetEvent:

 ....
 //pseudo code
 if(uploadResult == success)

     //RAISE an Event!
     //The listener of this event processes the result - 
     // - performs some file writes / updates
     // - therefore the handler for this eventy MUST be thread safe.


     var wait = new ManualResetEvent(false);

     // pass the reference to ManualResetEvent in the event Args
     OnItemUploadOperationCompleted(this, new MyEventArgs waiter = wait);

     wait.WaitOne();

 

现在在我的处理程序中,我使用 ManualResetEvent 对象向等待线程发出信号,表明我们已完成对上传响应的处理:

private async void OnItemUploadOperationCompleted(object sender, UploadResultEventArgs e)

        await _SemaphoreSlim.WaitAsync();
        try
        
             //perform async file writes
        
        catch
               ....
        
        finally
        
            //release lock
            _SemaphoreSlim.Release();

            //signal we are done here
            var waiter = e.Waiter as ManualResetEvent;
            if (waiter != null)
            
                waiter.Set();
            
        

这似乎终于对我有用了。我想知道它是否是一个理想的解决方案?

【问题讨论】:

【参考方案1】:

Progress 类用于使用操作的当前进度更新 UI,并且该操作不应该关心这些更新是什么或何时完成。

你在这里拥有的是一个延续;某项任务完成后需要完成的一些工作,以根据前一项任务的结果做额外的工作。您应该为此使用ContinueWith 方法。 (或async 方法,因为这将被转换为延续。)

考虑到你所拥有的一切,这实际上非常简单:

uploadTasks[i] = uploads[i].StartAsync()
    .AsTask(ct, progressCallback)
    .ContinueWith(task => ProcessResult(task.Result));

然后,您的 ProcessResult 方法可以处理这些结果,就像您在触发 Progress 实例时所做的那样。

【讨论】:

谢谢,我想这可能正是我正在寻找的解决方案! 出于上下文和异常原因,我建议使用await 而不是ContinueWith+Result。最干净的解决方案可能是创建一个单独的 async 方法来执行上传及其后处理。 @user1857360 那么您遇到的问题与您在原始问题中提出的问题完全不同。您需要找到一种方法来获取代表您的事件何时触发的任务,您可以使用TaskCompletionSource 来做到这一点。你不应该使用ManualResetEvent,因为它会同步阻塞,而不是异步阻塞。 @Servy 谢谢。在我最初的问题中,我确实试图解释这个问题,即当我的主调用者返回时,EventHandler 尚未完成所有上传线程 - 因此我需要以某种方式阻塞,直到 EventHandler 完成执行。这就是我尝试使用 ManualResetEvent 的原因,它实际上似乎对我有用。您能否澄清一下 ManualResetEvent 和 TaskCompletionSource 之间的区别?如果我理解正确,在我的 EventHandler 中,我需要在 TaskCompletionSource 对象上调用 SetResult(..),以表明处理程序“完成”? @user1857360 TCS 的文档对此有何评论?

以上是关于c# Task.WhenAll(tasks) 和 SemaphoreSlim - 如何知道所有任务何时已完全完成的主要内容,如果未能解决你的问题,请参考以下文章

Parallel.ForEach 与 Task.Run 和 Task.WhenAll

Task CancellationTokenSource和Task.WhenAll的应用

csharp Task.WhenAllを利用している场合のタイムアウト处理の书き方(Task.WhenAll,Task.WhenAny,Task.Delay)

Task.WhenAll(taskList).Wait() 是不是与 Task.WaitAll(taskList) 相同?

异步/等待死锁 Task.WaitAll 与 Task.WhenAll [重复]

从 Task.WhenAll 调用的异步方法使用 DbContext 并返回错误