可等待的 AutoResetEvent

Posted

技术标签:

【中文标题】可等待的 AutoResetEvent【英文标题】:Awaitable AutoResetEvent 【发布时间】:2021-10-05 10:36:46 【问题描述】:

什么是 AutoResetEvent 的异步(等待)等价物?

如果在经典线程同步中我们会使用这样的东西:

    AutoResetEvent signal = new AutoResetEvent(false);

    void Thread1Proc()
    
        //do some stuff
        //..
        //..

        signal.WaitOne(); //wait for an outer thread to signal we are good to continue

        //do some more stuff
        //..
        //..
    

    void Thread2Proc()
    
        //do some stuff
        //..
        //..

        signal.Set(); //signal the other thread it's good to go

        //do some more stuff
        //..
        //..
    

我希望在新的异步处理方式中,会变成这样:

SomeAsyncAutoResetEvent asyncSignal = new SomeAsyncAutoResetEvent();

async void Task1Proc()

    //do some stuff
    //..
    //..

    await asyncSignal.WaitOne(); //wait for an outer thread to signal we are good to continue

    //do some more stuff
    //..
    //..


async void Task2Proc()

    //do some stuff
    //..
    //..

    asyncSignal.Set(); //signal the other thread it's good to go

    //do some more stuff
    //..
    //..

我见过其他定制的解决方案,但我设法在某个时间点上手,仍然涉及锁定线程。我不想仅仅为了使用新的 await 语法。我正在寻找一种真正的等待信号机制,它不会锁定任何线程。

这是我在任务并行库中缺少的东西吗?

编辑:澄清一下:SomeAsyncAutoResetEvent 是一个完全组成的类名,在我的示例中用作占位符。

【问题讨论】:

一次性使用的TaskCompletionSource,其结果被等待任务忽略。 gist.github.com/AArnott/1084951 也许? @MatthewWatson 我看到它使用了锁,这将阻止线程池中的线程。我希望得到一些不涉及阻塞线程的东西。 锁并不一定意味着线程被阻塞。 @DarkFalcon 是的。在这种情况下,它甚至可能不会阻塞任何线程。 【参考方案1】:

如果您想构建自己的,Stephen Toub has the definitive blog post on the subject。

如果你想使用已经写好的,I have one in my AsyncEx library。 AFAIK,在撰写本文时没有其他选择。

【讨论】:

为什么new SemaphoreSlim(1) 不工作,WaitOne()WaitAsync()Set() 变成Release() ARE 和信号量非常相似(尽管通常使用不同)。如果原语在已设置时发出信号,则会出现语义差异。 @AshleyJackson:这种方法确实使用了另一个线程。一些同步原语不允许这样做(例如,MutexMonitor),但由于这是一个AutoResetEvent,它应该可以工作。 我认为那些被命名为“斯蒂芬”的人天生就是为异步任何事情而生的。 Stephen Toubs 的帖子似乎已被移动 here【参考方案2】:

这是 Stephen Toub 的AsyncAutoResetEvent 的the source,以防他的博客离线。

public class AsyncAutoResetEvent

    private static readonly Task s_completed = Task.FromResult(true);
    private readonly Queue<TaskCompletionSource<bool>> m_waits = new Queue<TaskCompletionSource<bool>>();
    private bool m_signaled;

    public Task WaitAsync()
    
        lock (m_waits)
        
            if (m_signaled)
            
                m_signaled = false;
                return s_completed;
            
            else
            
                var tcs = new TaskCompletionSource<bool>();
                m_waits.Enqueue(tcs);
                return tcs.Task;
            
        
    

    public void Set()
    
        TaskCompletionSource<bool> toRelease = null;

        lock (m_waits)
        
            if (m_waits.Count > 0)
                toRelease = m_waits.Dequeue();
            else if (!m_signaled)
                m_signaled = true;
        

        toRelease?.SetResult(true);
    

【讨论】:

为什么可以在等待代码中使用常规锁?同一任务不能在这里作为不同的线程继续执行并绕过锁吗? @user1713059 请注意,WaitAsync 实际上并不是async 方法。这意味着它不会在处理过程中产生控制权。相反,它从TaskCompletionSource 获取Task 并在释放锁之前返回它。 啊,当然,所以即使我执行“await WaitAsync()”,也可以肯定整个方法由同一个线程执行,因为它实际上不是异步的 - 对吗? “Async”方法后缀让我误入歧途,但据我所知,它也用于没有“async”关键字的方法中。 它仍然是一个异步方法,因为它返回的任务可能在方法返回时尚未完成。但是该方法不是async,这意味着该方法不会在其主体内的某个点产生,而awaits 其他一些Task 的完成。返回Task(或Task&lt;T&gt;)的方法的约定是带有Async 后缀。 相对于您的原始评论,锁定在Task 返回给调用者之前释放,因此调用者无法绕过锁定。 【参考方案3】:

我认为 MSDN 上有很好的例子:https://msdn.microsoft.com/en-us/library/hh873178%28v=vs.110%29.aspx#WHToTap

public static Task WaitOneAsync(this WaitHandle waitHandle)

    if (waitHandle == null) 
        throw new ArgumentNullException("waitHandle");

    var tcs = new TaskCompletionSource<bool>();
    var rwh = ThreadPool.RegisterWaitForSingleObject(waitHandle, 
        delegate  tcs.TrySetResult(true); , null, -1, true);
    var t = tcs.Task;
    t.ContinueWith( (antecedent) => rwh.Unregister(null));
    return t;

【讨论】:

绝对是最佳答案。【参考方案4】:

这是我制作的一个版本,它允许您指定超时。它源自 Stephen Toub 的解决方案。我们目前在生产工作负载中使用它。

public class AsyncAutoResetEvent

    readonly LinkedList<TaskCompletionSource<bool>> waiters = 
        new LinkedList<TaskCompletionSource<bool>>();

    bool isSignaled;

    public AsyncAutoResetEvent(bool signaled)
    
        this.isSignaled = signaled;
    

    public Task<bool> WaitAsync(TimeSpan timeout)
    
        return this.WaitAsync(timeout, CancellationToken.None);
    

    public async Task<bool> WaitAsync(TimeSpan timeout, CancellationToken cancellationToken)
    
        TaskCompletionSource<bool> tcs;

        lock (this.waiters)
        
            if (this.isSignaled)
            
                this.isSignaled = false;
                return true;
            
            else if (timeout == TimeSpan.Zero)
            
                return this.isSignaled;
            
            else
            
                tcs = new TaskCompletionSource<bool>();
                this.waiters.AddLast(tcs);
            
        

        Task winner = await Task.WhenAny(tcs.Task, Task.Delay(timeout, cancellationToken));
        if (winner == tcs.Task)
        
            // The task was signaled.
            return true;
        
        else
        
            // We timed-out; remove our reference to the task.
            // This is an O(n) operation since waiters is a LinkedList<T>.
            lock (this.waiters)
            
                bool removed = this.waiters.Remove(tcs);
                Debug.Assert(removed);
                return false;
            
        
    

    public void Set()
    
        lock (this.waiters)
        
            if (this.waiters.Count > 0)
            
                // Signal the first task in the waiters list. This must be done on a new
                // thread to avoid stack-dives and situations where we try to complete the
                // same result multiple times.
                TaskCompletionSource<bool> tcs = this.waiters.First.Value;
                Task.Run(() => tcs.SetResult(true));
                this.waiters.RemoveFirst();
            
            else if (!this.isSignaled)
            
                // No tasks are pending
                this.isSignaled = true;
            
        
    

    public override string ToString()
    
        return $"Signaled: this.isSignaled.ToString(), Waiters: this.waiters.Count.ToString()";
    

【讨论】:

我认为 this.waiters 应该被锁定在 Remove(tcs) 操作路径中? @HelloSam 我认为你是对的!固定的。感谢您指出这一点。 我没有太多时间来调试这个,但是,请注意:我正在使用这个死锁。当一个新线程调用 event.Set() 时,它会挂在toRelease.SetResult(true); @Andy 感谢您的评论。自从我最初发布此内容以来,我做了一个额外的修复,我怀疑它解决了您的死锁(在我的情况下,它是一个 ***Exception)。解决方法是将SetResult(true) 调用包装在Task.Run(...) 中。 是我弄错了,还是在if (winner == tcs.Task)之后返回true的地方没有自动重置?【参考方案5】:

它也可以,但这种方式可能会削弱使用asyncawait 的目的。

AutoResetEvent asyncSignal = new AutoResetEvent();

async void Task1Proc()

    //do some stuff
    //..
    //..

    await Task.Run(() => asyncSignal.WaitOne()); //wait for an outer thread to signal we are good to continue

    //do some more stuff
    //..
    //..

【讨论】:

为什么这被认为是不好的? @YarekT 我记得几个月前我写这个答案时的原因,但不是现在。我不认为这很糟糕,尽管其中存在不止一个上下文切换(通过 WaitOne() 和 await 关键字)性能问题。 不用担心。我最近一直在研究 C# 中的任务。从我可以收集到它的坏处,因为它通过创建一个线程来浪费一个线程,然后立即使其被等待阻塞。我已经看到了一些通过某种方式使用计时器来避免这种情况的解决方案,但它们看起来都非常复杂。无论如何,这是一个赞成票【参考方案6】:

我扩展了 Oleg Gordeev 提供的 MSDN 中的示例,并带有一个可选的超时 (ms):

public static Task WaitOneAsync(this WaitHandle waitHandle, double timeout = 0)
        
            if (waitHandle == null) throw new ArgumentNullException("waitHandle");

            var tcs = new TaskCompletionSource<bool>();

            if (timeout > 0) 
            
                var timer = new System.Timers.Timer(timeout) 
                 Enabled = true, AutoReset = false ;

                ElapsedEventHandler del = default;
                del = delegate (object x, System.Timers.ElapsedEventArgs y)
                
                    tcs.TrySetResult(true);
                    timer.Elapsed -= del; 
                    timer.Dispose();
                ;

                timer.Elapsed += del;
            
        
            var rwh = ThreadPool.RegisterWaitForSingleObject(waitHandle,
                      delegate  tcs.TrySetResult(true); ,
                      null, -1, true);

            var t = tcs.Task;
            t.ContinueWith((antecedent) => rwh.Unregister(null));

            return t;
        

【讨论】:

【参考方案7】:

这是我使用 SemaphoreSlim 的 COMPLETE 实现,使用所有 SemaphoreSlim.WaitAsync 覆盖:

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

/// <summary>
/// Represents an event that, when signaled, resets automatically after releasing a single waiting task.
/// </summary>
public sealed class AutoResetEventAsync : IDisposable 

    /// <summary>
    /// Waits asynchronously until a signal is received.
    /// </summary>
    /// <returns>Task completed when the event is signaled.</returns>
    public async ValueTask WaitAsync() 
        if (CheckSignaled()) return;
        SemaphoreSlim s;
        lock (Q) Q.Enqueue(s = new(0, 1));
        await s.WaitAsync();
        lock (Q) if (Q.Count > 0 && Q.Peek() == s) Q.Dequeue().Dispose();
    

    /// <summary>
    /// Waits asynchronously until a signal is received or the time runs out.
    /// </summary>
    /// <param name="millisecondsTimeout">The number of milliseconds to wait, <see cref="System.Threading.Timeout.Infinite"/>
    /// (-1) to wait indefinitely, or zero to return immediately.</param>
    /// <returns>Task completed when the event is signaled or the time runs out.</returns>
    public async ValueTask WaitAsync(int millisecondsTimeout) 
        if (CheckSignaled()) return;
        SemaphoreSlim s;
        lock (Q) Q.Enqueue(s = new(0, 1));
        await s.WaitAsync(millisecondsTimeout);
        lock (Q) if (Q.Count > 0 && Q.Peek() == s) Q.Dequeue().Dispose();
    

    /// <summary>
    /// Waits asynchronously until a signal is received, the time runs out or the token is cancelled.
    /// </summary>
    /// <param name="millisecondsTimeout">The number of milliseconds to wait, <see cref="System.Threading.Timeout.Infinite"/>
    /// (-1) to wait indefinitely, or zero to return immediately.</param>
    /// <param name="cancellationToken">The <see cref="System.Threading.CancellationToken"/> to observe.</param>
    /// <returns>Task completed when the event is signaled, the time runs out or the token is cancelled.</returns>
    public async ValueTask WaitAsync(int millisecondsTimeout, CancellationToken cancellationToken) 
        if (CheckSignaled()) return;
        SemaphoreSlim s;
        lock (Q) Q.Enqueue(s = new(0, 1));
        try 
            await s.WaitAsync(millisecondsTimeout, cancellationToken);
        
        finally 
            lock (Q) if (Q.Count > 0 && Q.Peek() == s) Q.Dequeue().Dispose();
        
    

    /// <summary>
    /// Waits asynchronously until a signal is received or the token is cancelled.
    /// </summary>
    /// <param name="cancellationToken">The <see cref="System.Threading.CancellationToken"/> to observe.</param>
    /// <returns>Task completed when the event is signaled or the token is cancelled.</returns>
    public async ValueTask WaitAsync(CancellationToken cancellationToken) 
        if (CheckSignaled()) return;
        SemaphoreSlim s;
        lock (Q) Q.Enqueue(s = new(0, 1));
        try 
            await s.WaitAsync(cancellationToken);
        
        finally 
            lock (Q) if (Q.Count > 0 && Q.Peek() == s) Q.Dequeue().Dispose();
        
    

    /// <summary>
    /// Waits asynchronously until a signal is received or the time runs out.
    /// </summary>
    /// <param name="timeout">A <see cref="System.TimeSpan"/> that represents the number of milliseconds to wait,
    /// a <see cref="System.TimeSpan"/> that represents -1 milliseconds to wait indefinitely, or a System.TimeSpan
    /// that represents 0 milliseconds to return immediately.</param>
    /// <returns>Task completed when the event is signaled or the time runs out.</returns>
    public async ValueTask WaitAsync(TimeSpan timeout) 
        if (CheckSignaled()) return;
        SemaphoreSlim s;
        lock (Q) Q.Enqueue(s = new(0, 1));
        await s.WaitAsync(timeout);
        lock (Q) if (Q.Count > 0 && Q.Peek() == s) Q.Dequeue().Dispose();
    

    /// <summary>
    /// Waits asynchronously until a signal is received, the time runs out or the token is cancelled.
    /// </summary>
    /// <param name="timeout">A <see cref="System.TimeSpan"/> that represents the number of milliseconds to wait,
    /// a <see cref="System.TimeSpan"/> that represents -1 milliseconds to wait indefinitely, or a System.TimeSpan
    /// that represents 0 milliseconds to return immediately.</param>
    /// <param name="cancellationToken">The <see cref="System.Threading.CancellationToken"/> to observe.</param>
    /// <returns>Task completed when the event is signaled, the time runs out or the token is cancelled.</returns>
    public async ValueTask WaitAsync(TimeSpan timeout, CancellationToken cancellationToken) 
        if (CheckSignaled()) return;
        SemaphoreSlim s;
        lock (Q) Q.Enqueue(s = new(0, 1));
        try 
            await s.WaitAsync(timeout, cancellationToken);
        
        finally 
            lock (Q) if (Q.Count > 0 && Q.Peek() == s) Q.Dequeue().Dispose();
        
    

    /// <summary>
    /// Sets the state of the event to signaled, allowing one or more waiting tasks to proceed.
    /// </summary>
    public void Set() 
        SemaphoreSlim? toRelease = null;
        lock (Q) 
            if (Q.Count > 0) toRelease = Q.Dequeue();
            else if (!IsSignaled) IsSignaled = true;
        
        toRelease?.Release();
    

    /// <summary>
    /// Sets the state of the event to non nonsignaled, making the waiting tasks to wait.
    /// </summary>
    public void Reset() => IsSignaled = false;

    /// <summary>
    /// Disposes any semaphores left in the queue.
    /// </summary>
    public void Dispose() 
        lock (Q) 
            while (Q.Count > 0) Q.Dequeue().Dispose();
        
    

    /// <summary>
    /// Checks the <see cref="IsSignaled"/> state and resets it when it's signaled.
    /// </summary>
    /// <returns>True if the event was in signaled state.</returns>
    private bool CheckSignaled() 
        lock (Q) 
            if (IsSignaled) 
                IsSignaled = false;
                return true;
            
            return false;
        
    

    private readonly Queue<SemaphoreSlim> Q = new();
    private volatile bool IsSignaled;


我使用了SemaphoreSlim,因为它“免费”提供超时和取消令牌支持。如果我只是将SemaphoreSlim 的原始.NET 源代码修改为类似于AutoResetEvent 的行为可能会更好,但不,就是这样。如果您发现任何错误,请告诉我。

【讨论】:

AutoResetEventAsync 类是线程安全的吗?如果是,如果两个线程同时调用WaitAsync() 会发生什么?在它们中的任何一个执行IsSignaled = false; 行之前,两者都不会将IsSignaled 字段读取为true 吗?同样if (Q.Contains(s)) Q.Dequeue().Dispose(); 行搜索s 是否存在于队列中,然后出列并处理其他一些信号量(很可能)。这是故意的吗? @TheodorZoulias :是的,因为即使2个线程可以同时进入WaitAsync,它们也不能同时通过Q访问。请注意,Q 只能通过单个线程访问。这使得流程简单直接。这也意味着内部等待只能由单个线程访问。因此,无效信号量不可能出队。我在这门课上进行的多项测试还没有失败,但这并不能证明它是有效的。我认为单步访问 Q 可以。 我说的是这条线:if (IsSignaled) IsSignaled = false; return; 。这不受锁的保护。 IsSignaled 甚至不是 volatile 字段。至于if (Q.Contains(s)),如果您确定s 只能在队列的头部,if (Q.Peak() == s) 会更快,更能表达代码的意图。顺便说一句,如果cancellationToken 被取消并且WaitAsync 抛出会发生什么? 您发现了一些有趣的边缘案例。我将尝试修复它们并编辑我的示例... BRB。 感谢您的洞察力,然后我将使用更好的版本和您的扩展程序。无论如何,作为一次学习经历,这是完全值得的。你是 MVP。【参考方案8】:

这是我的一次性事件版本,可以由多个线程等待。它在内部依赖于BoundedChannel

public class AsyncOneTimeEvent<T>

    private T Result  get; set; 

    private readonly Channel<bool> _channel = Channel.CreateBounded<bool>(new BoundedChannelOptions(1)
    
        SingleReader = false,
        SingleWriter = true,
        FullMode = BoundedChannelFullMode.DropWrite,
    );

    public async Task<T> GetResult()
    
        await _channel.Reader.WaitToReadAsync().ConfigureAwait(false);

        return this.Result;
    

    public void SetResult(T result)
    
        this.Result = result;
        _channel.Writer.Complete();
    

    public void SetError(Exception ex)
    
        _channel.Writer.Complete(ex);
    

【讨论】:

使用Channel 代替TaskCompletionSource 似乎是一个聪明的主意。但这也是不必要的,并且实现似乎容易受到可见性问题的影响。我不确定在所有情况下所有线程都会“看到”非易失性private T Result 字段的最新值。 例子:线程A进入GetResult()方法,乱序读取Result的值,然后被操作系统挂起。线程 B 进入和退出SetResult 方法。线程 A 恢复,同步执行 await _channel.Reader.WaitToReadAsync() 行,并返回一个以 default(T) 为值的 Task。根据 C# ECMA-334 规范,这种情况是不可能的吗?我不知道! @TheodorZoulias 当然可以,您可以在线尝试:dotnetfiddle.net/uyQRG1 我确信它有效。我不确定它是否能保证在所有 CPU 架构上都能正常工作。可见性问题是出了名的难以调试。您可以阅读 Igor Ostrovsky 的 this article 了解原因。

以上是关于可等待的 AutoResetEvent的主要内容,如果未能解决你的问题,请参考以下文章

可等待的 AutoResetEvent

如何等待所有可观察对象按顺序执行?

等待非 Promise 是不是有任何可检测的效果?

等待文件可写

Selenium 等待可点击 - 元素点击被拦截问题

Angular 2+等待方法/可观察完成