将 ManualResetEvent 包装为等待任务
Posted
技术标签:
【中文标题】将 ManualResetEvent 包装为等待任务【英文标题】:Wrapping ManualResetEvent as awaitable task 【发布时间】:2013-09-16 08:56:53 【问题描述】:我想等待手动重置事件超时并观察取消。我想出了类似下面的东西。手动重置事件对象由我无法控制的 API 提供。有没有办法在不占用和阻止来自 ThreadPool 的线程的情况下实现这一点?
static Task<bool> TaskFromWaitHandle(WaitHandle mre, int timeout, CancellationToken ct)
return Task.Run(() =>
bool s = WaitHandle.WaitAny(new WaitHandle[] mre, ct.WaitHandle , timeout) == 0;
ct.ThrowIfCancellationRequested();
return s;
, ct);
// ...
if (await TaskFromWaitHandle(manualResetEvent, 1000, cts.Token))
// true if event was set
else
// false if timed out, exception if cancelled
[EDITED] 显然,makes sense 使用 RegisterWaitForSingleObject
。我试试看。
【问题讨论】:
见blogs.msdn.com/b/pfxteam/archive/2012/02/11/10266920.aspx 感谢@JonSkeet,在我的情况下,它必须是一个真正的手动重置事件,超出我的控制范围。我会更新问题以反映这一点。 在这种情况下,我完全希望答案是“否”——将同步 API 包装在异步 API 中通常确实涉及阻塞线程。 我在考虑类似ThreadPool.RegisterWaitForSingleObject
甚至非托管RegisterWaitForSingleObject 之类的东西。我意识到从技术上讲,仍然有一个内核池线程在等待,但理论上它可能有一些用于单个等待句柄的聚合逻辑(例如,使用 WaitForMultipleObjects
),因此一个线程可能会服务多个 RegisterWaitForSingleObject
请求。这比上面的一个池线程每个等待句柄要好。
这只是一个事件,不,RWFSO 不会优化它。它使用一个专用的等待线程,它可以处理许多 RWFSO 调用。
【参考方案1】:
RegisterWaitForSingleObject
将把等待合并到专用的等待线程上,每个线程都可以等待多个句柄(具体来说,63 个,即MAXIMUM_WAIT_OBJECTS
减去一个“控制”句柄)。
所以你应该可以使用这样的东西(警告:未经测试):
public static class WaitHandleExtensions
public static Task AsTask(this WaitHandle handle)
return AsTask(handle, Timeout.InfiniteTimeSpan);
public static Task AsTask(this WaitHandle handle, TimeSpan timeout)
var tcs = new TaskCompletionSource<object>();
var registration = ThreadPool.RegisterWaitForSingleObject(handle, (state, timedOut) =>
var localTcs = (TaskCompletionSource<object>)state;
if (timedOut)
localTcs.TrySetCanceled();
else
localTcs.TrySetResult(null);
, tcs, timeout, executeOnlyOnce: true);
tcs.Task.ContinueWith((_, state) => ((RegisteredWaitHandle)state).Unregister(null), registration, TaskScheduler.Default);
return tcs.Task;
【讨论】:
谢谢@StephenCleary。这里有充分的理由将tcs
传递为state
吗?总体而言,您认为这是比使用 WaitHandle.WaitAny
更受欢迎的解决方案吗?
我正在使用RegisterWaitForSingleObject
和ContinueWith
上的状态参数作为优化。这比WaitAny
更受欢迎,因为此等待可以与其他等待结合使用。
@StephenCleary 但为什么类型不安全? TaskCompletionSource<RegisteredWaitHandle>.Task
仍会隐式转换为 Task
就好了。或者,如果这不是一个选项,您可以通过使用自动关闭来获得类型安全 ;-)。
@ChrisMoschini:如果你不能保证Task
变量会在请求到达之前被设置,你可以使用TaskCompletionSource<T>
。
@JotaBe:这些天我有一个more modern approach。这两种方法都假设句柄最终会收到信号(或超时)。如果不是,则任务将无法完成。因此,如果您有一个循环或在 *Any
方法中使用它的东西,您的代码可能每次都开始一个 new 等待,而不是使用现有的。【参考方案2】:
您也可以使用类似于 ManualResetEvent 的 SemaphoreSlim.WaitAsync()
【讨论】:
问题是没有“等待不减量”。有时您只想等待而不是递减计数器。SemaphoreSlim
类似于AutoResetEvent
,而不是ManualResetEvent
另一个问题是调用 Reset() 可能会通过 SemaphoreFullException 如果没有任何东西可以释放。即使在调用 Release() 之前检查 CurrentCount,我也会遇到多个线程通过该检查并尝试释放信号量的情况。我试图避免任何异常的可能性,因为它们会损害性能。【参考方案3】:
您可以试一试https://www.badflyer.com/asyncmanualresetevent,尝试在https://blogs.msdn.com/b/pfxteam/archive/2012/02/11/10266920.aspx 上的示例的基础上支持超时和取消。
using System;
using System.Threading;
using System.Threading.Tasks;
/// <summary>
/// An async manual reset event.
/// </summary>
public sealed class ManualResetEventAsync
// Inspiration from https://devblogs.microsoft.com/pfxteam/building-async-coordination-primitives-part-1-asyncmanualresetevent/
// and the .net implementation of SemaphoreSlim
/// <summary>
/// The timeout in milliseconds to wait indefinitly.
/// </summary>
private const int WaitIndefinitly = -1;
/// <summary>
/// True to run synchronous continuations on the thread which invoked Set. False to run them in the threadpool.
/// </summary>
private readonly bool runSynchronousContinuationsOnSetThread = true;
/// <summary>
/// The current task completion source.
/// </summary>
private volatile TaskCompletionSource<bool> completionSource = new TaskCompletionSource<bool>();
/// <summary>
/// Initializes a new instance of the <see cref="ManualResetEventAsync"/> class.
/// </summary>
/// <param name="isSet">True to set the task completion source on creation.</param>
public ManualResetEventAsync(bool isSet)
: this(isSet: isSet, runSynchronousContinuationsOnSetThread: true)
/// <summary>
/// Initializes a new instance of the <see cref="ManualResetEventAsync"/> class.
/// </summary>
/// <param name="isSet">True to set the task completion source on creation.</param>
/// <param name="runSynchronousContinuationsOnSetThread">If you have synchronous continuations, they will run on the thread which invokes Set, unless you set this to false.</param>
public ManualResetEventAsync(bool isSet, bool runSynchronousContinuationsOnSetThread)
this.runSynchronousContinuationsOnSetThread = runSynchronousContinuationsOnSetThread;
if (isSet)
this.completionSource.TrySetResult(true);
/// <summary>
/// Wait for the manual reset event.
/// </summary>
/// <returns>A task which completes when the event is set.</returns>
public Task WaitAsync()
return this.AwaitCompletion(ManualResetEventAsync.WaitIndefinitly, default(CancellationToken));
/// <summary>
/// Wait for the manual reset event.
/// </summary>
/// <param name="token">A cancellation token.</param>
/// <returns>A task which waits for the manual reset event.</returns>
public Task WaitAsync(CancellationToken token)
return this.AwaitCompletion(ManualResetEventAsync.WaitIndefinitly, token);
/// <summary>
/// Wait for the manual reset event.
/// </summary>
/// <param name="timeout">A timeout.</param>
/// <param name="token">A cancellation token.</param>
/// <returns>A task which waits for the manual reset event. Returns true if the timeout has not expired. Returns false if the timeout expired.</returns>
public Task<bool> WaitAsync(TimeSpan timeout, CancellationToken token)
return this.AwaitCompletion((int)timeout.TotalMilliseconds, token);
/// <summary>
/// Wait for the manual reset event.
/// </summary>
/// <param name="timeout">A timeout.</param>
/// <returns>A task which waits for the manual reset event. Returns true if the timeout has not expired. Returns false if the timeout expired.</returns>
public Task<bool> WaitAsync(TimeSpan timeout)
return this.AwaitCompletion((int)timeout.TotalMilliseconds, default(CancellationToken));
/// <summary>
/// Set the completion source.
/// </summary>
public void Set()
if (this.runSynchronousContinuationsOnSetThread)
this.completionSource.TrySetResult(true);
else
// Run synchronous completions in the thread pool.
Task.Run(() => this.completionSource.TrySetResult(true));
/// <summary>
/// Reset the manual reset event.
/// </summary>
public void Reset()
// Grab a reference to the current completion source.
var currentCompletionSource = this.completionSource;
// Check if there is nothing to be done, return.
if (!currentCompletionSource.Task.IsCompleted)
return;
// Otherwise, try to replace it with a new completion source (if it is the same as the reference we took before).
Interlocked.CompareExchange(ref this.completionSource, new TaskCompletionSource<bool>(), currentCompletionSource);
/// <summary>
/// Await completion based on a timeout and a cancellation token.
/// </summary>
/// <param name="timeoutMS">The timeout in milliseconds.</param>
/// <param name="token">The cancellation token.</param>
/// <returns>A task (true if wait succeeded). (False on timeout).</returns>
private async Task<bool> AwaitCompletion(int timeoutMS, CancellationToken token)
// Validate arguments.
if (timeoutMS < -1 || timeoutMS > int.MaxValue)
throw new ArgumentException("The timeout must be either -1ms (indefinitely) or a positive ms value <= int.MaxValue");
CancellationTokenSource timeoutToken = null;
// If the token cannot be cancelled, then we dont need to create any sort of linked token source.
if (false == token.CanBeCanceled)
// If the wait is indefinite, then we don't need to create a second task at all to wait on, just wait for set.
if (timeoutMS == -1)
return await this.completionSource.Task;
timeoutToken = new CancellationTokenSource();
else
// A token source which will get canceled either when we cancel it, or when the linked token source is canceled.
timeoutToken = CancellationTokenSource.CreateLinkedTokenSource(token);
using (timeoutToken)
// Create a task to account for our timeout. The continuation just eats the task cancelled exception, but makes sure to observe it.
Task delayTask = Task.Delay(timeoutMS, timeoutToken.Token).ContinueWith((result) => var e = result.Exception; , TaskContinuationOptions.ExecuteSynchronously);
var resultingTask = await Task.WhenAny(this.completionSource.Task, delayTask).ConfigureAwait(false);
// The actual task finished, not the timeout, so we can cancel our cancellation token and return true.
if (resultingTask != delayTask)
// Cancel the timeout token to cancel the delay if it is still going.
timeoutToken.Cancel();
return true;
// Otherwise, the delay task finished. So throw if it finished because it was canceled.
token.ThrowIfCancellationRequested();
return false;
【讨论】:
【参考方案4】:替代方案:等待任务的句柄和手动重置事件
当使用Task.WaitAny()
和Task
(由SqlConnection.OpenAsync()' 返回)和作为参数接收并包装在Task
和AsTask()
中的手动重置事件时,我遇到了内存泄漏。这些对象没有被释放:TaskCompletionSource<Object>, Task<Object>, StandardTaskContinuation, RegisteredWaitHandle, RegisteredWaithandleSafe, ContinuationResultTaskFromresultTask<Object,bool>, _ThreadPoolWaitOrTimerCallback
)。
这是在 Windows 服务中使用的真实生产代码,该函数尝试在循环中打开与数据库的连接,直到连接打开,或操作失败,或作为参数接收到 ManualResetEvent _finishRequest
在包含此代码的函数中,由任何其他线程中的代码发出信号。
为了避免泄漏,我决定反其道而行之:等待_finishRequest
和Task
返回的句柄OpenAsync()
:
Task asyncOpening = sqlConnection.OpenAsync();
// Wait for the async open to finish, or until _finishRequest is signaled
var waitHandles = new WaitHandle[]
// index 0 in array: extract the AsyncWaitHandle from the Task
((IAsyncResult)asyncOpening).AsyncWaitHandle,
// index 1:
_finishRequest
;
// Check if finish was requested (index of signaled handle in the array = 1)
int whichFinished = WaitHandle.WaitAny(waitHandles);
finishRequested = whichFinished == 1;
// If so, break the loop to exit immediately
if (finishRequested)
break;
// If not, check if OpenAsync finished with error (it's a Task)
if (asyncOpening.IsFaulted)
// Extract the exception from the task, and throw it
// NOTE: adapt it to your case. In mine, I'm interested in the inner exception,
// but you can check the exception itself, for example to see if it was a timeout,
// if you specified it in the call to the async function that returns the Task
var ex = asyncOpening?.Exception?.InnerExceptions?[0];
if (ex != null) throw ex;
else
Log.Verbose("Connection to database Database on server Server", database, server);
break;
如果你也需要超时,你可以在调用OpenAsync
,或者你的异步函数中包含它,然后检查异步操作的结果是否因为超时而被取消:检查Task的状态完成后,您可以在代码注释中的 NOTE 中看到。
【讨论】:
【参考方案5】:Stephen 的 Cleary 解决方案看起来很完美。微软提供similar one。
因为我还没有看到取消逻辑的示例。
这里是:
public static class WaitHandleExtensions
public static Task WaitOneAsync(this WaitHandle waitHandle, CancellationToken cancellationToken, int timeoutMilliseconds = Timeout.Infinite)
if (waitHandle == null)
throw new ArgumentNullException(nameof(waitHandle));
TaskCompletionSource<bool> tcs = new TaskCompletionSource<bool>();
CancellationTokenRegistration ctr = cancellationToken.Register(() => tcs.TrySetCanceled());
TimeSpan timeout = timeoutMilliseconds > Timeout.Infinite ? TimeSpan.FromMilliseconds(timeoutMilliseconds) : Timeout.InfiniteTimeSpan;
RegisteredWaitHandle rwh = ThreadPool.RegisterWaitForSingleObject(waitHandle,
(_, timedOut) =>
if (timedOut)
tcs.TrySetCanceled();
else
tcs.TrySetResult(true);
,
null, timeout, true);
Task<bool> task = tcs.Task;
_ = task.ContinueWith(_ =>
rwh.Unregister(null);
return ctr.Unregister();
, CancellationToken.None);
return task;
【讨论】:
以上是关于将 ManualResetEvent 包装为等待任务的主要内容,如果未能解决你的问题,请参考以下文章