将 ManualResetEvent 包装为等待任务

Posted

技术标签:

【中文标题】将 ManualResetEvent 包装为等待任务【英文标题】:Wrapping ManualResetEvent as awaitable task 【发布时间】:2013-09-16 08:56:53 【问题描述】:

我想等待手动重置事件超时并观察取消。我想出了类似下面的东西。手动重置事件对象由我无法控制的 API 提供。有没有办法在不占用和阻止来自 ThreadPool 的线程的情况下实现这一点?

static Task<bool> TaskFromWaitHandle(WaitHandle mre, int timeout, CancellationToken ct)

    return Task.Run(() =>
    
        bool s = WaitHandle.WaitAny(new WaitHandle[]  mre, ct.WaitHandle , timeout) == 0;
        ct.ThrowIfCancellationRequested();
        return s;
    , ct);


// ...

if (await TaskFromWaitHandle(manualResetEvent, 1000, cts.Token))

    // true if event was set

else 

    // false if timed out, exception if cancelled 

[EDITED] 显然,makes sense 使用 RegisterWaitForSingleObject。我试试看。

【问题讨论】:

见blogs.msdn.com/b/pfxteam/archive/2012/02/11/10266920.aspx 感谢@JonSkeet,在我的情况下,它必须是一个真正的手动重置事件,超出我的控制范围。我会更新问题以反映这一点。 在这种情况下,我完全希望答案是“否”——将同步 API 包装在异步 API 中通常确实涉及阻塞线程。 我在考虑类似ThreadPool.RegisterWaitForSingleObject 甚至非托管RegisterWaitForSingleObject 之类的东西。我意识到从技术上讲,仍然有一个内核池线程在等待,但理论上它可能有一些用于单个等待句柄的聚合逻辑(例如,使用 WaitForMultipleObjects),因此一个线程可能会服务多个 RegisterWaitForSingleObject 请求。这比上面的一个池线程每个等待句柄要好。 这只是一个事件,不,RWFSO 不会优化它。它使用一个专用的等待线程,它可以处理许多 RWFSO 调用。 【参考方案1】:

RegisterWaitForSingleObject 将把等待合并到专用的等待线程上,每个线程都可以等待多个句柄(具体来说,63 个,即MAXIMUM_WAIT_OBJECTS 减去一个“控制”句柄)。

所以你应该可以使用这样的东西(警告:未经测试):

public static class WaitHandleExtensions

    public static Task AsTask(this WaitHandle handle)
    
        return AsTask(handle, Timeout.InfiniteTimeSpan);
    

    public static Task AsTask(this WaitHandle handle, TimeSpan timeout)
    
        var tcs = new TaskCompletionSource<object>();
        var registration = ThreadPool.RegisterWaitForSingleObject(handle, (state, timedOut) =>
        
            var localTcs = (TaskCompletionSource<object>)state;
            if (timedOut)
                localTcs.TrySetCanceled();
            else
                localTcs.TrySetResult(null);
        , tcs, timeout, executeOnlyOnce: true);
        tcs.Task.ContinueWith((_, state) => ((RegisteredWaitHandle)state).Unregister(null), registration, TaskScheduler.Default);
        return tcs.Task;
    

【讨论】:

谢谢@StephenCleary。这里有充分的理由将tcs 传递为state 吗?总体而言,您认为这是比使用 WaitHandle.WaitAny 更受欢迎的解决方案吗? 我正在使用RegisterWaitForSingleObjectContinueWith 上的状态参数作为优化。这比WaitAny 更受欢迎,因为此等待可以与其他等待结合使用。 @StephenCleary 但为什么类型不安全? TaskCompletionSource&lt;RegisteredWaitHandle&gt;.Task 仍会隐式转换为 Task 就好了。或者,如果这不是一个选项,您可以通过使用自动关闭来获得类型安全 ;-)。 @ChrisMoschini:如果你不能保证Task 变量会在请求到达之前被设置,你可以使用TaskCompletionSource&lt;T&gt; @JotaBe:这些天我有一个more modern approach。这两种方法都假设句柄最终会收到信号(或超时)。如果不是,则任务将无法完成。因此,如果您有一个循环或在 *Any 方法中使用它的东西,您的代码可能每次都开始一个 new 等待,而不是使用现有的。【参考方案2】:

您也可以使用类似于 ManualResetEvent 的 SemaphoreSlim.WaitAsync()

【讨论】:

问题是没有“等待不减量”。有时您只想等待而不是递减计数器。 SemaphoreSlim 类似于AutoResetEvent,而不是ManualResetEvent 另一个问题是调用 Reset() 可能会通过 SemaphoreFullException 如果没有任何东西可以释放。即使在调用 Release() 之前检查 CurrentCount,我也会遇到多个线程通过该检查并尝试释放信号量的情况。我试图避免任何异常的可能性,因为它们会损害性能。【参考方案3】:

您可以试一试https://www.badflyer.com/asyncmanualresetevent,尝试在https://blogs.msdn.com/b/pfxteam/archive/2012/02/11/10266920.aspx 上的示例的基础上支持超时和取消。

using System;
using System.Threading;
using System.Threading.Tasks;

/// <summary>
/// An async manual reset event.
/// </summary>
public sealed class ManualResetEventAsync

    // Inspiration from https://devblogs.microsoft.com/pfxteam/building-async-coordination-primitives-part-1-asyncmanualresetevent/
    // and the .net implementation of SemaphoreSlim

    /// <summary>
    ///  The timeout in milliseconds to wait indefinitly.
    /// </summary>
    private const int WaitIndefinitly = -1;

    /// <summary>
    /// True to run synchronous continuations on the thread which invoked Set. False to run them in the threadpool.
    /// </summary>
    private readonly bool runSynchronousContinuationsOnSetThread = true;

    /// <summary>
    /// The current task completion source.
    /// </summary>
    private volatile TaskCompletionSource<bool> completionSource = new TaskCompletionSource<bool>();

    /// <summary>
    /// Initializes a new instance of the <see cref="ManualResetEventAsync"/> class.
    /// </summary>
    /// <param name="isSet">True to set the task completion source on creation.</param>
    public ManualResetEventAsync(bool isSet)
        : this(isSet: isSet, runSynchronousContinuationsOnSetThread: true)
    
    

    /// <summary>
    /// Initializes a new instance of the <see cref="ManualResetEventAsync"/> class.
    /// </summary>
    /// <param name="isSet">True to set the task completion source on creation.</param>
    /// <param name="runSynchronousContinuationsOnSetThread">If you have synchronous continuations, they will run on the thread which invokes Set, unless you set this to false.</param>
    public ManualResetEventAsync(bool isSet, bool runSynchronousContinuationsOnSetThread)
    
        this.runSynchronousContinuationsOnSetThread = runSynchronousContinuationsOnSetThread;

        if (isSet)
        
            this.completionSource.TrySetResult(true);
        
    

    /// <summary>
    /// Wait for the manual reset event.
    /// </summary>
    /// <returns>A task which completes when the event is set.</returns>
    public Task WaitAsync()
    
        return this.AwaitCompletion(ManualResetEventAsync.WaitIndefinitly, default(CancellationToken));
    

    /// <summary>
    /// Wait for the manual reset event.
    /// </summary>
    /// <param name="token">A cancellation token.</param>
    /// <returns>A task which waits for the manual reset event.</returns>
    public Task WaitAsync(CancellationToken token)
    
        return this.AwaitCompletion(ManualResetEventAsync.WaitIndefinitly, token);
    

    /// <summary>
    /// Wait for the manual reset event.
    /// </summary>
    /// <param name="timeout">A timeout.</param>
    /// <param name="token">A cancellation token.</param>
    /// <returns>A task which waits for the manual reset event. Returns true if the timeout has not expired. Returns false if the timeout expired.</returns>
    public Task<bool> WaitAsync(TimeSpan timeout, CancellationToken token)
    
        return this.AwaitCompletion((int)timeout.TotalMilliseconds, token);
    

    /// <summary>
    /// Wait for the manual reset event.
    /// </summary>
    /// <param name="timeout">A timeout.</param>
    /// <returns>A task which waits for the manual reset event. Returns true if the timeout has not expired. Returns false if the timeout expired.</returns>
    public Task<bool> WaitAsync(TimeSpan timeout)
    
        return this.AwaitCompletion((int)timeout.TotalMilliseconds, default(CancellationToken));
    

    /// <summary>
    /// Set the completion source.
    /// </summary>
    public void Set()
    
        if (this.runSynchronousContinuationsOnSetThread)
        
            this.completionSource.TrySetResult(true);
        
        else
        
            // Run synchronous completions in the thread pool.
            Task.Run(() => this.completionSource.TrySetResult(true));
        
    

    /// <summary>
    /// Reset the manual reset event.
    /// </summary>
    public void Reset()
    
        // Grab a reference to the current completion source.
        var currentCompletionSource = this.completionSource;

        // Check if there is nothing to be done, return.
        if (!currentCompletionSource.Task.IsCompleted)
        
            return;
        

        // Otherwise, try to replace it with a new completion source (if it is the same as the reference we took before).
        Interlocked.CompareExchange(ref this.completionSource, new TaskCompletionSource<bool>(), currentCompletionSource);
    

    /// <summary>
    /// Await completion based on a timeout and a cancellation token.
    /// </summary>
    /// <param name="timeoutMS">The timeout in milliseconds.</param>
    /// <param name="token">The cancellation token.</param>
    /// <returns>A task (true if wait succeeded). (False on timeout).</returns>
    private async Task<bool> AwaitCompletion(int timeoutMS, CancellationToken token)
    
        // Validate arguments.
        if (timeoutMS < -1 || timeoutMS > int.MaxValue)
        
            throw new ArgumentException("The timeout must be either -1ms (indefinitely) or a positive ms value <= int.MaxValue");
        

        CancellationTokenSource timeoutToken = null;

        // If the token cannot be cancelled, then we dont need to create any sort of linked token source.
        if (false == token.CanBeCanceled)
        
            // If the wait is indefinite, then we don't need to create a second task at all to wait on, just wait for set. 
            if (timeoutMS == -1)
            
                return await this.completionSource.Task;
            

            timeoutToken = new CancellationTokenSource();
        
        else
        
            // A token source which will get canceled either when we cancel it, or when the linked token source is canceled.
            timeoutToken = CancellationTokenSource.CreateLinkedTokenSource(token);
        

        using (timeoutToken)
        
            // Create a task to account for our timeout. The continuation just eats the task cancelled exception, but makes sure to observe it.
            Task delayTask = Task.Delay(timeoutMS, timeoutToken.Token).ContinueWith((result) =>  var e = result.Exception; , TaskContinuationOptions.ExecuteSynchronously);

            var resultingTask = await Task.WhenAny(this.completionSource.Task, delayTask).ConfigureAwait(false);

            // The actual task finished, not the timeout, so we can cancel our cancellation token and return true.
            if (resultingTask != delayTask)
            
                // Cancel the timeout token to cancel the delay if it is still going.
                timeoutToken.Cancel();
                return true;
            

            // Otherwise, the delay task finished. So throw if it finished because it was canceled.
            token.ThrowIfCancellationRequested();
            return false;
        
    

【讨论】:

【参考方案4】:

替代方案:等待任务的句柄和手动重置事件

当使用Task.WaitAny()Task(由SqlConnection.OpenAsync()' 返回)和作为参数接收并包装在TaskAsTask() 中的手动重置事件时,我遇到了内存泄漏。这些对象没有被释放:TaskCompletionSource&lt;Object&gt;, Task&lt;Object&gt;, StandardTaskContinuation, RegisteredWaitHandle, RegisteredWaithandleSafe, ContinuationResultTaskFromresultTask&lt;Object,bool&gt;, _ThreadPoolWaitOrTimerCallback)。

这是在 Windows 服务中使用的真实生产代码,该函数尝试在循环中打开与数据库的连接,直到连接打开,或操作失败,或作为参数接收到 ManualResetEvent _finishRequest在包含此代码的函数中,由任何其他线程中的代码发出信号。

为了避免泄漏,我决定反其道而行之:等待_finishRequestTask 返回的句柄OpenAsync()

Task asyncOpening = sqlConnection.OpenAsync();

// Wait for the async open to finish, or until _finishRequest is signaled
var waitHandles = new WaitHandle[]

  // index 0 in array: extract the AsyncWaitHandle from the Task
  ((IAsyncResult)asyncOpening).AsyncWaitHandle,
  // index 1:
  _finishRequest
;

// Check if finish was requested (index of signaled handle in the array = 1)
int whichFinished = WaitHandle.WaitAny(waitHandles);
finishRequested = whichFinished == 1;
// If so, break the loop to exit immediately
if (finishRequested)
  break;
                    
// If not, check if OpenAsync finished with error (it's a Task)
if (asyncOpening.IsFaulted)

  // Extract the exception from the task, and throw it
  // NOTE: adapt it to your case. In mine, I'm interested in the inner exception,
  // but you can check the exception itself, for example to see if it was a timeout,
  // if you specified it in the call to the async function that returns the Task
  var ex = asyncOpening?.Exception?.InnerExceptions?[0];
  if (ex != null) throw ex; 

else

  Log.Verbose("Connection to database Database on server Server", database, server);
  break;

如果你也需要超时,你可以在调用OpenAsync,或者你的异步函数中包含它,然后检查异步操作的结果是否因为超时而被取消:检查Task的状态完成后,您可以在代码注释中的 NOTE 中看到。

【讨论】:

【参考方案5】:

Stephen 的 Cleary 解决方案看起来很完美。微软提供similar one。

因为我还没有看到取消逻辑的示例。

这里是:

public static class WaitHandleExtensions

    public static Task WaitOneAsync(this WaitHandle waitHandle, CancellationToken cancellationToken, int timeoutMilliseconds = Timeout.Infinite)
    
        if (waitHandle == null)
            throw new ArgumentNullException(nameof(waitHandle));

        TaskCompletionSource<bool> tcs = new TaskCompletionSource<bool>();
        CancellationTokenRegistration ctr = cancellationToken.Register(() => tcs.TrySetCanceled());
        TimeSpan timeout = timeoutMilliseconds > Timeout.Infinite ? TimeSpan.FromMilliseconds(timeoutMilliseconds) : Timeout.InfiniteTimeSpan;

        RegisteredWaitHandle rwh = ThreadPool.RegisterWaitForSingleObject(waitHandle,
            (_, timedOut) =>
            
                if (timedOut)
                
                    tcs.TrySetCanceled();
                
                else
                
                    tcs.TrySetResult(true);
                
            , 
            null, timeout, true);

        Task<bool> task = tcs.Task;

        _ = task.ContinueWith(_ =>
        
            rwh.Unregister(null);
            return ctr.Unregister();
        , CancellationToken.None);

        return task;
    

【讨论】:

以上是关于将 ManualResetEvent 包装为等待任务的主要内容,如果未能解决你的问题,请参考以下文章

C#中ManualResetEvent用法简介

多线程ManualResetEvent等待所有线程

netframework中等待多个子线程执行完毕并计算执行时间

信号量

Thread.Yield 与 WaitOne

ManualResetEvent实现线程的暂停与恢复