我想等待手动重置事件超时并观察取消。我想出了类似下面的东西。手动重置事件对象由我无法控制的 API 提供。有没有办法在不占用和阻止来自 ThreadPool 的线程的情况下实现这一点?

static Task<bool> TaskFromWaitHandle(WaitHandle mre, int timeout, CancellationToken ct)

    return Task.Run(() =>
        bool s = WaitHandle.WaitAny(new WaitHandle[]  mre, ct.WaitHandle , timeout) == 0;
        return s;
    , ct);

if (await TaskFromWaitHandle(manualResetEvent, 1000, cts.Token))

    // true if event was set


    // false if timed out, exception if cancelled 

[EDITED] 显然,makes sense 使用 RegisterWaitForSingleObject。我试试看。


在这种情况下,我完全希望答案是"否"——将同步 API 包装在异步 API 中通常确实涉及阻塞线程。 我在考虑类似ThreadPool.RegisterWaitForSingleObject 甚至非托管RegisterWaitForSingleObject 之类的东西。我意识到从技术上讲,仍然有一个内核池线程在等待,但理论上它可能有一些用于单个等待句柄的聚合逻辑(例如,使用 WaitForMultipleObjects),因此一个线程可能会服务多个 RegisterWaitForSingleObject 请求。这比上面的一个池线程每个等待句柄要好。 这只是一个事件,不,RWFSO 不会优化它。它使用一个专用的等待线程,它可以处理许多 RWFSO 调用。

RegisterWaitForSingleObject 将把等待合并到专用的等待线程上,每个线程都可以等待多个句柄(具体来说,63 个,即MAXIMUM_WAIT_OBJECTS 减去一个“控制”句柄)。


public static class WaitHandleExtensions

    public static Task AsTask(this WaitHandle handle)
        return AsTask(handle, Timeout.InfiniteTimeSpan);

    public static Task AsTask(this WaitHandle handle, TimeSpan timeout)
        var tcs = new TaskCompletionSource<object>();
        var registration = ThreadPool.RegisterWaitForSingleObject(handle, (state, timedOut) =>
            var localTcs = (TaskCompletionSource<object>)state;
            if (timedOut)
        , tcs, timeout, executeOnlyOnce: true);
        tcs.Task.ContinueWith((_, state) => ((RegisteredWaitHandle)state).Unregister(null), registration, TaskScheduler.Default);
        return tcs.Task;


我正在使用RegisterWaitForSingleObjectContinueWith 上的状态参数作为优化。这比WaitAny 更受欢迎,因为此等待可以与其他等待结合使用。 @StephenCleary 但为什么类型不安全? TaskCompletionSource&lt;RegisteredWaitHandle&gt;.Task 仍会隐式转换为 Task 就好了。或者,如果这不是一个选项,您可以通过使用自动关闭来获得类型安全 ;-)。 @ChrisMoschini:如果你不能保证Task 变量会在请求到达之前被设置,你可以使用TaskCompletionSource&lt;T&gt; @JotaBe:这些天我有一个more modern approach。这两种方法都假设句柄最终会收到信号(或超时)。如果不是,则任务将无法完成。因此,如果您有一个循环或在 *Any 方法中使用它的东西,您的代码可能每次都开始一个 new 等待,而不是使用现有的。

您也可以使用类似于 ManualResetEvent 的 SemaphoreSlim.WaitAsync()


问题是没有"等待不减量"。有时您只想等待而不是递减计数器。 SemaphoreSlim 类似于AutoResetEvent,而不是ManualResetEvent 另一个问题是调用 Reset() 可能会通过 SemaphoreFullException 如果没有任何东西可以释放。即使在调用 Release() 之前检查 CurrentCount,我也会遇到多个线程通过该检查并尝试释放信号量的情况。我试图避免任何异常的可能性,因为它们会损害性能。

您可以试一试https://www.badflyer.com/asyncmanualresetevent,尝试在https://blogs.msdn.com/b/pfxteam/archive/2012/02/11/10266920.aspx 上的示例的基础上支持超时和取消。

using System;
using System.Threading;
using System.Threading.Tasks;

/// <summary>
/// An async manual reset event.
/// </summary>
public sealed class ManualResetEventAsync

    // Inspiration from https://devblogs.microsoft.com/pfxteam/building-async-coordination-primitives-part-1-asyncmanualresetevent/
    // and the .net implementation of SemaphoreSlim

    /// <summary>
    ///  The timeout in milliseconds to wait indefinitly.
    /// </summary>
    private const int WaitIndefinitly = -1;

    /// <summary>
    /// True to run synchronous continuations on the thread which invoked Set. False to run them in the threadpool.
    /// </summary>
    private readonly bool runSynchronousContinuationsOnSetThread = true;

    /// <summary>
    /// The current task completion source.
    /// </summary>
    private volatile TaskCompletionSource<bool> completionSource = new TaskCompletionSource<bool>();

    /// <summary>
    /// Initializes a new instance of the <see cref="ManualResetEventAsync"/> class.
    /// </summary>
    /// <param name="isSet">True to set the task completion source on creation.</param>
    public ManualResetEventAsync(bool isSet)
        : this(isSet: isSet, runSynchronousContinuationsOnSetThread: true)

    /// <summary>
    /// Initializes a new instance of the <see cref="ManualResetEventAsync"/> class.
    /// </summary>
    /// <param name="isSet">True to set the task completion source on creation.</param>
    /// <param name="runSynchronousContinuationsOnSetThread">If you have synchronous continuations, they will run on the thread which invokes Set, unless you set this to false.</param>
    public ManualResetEventAsync(bool isSet, bool runSynchronousContinuationsOnSetThread)
        this.runSynchronousContinuationsOnSetThread = runSynchronousContinuationsOnSetThread;

        if (isSet)

    /// <summary>
    /// Wait for the manual reset event.
    /// </summary>
    /// <returns>A task which completes when the event is set.</returns>
    public Task WaitAsync()
        return this.AwaitCompletion(ManualResetEventAsync.WaitIndefinitly, default(CancellationToken));

    /// <summary>
    /// Wait for the manual reset event.
    /// </summary>
    /// <param name="token">A cancellation token.</param>
    /// <returns>A task which waits for the manual reset event.</returns>
    public Task WaitAsync(CancellationToken token)
        return this.AwaitCompletion(ManualResetEventAsync.WaitIndefinitly, token);

    /// <summary>
    /// Wait for the manual reset event.
    /// </summary>
    /// <param name="timeout">A timeout.</param>
    /// <param name="token">A cancellation token.</param>
    /// <returns>A task which waits for the manual reset event. Returns true if the timeout has not expired. Returns false if the timeout expired.</returns>
    public Task<bool> WaitAsync(TimeSpan timeout, CancellationToken token)
        return this.AwaitCompletion((int)timeout.TotalMilliseconds, token);

    /// <summary>
    /// Wait for the manual reset event.
    /// </summary>
    /// <param name="timeout">A timeout.</param>
    /// <returns>A task which waits for the manual reset event. Returns true if the timeout has not expired. Returns false if the timeout expired.</returns>
    public Task<bool> WaitAsync(TimeSpan timeout)
        return this.AwaitCompletion((int)timeout.TotalMilliseconds, default(CancellationToken));

    /// <summary>
    /// Set the completion source.
    /// </summary>
    public void Set()
        if (this.runSynchronousContinuationsOnSetThread)
            // Run synchronous completions in the thread pool.
            Task.Run(() => this.completionSource.TrySetResult(true));

    /// <summary>
    /// Reset the manual reset event.
    /// </summary>
    public void Reset()
        // Grab a reference to the current completion source.
        var currentCompletionSource = this.completionSource;

        // Check if there is nothing to be done, return.
        if (!currentCompletionSource.Task.IsCompleted)

        // Otherwise, try to replace it with a new completion source (if it is the same as the reference we took before).
        Interlocked.CompareExchange(ref this.completionSource, new TaskCompletionSource<bool>(), currentCompletionSource);

    /// <summary>
    /// Await completion based on a timeout and a cancellation token.
    /// </summary>
    /// <param name="timeoutMS">The timeout in milliseconds.</param>
    /// <param name="token">The cancellation token.</param>
    /// <returns>A task (true if wait succeeded). (False on timeout).</returns>
    private async Task<bool> AwaitCompletion(int timeoutMS, CancellationToken token)
        // Validate arguments.
        if (timeoutMS < -1 || timeoutMS > int.MaxValue)
            throw new ArgumentException("The timeout must be either -1ms (indefinitely) or a positive ms value <= int.MaxValue");

        CancellationTokenSource timeoutToken = null;

        // If the token cannot be cancelled, then we dont need to create any sort of linked token source.
        if (false == token.CanBeCanceled)
            // If the wait is indefinite, then we don't need to create a second task at all to wait on, just wait for set. 
            if (timeoutMS == -1)
                return await this.completionSource.Task;

            timeoutToken = new CancellationTokenSource();
            // A token source which will get canceled either when we cancel it, or when the linked token source is canceled.
            timeoutToken = CancellationTokenSource.CreateLinkedTokenSource(token);

        using (timeoutToken)
            // Create a task to account for our timeout. The continuation just eats the task cancelled exception, but makes sure to observe it.
            Task delayTask = Task.Delay(timeoutMS, timeoutToken.Token).ContinueWith((result) =>  var e = result.Exception; , TaskContinuationOptions.ExecuteSynchronously);

            var resultingTask = await Task.WhenAny(this.completionSource.Task, delayTask).ConfigureAwait(false);

            // The actual task finished, not the timeout, so we can cancel our cancellation token and return true.
            if (resultingTask != delayTask)
                // Cancel the timeout token to cancel the delay if it is still going.
                return true;

            // Otherwise, the delay task finished. So throw if it finished because it was canceled.
            return false;




当使用Task.WaitAny()Task(由SqlConnection.OpenAsync()' 返回)和作为参数接收并包装在TaskAsTask() 中的手动重置事件时,我遇到了内存泄漏。这些对象没有被释放:TaskCompletionSource&lt;Object&gt;, Task&lt;Object&gt;, StandardTaskContinuation, RegisteredWaitHandle, RegisteredWaithandleSafe, ContinuationResultTaskFromresultTask&lt;Object,bool&gt;, _ThreadPoolWaitOrTimerCallback)。

这是在 Windows 服务中使用的真实生产代码,该函数尝试在循环中打开与数据库的连接,直到连接打开,或操作失败,或作为参数接收到 ManualResetEvent _finishRequest在包含此代码的函数中,由任何其他线程中的代码发出信号。

为了避免泄漏,我决定反其道而行之:等待_finishRequestTask 返回的句柄OpenAsync()

Task asyncOpening = sqlConnection.OpenAsync();

// Wait for the async open to finish, or until _finishRequest is signaled
var waitHandles = new WaitHandle[]

  // index 0 in array: extract the AsyncWaitHandle from the Task
  // index 1:

// Check if finish was requested (index of signaled handle in the array = 1)
int whichFinished = WaitHandle.WaitAny(waitHandles);
finishRequested = whichFinished == 1;
// If so, break the loop to exit immediately
if (finishRequested)
// If not, check if OpenAsync finished with error (it's a Task)
if (asyncOpening.IsFaulted)

  // Extract the exception from the task, and throw it
  // NOTE: adapt it to your case. In mine, I'm interested in the inner exception,
  // but you can check the exception itself, for example to see if it was a timeout,
  // if you specified it in the call to the async function that returns the Task
  var ex = asyncOpening?.Exception?.InnerExceptions?[0];
  if (ex != null) throw ex; 


  Log.Verbose("Connection to database Database on server Server", database, server);

如果你也需要超时,你可以在调用OpenAsync,或者你的异步函数中包含它,然后检查异步操作的结果是否因为超时而被取消:检查Task的状态完成后,您可以在代码注释中的 NOTE 中看到。



public static class WaitHandleExtensions

    public static Task WaitOneAsync(this WaitHandle waitHandle, CancellationToken cancellationToken, int timeoutMilliseconds = Timeout.Infinite)
        if (waitHandle == null)
            throw new ArgumentNullException(nameof(waitHandle));

        TaskCompletionSource<bool> tcs = new TaskCompletionSource<bool>();
        CancellationTokenRegistration ctr = cancellationToken.Register(() => tcs.TrySetCanceled());
        TimeSpan timeout = timeoutMilliseconds > Timeout.Infinite ? TimeSpan.FromMilliseconds(timeoutMilliseconds) : Timeout.InfiniteTimeSpan;

        RegisteredWaitHandle rwh = ThreadPool.RegisterWaitForSingleObject(waitHandle,
            (_, timedOut) =>
                if (timedOut)
            null, timeout, true);

        Task<bool> task = tcs.Task;

        _ = task.ContinueWith(_ =>
            return ctr.Unregister();
        , CancellationToken.None);

        return task;


以上是关于将 ManualResetEvent 包装为等待任务的主要内容,如果未能解决你的问题,请参考以下文章





Thread.Yield 与 WaitOne
