如何在 C# 中等待单个事件,带有超时和取消
Posted
技术标签:
【中文标题】如何在 C# 中等待单个事件,带有超时和取消【英文标题】:How to wait for a single event in C#, with timeout and cancellation 【发布时间】:2013-07-12 04:59:12 【问题描述】:所以我的要求是让我的函数等待第一个实例 event Action<T>
来自另一个类和另一个线程,并在我的线程上处理它,允许等待被超时或 CancellationToken
中断。
我想创建一个可以重用的通用函数。我设法创建了几个(我认为)我需要的选项,但两者似乎都比我想象的要复杂。
用法
为了清楚起见,此函数的示例使用如下所示,其中serialDevice
在单独的线程上吐出事件:
var eventOccurred = Helper.WaitForSingleEvent<StatusPacket>(
cancellationToken,
statusPacket => OnStatusPacketReceived(statusPacket),
a => serialDevice.StatusPacketReceived += a,
a => serialDevice.StatusPacketReceived -= a,
5000,
() => serialDevice.RequestStatusPacket());
选项 1——ManualResetEventSlim
这个选项还不错,但是Dispose
对ManualResetEventSlim
的处理比它看起来应该的要混乱。它使 ReSharper 适合我在闭包中访问修改/处置的东西,而且真的很难理解,所以我什至不确定它是否正确。也许我缺少一些可以清理它的东西,这将是我的偏好,但我没有立即看到它。这是代码。
public static bool WaitForSingleEvent<TEvent>(this CancellationToken token, Action<TEvent> handler, Action<Action<TEvent>> subscribe, Action<Action<TEvent>> unsubscribe, int msTimeout, Action initializer = null)
var eventOccurred = false;
var eventResult = default(TEvent);
var o = new object();
var slim = new ManualResetEventSlim();
Action<TEvent> setResult = result =>
lock (o) // ensures we get the first event only
if (!eventOccurred)
eventResult = result;
eventOccurred = true;
// ReSharper disable AccessToModifiedClosure
// ReSharper disable AccessToDisposedClosure
if (slim != null)
slim.Set();
// ReSharper restore AccessToDisposedClosure
// ReSharper restore AccessToModifiedClosure
;
subscribe(setResult);
try
if (initializer != null)
initializer();
slim.Wait(msTimeout, token);
finally // ensures unsubscription in case of exception
unsubscribe(setResult);
lock(o) // ensure we don't access slim
slim.Dispose();
slim = null;
lock (o) // ensures our variables don't get changed in middle of things
if (eventOccurred)
handler(eventResult);
return eventOccurred;
选项 2 — 不带 WaitHandle
的轮询
这里的WaitForSingleEvent
函数更简洁。我可以使用ConcurrentQueue
,因此甚至不需要锁。但我只是不喜欢轮询功能Sleep
,而且我看不出用这种方法有什么办法。我想传入WaitHandle
而不是Func<bool>
来清理Sleep
,但第二次这样做我又要清理整个Dispose
。
public static bool WaitForSingleEvent<TEvent>(this CancellationToken token, Action<TEvent> handler, Action<Action<TEvent>> subscribe, Action<Action<TEvent>> unsubscribe, int msTimeout, Action initializer = null)
var q = new ConcurrentQueue<TEvent>();
subscribe(q.Enqueue);
try
if (initializer != null)
initializer();
token.Sleep(msTimeout, () => !q.IsEmpty);
finally // ensures unsubscription in case of exception
unsubscribe(q.Enqueue);
TEvent eventResult;
var eventOccurred = q.TryDequeue(out eventResult);
if (eventOccurred)
handler(eventResult);
return eventOccurred;
public static void Sleep(this CancellationToken token, int ms, Func<bool> exitCondition)
var start = DateTime.Now;
while ((DateTime.Now - start).TotalMilliseconds < ms && !exitCondition())
token.ThrowIfCancellationRequested();
Thread.Sleep(1);
问题
我并不特别关心这些解决方案中的任何一个,我也不能 100% 确定它们中的任何一个都是 100% 正确的。这些解决方案中的任何一个是否比另一个更好(惯用性、效率等),还是有更简单的方法或内置功能来满足我在这里需要做的事情?
更新:迄今为止的最佳答案
以下TaskCompletionSource
解决方案的修改。无需长时间关闭、锁或任何需要的东西。看起来很简单。这里有错误吗?
public static bool WaitForSingleEvent<TEvent>(this CancellationToken token, Action<TEvent> onEvent, Action<Action<TEvent>> subscribe, Action<Action<TEvent>> unsubscribe, int msTimeout, Action initializer = null)
var tcs = new TaskCompletionSource<TEvent>();
Action<TEvent> handler = result => tcs.TrySetResult(result);
var task = tcs.Task;
subscribe(handler);
try
if (initializer != null)
initializer();
task.Wait(msTimeout, token);
finally
unsubscribe(handler);
// Do not dispose task http://blogs.msdn.com/b/pfxteam/archive/2012/03/25/10287435.aspx
if (task.Status == TaskStatus.RanToCompletion)
onEvent(task.Result);
return true;
return false;
更新 2:另一个很好的解决方案
原来BlockingCollection
的工作方式与ConcurrentQueue
一样,但也有接受超时和取消令牌的方法。这个解决方案的一个好处是它可以很容易地更新为WaitForNEvents
:
public static bool WaitForSingleEvent<TEvent>(this CancellationToken token, Action<TEvent> handler, Action<Action<TEvent>> subscribe, Action<Action<TEvent>> unsubscribe, int msTimeout, Action initializer = null)
var q = new BlockingCollection<TEvent>();
Action<TEvent> add = item => q.TryAdd(item);
subscribe(add);
try
if (initializer != null)
initializer();
TEvent eventResult;
if (q.TryTake(out eventResult, msTimeout, token))
handler(eventResult);
return true;
return false;
finally
unsubscribe(add);
q.Dispose();
【问题讨论】:
听起来你想要AutoResetEvent
之类的东西。您是否考虑过使用它的可能性?
@KendallFrey 是的,这似乎让我陷入了与ManualResetEventSlim
相同的Dispose
混乱,或者你有办法解决它吗?
Resharper 抱怨的原因是因为传入了订阅和取消订阅操作,它无法分析流程。如果您通过事件(通过反射)或以某种方式使流更可验证。无论如何,我个人并不高度重视 Resharper 警告。
这些都没有任何意义。如果您无法控制事件源,则无法拦截在线程池线程上触发的 DataReceived 之类的事件。让它只触发一次也没有任何意义,它只意味着调用 Read() 不会阻塞。它不保证您会获得想要阅读的所有内容。
@HansPassant 你能澄清一下吗?当我测试它们时,这两个选项都没有错误(到目前为止)。我什至没有 Read
问题中的函数,所以我不确定你的意思。 serialDevice.RequestStatusPacket()
将命令发送到我的设备以响应状态数据包,我需要等待响应,该响应来自另一个线程,超时或取消,并在我自己的线程上处理事件。不知道为什么这没有意义。
【参考方案1】:
您可以使用TaskCompletetionSource
创建一个Task
,您可以将其标记为已完成或已取消。以下是特定事件的可能实现:
public Task WaitFirstMyEvent(Foo target, CancellationToken cancellationToken)
var tcs = new TaskCompletionSource<object>();
Action handler = null;
var registration = cancellationToken.Register(() =>
target.MyEvent -= handler;
tcs.TrySetCanceled();
);
handler = () =>
target.MyEvent -= handler;
registration.Dispose();
tcs.TrySetResult(null);
;
target.MyEvent += handler;
return tcs.Task;
在 C# 5 中,您可以像这样使用它:
private async Task MyMethod()
...
await WaitFirstMyEvent(foo, cancellationToken);
...
如果要同步等待事件,也可以使用Wait
方法:
private void MyMethod()
...
WaitFirstMyEvent(foo, cancellationToken).Wait();
...
这是一个更通用的版本,但它仍然只适用于带有Action
签名的事件:
public Task WaitFirstEvent(
Action<Action> subscribe,
Action<Action> unsubscribe,
CancellationToken cancellationToken)
var tcs = new TaskCompletionSource<object>();
Action handler = null;
var registration = cancellationToken.Register(() =>
unsubscribe(handler);
tcs.TrySetCanceled();
);
handler = () =>
unsubscribe(handler);
registration.Dispose();
tcs.TrySetResult(null);
;
subscribe(handler);
return tcs.Task;
你可以这样使用它:
await WaitFirstEvent(
handler => foo.MyEvent += handler,
handler => foo.MyEvent -= handler,
cancellationToken);
如果您希望它与其他事件签名(例如EventHandler
)一起使用,则必须创建单独的重载。我认为没有一种简单的方法可以使它适用于任何签名,尤其是因为参数的数量并不总是相同的。
【讨论】:
我添加了对问题的更新——以您的示例为起点的可能解决方案。我对TaskCompletionSource
或真正的Task
没有任何经验。您在解决方案中看到任何明显的错误吗? (它是 .Net 4.0 和桌面应用程序,因此使用 task.Wait
保持线程不是问题。)
@lob,好吧,您的代码不支持取消。此外,测试任务的状态没有意义:如果它到达那个点,状态只能是 RanToCompletion,否则会出现异常
请注意,我使用的是 task.Wait
重载,它需要超时和 CancellationToken。它似乎完成了这项工作。如果在调用Wait
时取消令牌,则抛出OperationCancellationException
,如果超时,则Status
仍为TaskStatus.WaitingForActivation
。
@lobsterism,是的,我不知道我是如何错过使用 CancellationToken 的......但是你没有捕捉到异常,所以它只会在你有一个之前传播给调用者检查任务状态的机会【参考方案2】:
您可以使用 Rx 将事件转换为 observable,然后转换为任务,最后使用令牌/超时等待该任务。
与任何现有解决方案相比,它的一个优点是它在事件线程上调用unsubscribe
,确保您的处理程序不会被调用两次。 (在您的第一个解决方案中,您可以通过 tcs.TrySetResult
而不是 tcs.SetResult
来解决这个问题,但摆脱“TryDoSomething”并确保 DoSomething 始终有效总是很好)。
另一个优点是代码的简单性。它本质上是一条线。所以你甚至不需要一个独立的函数。您可以内联它,以便更清楚您的代码究竟做了什么,并且您可以在不需要大量可选参数的情况下对主题进行更改(例如您的可选initializer
,或允许等待 N 个事件,或前面的超时/取消在不需要的情况下)。当它完成时,你将同时拥有bool
return val 和实际的result
,如果这有用的话。
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
...
public static bool WaitForSingleEvent<TEvent>(this CancellationToken token, Action<TEvent> onEvent, Action<Action<TEvent>> subscribe, Action<Action<TEvent>> unsubscribe, int msTimeout, Action initializer = null)
var task = Observable.FromEvent(subscribe, unsubscribe).FirstAsync().ToTask();
if (initializer != null)
initializer();
try
var finished = task.Wait(msTimeout, token);
if (finished) onEvent(task.Result);
return finished;
catch (OperationCanceledException) return false;
【讨论】:
【参考方案3】:非常感谢! 为了帮助别人理解... (可能会显示带有命中操作处理程序代码的串行设备代码)
您还可以添加一个泛型类型约束,添加类似
where TEvent : EventArgs
在我的情况下,我还需要“服务员”中的事件结果 所以我改变了签名像 (在通用对象上又快又丑……)
public static bool WaitForSingleEventWithResult<TEvent, TObjRes>(
this CancellationToken token,
Func<TEvent, TObjRes> onEvent,
...
这样称呼它
var ct = new CancellationToken();
object result;
bool eventOccurred = ct.WaitForSingleEventWithResult<MyEventArgs, object>(
onEvent: statusPacket => result = this.OnStatusPacketReceived(statusPacket),
subscribe: sub => cp.StatusPacketReceived_Action += sub,
unsubscribe: unsub => cp.StatusPacketReceived_Action -= unsub,
msTimeout: 5 * 1000,
initializer: /*() => serialDevice.RequestStatusPacket()*/null);
无论如何...非常感谢!
【讨论】:
【参考方案4】:为什么不直接使用
ManualResetEventSlim.Wait (int millisecondsTimeout, CancellationToken cancellationToken)
?
【讨论】:
以上是关于如何在 C# 中等待单个事件,带有超时和取消的主要内容,如果未能解决你的问题,请参考以下文章