如何强制 IAsyncEnumerable 尊重 CancellationToken
Posted
技术标签:
【中文标题】如何强制 IAsyncEnumerable 尊重 CancellationToken【英文标题】:How to force an IAsyncEnumerable to respect a CancellationToken 【发布时间】:2020-02-02 16:42:40 【问题描述】:编辑:这个问题的要求已经改变。请参阅下面的更新部分。
我有一个异步迭代器方法,它产生一个IAsyncEnumerable<int>
(数字流),每 200 毫秒一个数字。此方法的调用者使用流,但希望在 1000 毫秒后停止枚举。所以使用了CancellationTokenSource
,并且令牌被传递为
WithCancellation
扩展方法的参数。但是令牌不受尊重。枚举一直持续到所有数字都被消耗完:
static async IAsyncEnumerable<int> GetSequence()
for (int i = 1; i <= 10; i++)
await Task.Delay(200);
yield return i;
var cts = new CancellationTokenSource(1000);
await foreach (var i in GetSequence().WithCancellation(cts.Token))
Console.WriteLine($"DateTime.Now:HH:mm:ss.fff > i");
输出:
12:55:17.506 > 1 12:55:17.739 > 2 12:55:17.941 > 3 12:55:18.155 > 4 12:55:18.367 > 5 12:55:18.570 > 6 12:55:18.772 > 7 12:55:18.973 > 8 12:55:19.174 > 9 12:55:19.376 > 10
预期的输出是TaskCanceledException
出现在数字 5 之后。看来我误解了 WithCancellation
实际在做什么。该方法只是将提供的令牌传递给迭代器方法,如果该方法接受一个。否则,就像我的示例中的 GetSequence()
方法一样,令牌将被忽略。我想我的解决方案是手动询问枚举体内的令牌:
var cts = new CancellationTokenSource(1000);
await foreach (var i in GetSequence())
cts.Token.ThrowIfCancellationRequested();
Console.WriteLine($"DateTime.Now:HH:mm:ss.fff > i");
这很简单而且效果很好。但无论如何,我想知道是否有可能创建一个扩展方法来完成我期望WithCancellation
做的事情,将令牌烘焙到随后的枚举中。这是所需方法的签名:
public static IAsyncEnumerable<T> WithEnforcedCancellation<T>(
this IAsyncEnumerable<T> source, CancellationToken cancellationToken)
// Is it possible?
更新:当我问这个问题时,我似乎对整个取消概念的目的有一个错误的理解。我的印象是取消是为了在MoveNextAsync
的等待之后打破循环,而真正的目的是取消等待本身。在我的简单示例中,等待仅持续 200 毫秒,但在现实世界的示例中,等待可能更长,甚至是无限的。意识到这一点后,我现在的问题几乎没有价值,我必须要么删除它并打开一个具有相同标题的新问题,要么更改现有问题的要求。这两种选择都不好。
我决定选择第二个选项。因此,我不接受当前接受的答案,并且我正在寻求一种新的解决方案,以解决以立即生效的方式执行取消的更困难的问题。换句话说,取消令牌应该会导致异步枚举在几毫秒内完成。让我们举一个实际的例子来区分合意和不合意的行为:
var cts = new CancellationTokenSource(500);
var stopwatch = Stopwatch.StartNew();
try
await foreach (var i in GetSequence().WithEnforcedCancellation(cts.Token))
Console.WriteLine($"stopwatch.Elapsed:m':'ss'.'fff > i");
catch (OperationCanceledException)
Console.WriteLine($"stopwatch.Elapsed:m':'ss'.'fff > Canceled");
输出(理想):
0:00.242 > 1 0:00.467 > 2 0:00.500 > 取消
输出(不良):
0:00.242 > 1 0:00.467 > 2 0:00.707 > 取消
GetSequence
与初始示例中的方法相同,每 200 毫秒传输一个数字。此方法不支持取消,前提是我们无法更改。 WithEnforcedCancellation
是解决此问题所需的扩展方法。
【问题讨论】:
如果代码的编写方式不允许提前中止,则不能强制提前中止。好吧,你可以,但你真的不应该。 @LasseVågsætherKarlsen 这就像在说你不应该尽早退出循环。这是一个非常强烈的主张! 情况并不相似——中断同步循环总是安全的,但仅在迭代之间“取消”异步枚举意味着我们可能会增加相当大的开销和延迟(@987654341 不是问题@,但对于实际工作来说绝对是一个问题)。这种情况并不像一般异步工作那样可怕(我们可能不得不接受工作根本没有被取消并且仍在后台进行,尽管被忽略了),因为异步枚举隐含地包括处理资源,但仍然不是最佳的.将其与Task.Delay(10000)
进行比较。
@JeroenMostert 打破同步循环是安全的,因为编译器生成的迭代器 are disposing properly all disposable resources,编译器生成的异步迭代器也是如此。当你在await foreach
内部中断时意味着你在前一个MoveNextAsync 完成后中断,此时没有什么特别的事情发生。
@JeroenMostert 关于忽略后台工作的情况,我提出了一个相关问题here。我得到的反馈是,除了打破循环之外,我还应该将责任转移给调用者以提供额外的取消通知。
【参考方案1】:
IAsyncEnumerable
明确地为这种机制提供了EnumeratorCancellation
属性:
static async IAsyncEnumerable<int> GetSequence([EnumeratorCancellation] CancellationToken ct = default)
for (int i = 1; i <= 10; i++)
ct.ThrowIfCancellationRequested();
await Task.Delay(200); // or `Task.Delay(200, ct)` if this wasn't an example
yield return i;
事实上,如果你给方法一个CancellationToken
参数,但不添加属性,编译器就会发出警告。
请注意,传递给.WithCancellation
的令牌将覆盖传递给该方法的任何本地令牌。 specs 有这方面的详细信息。
当然,这仍然只有在枚举实际接受CancellationToken
时才有效——但取消只有在合作完成时才真正有效的事实适用于任何async
工作。 Yeldar's answer 有利于将某种取消措施“强制”到不支持它的可枚举中,但首选的解决方案应该是修改枚举以支持自身取消——编译器会尽一切努力帮助你。
【讨论】:
感谢 Jeroen 的回答!你提供的信息很重要,尽管我已经知道了。我的问题是关于非合作取消的情况,基本上是从消耗异步枚举的循环中中断,但重要的是您指出了两种模式之间的区别。 顺便说一句,根据我的测试,proposal specs 中提供的信息不准确。如果传递了两个令牌,一个直接传递,一个通过WithCancellation
,两者都将受到尊重。也许他们同时改变了主意。
由于异步流在 C# 8 中是新的,我发现很难想出一个场景,你不能编写一个枚举来支持协作取消——没有遗留代码但我们必须修复!可以想象,您可以有一个包含现有同步枚举的异步枚举(尽管这样的想法很糟糕),但即便如此,您也可以(并且可以说应该)在其中插入取消逻辑。
据我了解,取消对于允许取消两个循环之间的等待很有用。因此,在您的示例中,您可能应该取消注释 Task.Delay(200, ct)
代码,因为这是等待发生的地方。总的来说,我同意每个应该成为 IAsyncEnumerable
的 IEnumerable
,它也应该支持取消。 :-)
是的,如果Task.Delay
代表真正的(可取消的)工作,我们将简单地传递令牌,我们根本不需要调用ThrowIfCancellationRequested
(假设循环的其余部分没有做任何有趣的事情要么)。【参考方案2】:
你可以像这样将你的逻辑提取到一个扩展方法中:
public static async IAsyncEnumerable<T> WithEnforcedCancellation<T>(
this IAsyncEnumerable<T> source, CancellationToken cancellationToken)
if (source == null)
throw new ArgumentNullException(nameof(source));
cancellationToken.ThrowIfCancellationRequested();
await foreach (var item in source)
cancellationToken.ThrowIfCancellationRequested();
yield return item;
【讨论】:
就是这么简单!我想知道为什么它没有通过我的脑海。我太着迷于创建异步可枚举包装器的想法,而错过了简单的解决方案。 :-) @TheodorZoulias 是的,确实,已修复 :) 抱歉 Yeldar 不接受您的回答。我已经彻底改变了这个问题的要求。请参阅更新部分。【参考方案3】:我认为重申您不应该这样做很重要。让异步方法支持取消令牌总是更好的,然后取消就像您期望的那样立即。如果这是不可能的,我仍然建议在尝试此答案之前尝试其他答案之一。
话虽如此,如果您无法为异步方法添加取消支持,并且您确实需要立即终止 foreach,那么您可以破解您的绕过它。
一个技巧是使用带有两个参数的Task.WhenAny
:
-
你从
IAsyncEnumerator.MoveNextAsync()
得到的任务
另一个确实支持取消的任务
这是简短的版本
// Start the 'await foreach' without the new syntax
// because we need access to the ValueTask returned by MoveNextAsync()
var enumerator = source.GetAsyncEnumerator(cancellationToken);
// Combine MoveNextAsync() with another Task that can be awaited indefinitely,
// until it throws OperationCanceledException
var untilCanceled = UntilCanceled(cancellationToken);
while (await await Task.WhenAny(enumerator.MoveNextAsync().AsTask(), untilCanceled))
yield return enumerator.Current;
为了完整起见,带有ConfigureAwait(false)
和DisposeAsync()
的长版本应该可以在本地运行。
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
public static class AsyncStreamHelper
public static async IAsyncEnumerable<T> WithEnforcedCancellation<T>(this IAsyncEnumerable<T> source, [EnumeratorCancellation] CancellationToken cancellationToken)
if (source == null)
throw new ArgumentNullException(nameof(source));
cancellationToken.ThrowIfCancellationRequested();
// Start the 'await foreach' without the new syntax
// because we need access to the ValueTask returned by MoveNextAsync()
var enumerator = source.GetAsyncEnumerator(cancellationToken);
Task<bool> moveNext = null;
// Combine MoveNextAsync() with another Task that can be awaited indefinitely,
// until it throws OperationCanceledException
var untilCanceled = UntilCanceled(cancellationToken);
try
while (
await (
await Task.WhenAny(
(
moveNext = enumerator.MoveNextAsync().AsTask()
),
untilCanceled
).ConfigureAwait(false)
)
)
yield return enumerator.Current;
finally
if (moveNext != null && !moveNext.IsCompleted)
// Disable warning CS4014 "Because this call is not awaited, execution of the current method continues before the call is completed"
#pragma warning disable 4014 // This is the behavior we want!
moveNext.ContinueWith(async _ =>
await enumerator.DisposeAsync();
, TaskScheduler.Default);
#pragma warning restore 4014
else if (enumerator != null)
await enumerator.DisposeAsync();
private static Task<bool> UntilCanceled(CancellationToken cancellationToken)
// This is just one possible implementation... feel free to swap out for something else
return new Task<bool>(() => true, cancellationToken);
public class Program
public static async Task Main()
var cts = new CancellationTokenSource(500);
var stopwatch = Stopwatch.StartNew();
try
await foreach (var i in GetSequence().WithEnforcedCancellation(cts.Token))
Console.WriteLine($"stopwatch.Elapsed:m':'ss'.'fff > i");
catch (OperationCanceledException)
Console.WriteLine($"stopwatch.Elapsed:m':'ss'.'fff > Canceled");
static async IAsyncEnumerable<int> GetSequence()
for (int i = 1; i <= 10; i++)
await Task.Delay(200);
yield return i;
注意事项
枚举器返回一个 ValueTask 以提高性能(使用比常规任务更少的分配),但 ValueTask 不能与 Task.WhenAny()
一起使用,因此使用 AsTask()
会通过引入分配开销而降低性能。
只有在最近的MoveNextAsync()
完成后才能释放枚举器。当请求取消时,任务更有可能仍在运行。这就是为什么我在后续任务中添加了另一个对 DisposeAsync
的调用。
在这种情况下,当WithEnforcedCancellation()
方法退出时,枚举器还没有被释放。它将在枚举被放弃后的一段时间内被处理。如果DisposeAsync()
抛出异常,异常将丢失。它不能冒泡调用堆栈,因为没有调用堆栈。
【讨论】:
我用你的改进更新了我的答案,除了处理由于某种原因抛出NotSupportedException
的枚举器。我真的不明白为什么。
^是的,这个,见:github.com/dotnet/runtime/issues/51176#issuecomment-818866190
我在最近待处理的MoveNextAsync
任务的后续任务中添加了对DisposeAsync
的调用,我还在警告中添加了一个新注释。
现在看来各方面的实现都很完美。谢谢史蒂文!
我发现还有一个改进,可以在最近的任务已经完成时同步释放枚举器。我还添加了关于异常处理(或缺少异常处理)的注释。以上是关于如何强制 IAsyncEnumerable 尊重 CancellationToken的主要内容,如果未能解决你的问题,请参考以下文章
如何强制 CSS url 尊重 webpack 的 publicPath 设置?
如何强制响应式 iframe 目标尊重 iframe 的视口并做出响应,就好像它被直接访问一样?