如何强制 IAsyncEnumerable 尊重 CancellationToken

Posted

技术标签:

【中文标题】如何强制 IAsyncEnumerable 尊重 CancellationToken【英文标题】:How to force an IAsyncEnumerable to respect a CancellationToken 【发布时间】:2020-02-02 16:42:40 【问题描述】:

编辑:这个问题的要求已经改变。请参阅下面的更新部分。

我有一个异步迭代器方法,它产生一个IAsyncEnumerable<int>(数字流),每 200 毫秒一个数字。此方法的调用者使用流,但希望在 1000 毫秒后停止枚举。所以使用了CancellationTokenSource,并且令牌被传递为 WithCancellation 扩展方法的参数。但是令牌不受尊重。枚举一直持续到所有数字都被消耗完:

static async IAsyncEnumerable<int> GetSequence()

    for (int i = 1; i <= 10; i++)
    
        await Task.Delay(200);
        yield return i;
    


var cts = new CancellationTokenSource(1000);
await foreach (var i in GetSequence().WithCancellation(cts.Token))

    Console.WriteLine($"DateTime.Now:HH:mm:ss.fff > i");

输出:

12:55:17.506 > 1 12:55:17.739 > 2 12:55:17.941 > 3 12:55:18.155 > 4 12:55:18.367 > 5 12:55:18.570 > 6 12:55:18.772 > 7 12:55:18.973 > 8 12:55:19.174 > 9 12:55:19.376 > 10

预期的输出是TaskCanceledException 出现在数字 5 之后。看来我误解了 WithCancellation 实际在做什么。该方法只是将提供的令牌传递给迭代器方法,如果该方法接受一个。否则,就像我的示例中的 GetSequence() 方法一样,令牌将被忽略。我想我的解决方案是手动询问枚举体内的令牌:

var cts = new CancellationTokenSource(1000);
await foreach (var i in GetSequence())

    cts.Token.ThrowIfCancellationRequested();
    Console.WriteLine($"DateTime.Now:HH:mm:ss.fff > i");

这很简单而且效果很好。但无论如何,我想知道是否有可能创建一个扩展方法来完成我期望WithCancellation 做的事情,将令牌烘焙到随后的枚举中。这是所需方法的签名:

public static IAsyncEnumerable<T> WithEnforcedCancellation<T>(
    this IAsyncEnumerable<T> source, CancellationToken cancellationToken)

    // Is it possible?


更新:当我问这个问题时,我似乎对整个取消概念的目的有一个错误的理解。我的印象是取消是为了在MoveNextAsync 的等待之后打破循环,而真正的目的是取消等待本身。在我的简单示例中,等待仅持续 200 毫秒,但在现实世界的示例中,等待可能更长,甚至是无限的。意识到这一点后,我现在的问题几乎没有价值,我必须要么删除它并打开一个具有相同标题的新问题,要么更改现有问题的要求。这两种选择都不好。

我决定选择第二个选项。因此,我不接受当前接受的答案,并且我正在寻求一种新的解决方案,以解决以立即生效的方式执行取消的更困难的问题。换句话说,取消令牌应该会导致异步枚举在几毫秒内完成。让我们举一个实际的例子来区分合意和不合意的行为:

var cts = new CancellationTokenSource(500);
var stopwatch = Stopwatch.StartNew();
try

    await foreach (var i in GetSequence().WithEnforcedCancellation(cts.Token))
    
        Console.WriteLine($"stopwatch.Elapsed:m':'ss'.'fff > i");
    

catch (OperationCanceledException)

    Console.WriteLine($"stopwatch.Elapsed:m':'ss'.'fff > Canceled");

输出(理想):

0:00.242 > 1 0:00.467 > 2 0:00.500 > 取消

输出(不良):

0:00.242 > 1 0:00.467 > 2 0:00.707 > 取消

GetSequence 与初始示例中的方法相同,每 200 毫秒传输一个数字。此方法不支持取消,前提是我们无法更改。 WithEnforcedCancellation 是解决此问题所需的扩展方法。

【问题讨论】:

如果代码的编写方式不允许提前中止,则不能强制提前中止。好吧,你可以,但你真的不应该 @LasseVågsætherKarlsen 这就像在说你不应该尽早退出循环。这是一个非常强烈的主张! 情况并不相似——中断同步循环总是安全的,但仅在迭代之间“取消”异步枚举意味着我们可能会增加相当大的开销和延迟(@987654341 不是问题@,但对于实际工作来说绝对是一个问题)。这种情况并不像一般异步工作那样可怕(我们可能不得不接受工作根本没有被取消并且仍在后台进行,尽管被忽略了),因为异步枚举隐含地包括处理资源,但仍然不是最佳的.将其与 Task.Delay(10000) 进行比较。 @JeroenMostert 打破同步循环是安全的,因为编译器生成的迭代器 are disposing properly all disposable resources,编译器生成的异步迭代器也是如此。当你在await foreach 内部中断时意味着你在前一个MoveNextAsync 完成后中断,此时没有什么特别的事情发生。 @JeroenMostert 关于忽略后台工作的情况,我提出了一个相关问题here。我得到的反馈是,除了打破循环之外,我还应该将责任转移给调用者以提供额外的取消通知。 【参考方案1】:

IAsyncEnumerable 明确地为这种机制提供了EnumeratorCancellation 属性:

static async IAsyncEnumerable<int> GetSequence([EnumeratorCancellation] CancellationToken ct = default) 
    for (int i = 1; i <= 10; i++) 
        ct.ThrowIfCancellationRequested();
        await Task.Delay(200);    // or `Task.Delay(200, ct)` if this wasn't an example
        yield return i;
    

事实上,如果你给方法一个CancellationToken参数,但不添加属性,编译器就会发出警告。

请注意,传递给.WithCancellation 的令牌将覆盖传递给该方法的任何本地令牌。 specs 有这方面的详细信息。

当然,这仍然只有在枚举实际接受CancellationToken 时才有效——但取消只有在合作完成时才真正有效的事实适用于任何async 工作。 Yeldar's answer 有利于将某种取消措施“强制”到不支持它的可枚举中,但首选的解决方案应该是修改枚举以支持自身取消——编译器会尽一切努力帮助你。

【讨论】:

感谢 Jeroen 的回答!你提供的信息很重要,尽管我已经知道了。我的问题是关于非合作取消的情况,基本上是从消耗异步枚举的循环中中断,但重要的是您指出了两种模式之间的区别。 顺便说一句,根据我的测试,proposal specs 中提供的信息不准确。如果传递了两个令牌,一个直接传递,一个通过WithCancellation,两者都将受到尊重。也许他们同时改变了主意。 由于异步流在 C# 8 中是新的,我发现很难想出一个场景,你不能编写一个枚举来支持协作取消——没有遗留代码但我们必须修复!可以想象,您可以有一个包含现有同步枚举的异步枚举(尽管这样的想法很糟糕),但即便如此,您也可以(并且可以说应该)在其中插入取消逻辑。 据我了解,取消对于允许取消两个循环之间的等待很有用。因此,在您的示例中,您可能应该取消注释 Task.Delay(200, ct) 代码,因为这是等待发生的地方。总的来说,我同意每个应该成为 IAsyncEnumerableIEnumerable,它也应该支持取消。 :-) 是的,如果Task.Delay 代表真正的(可取消的)工作,我们将简单地传递令牌,我们根本不需要调用ThrowIfCancellationRequested(假设循环的其余部分没有做任何有趣的事情要么)。【参考方案2】:

你可以像这样将你的逻辑提取到一个扩展方法中:

public static async IAsyncEnumerable<T> WithEnforcedCancellation<T>(
    this IAsyncEnumerable<T> source, CancellationToken cancellationToken)

    if (source == null)
        throw new ArgumentNullException(nameof(source));

    cancellationToken.ThrowIfCancellationRequested();

    await foreach (var item in source)
    
        cancellationToken.ThrowIfCancellationRequested();
        yield return item;
    

【讨论】:

就是这么简单!我想知道为什么它没有通过我的脑海。我太着迷于创建异步可枚举包装器的想法,而错过了简单的解决方案。 :-) @TheodorZoulias 是的,确实,已修复 :) 抱歉 Yeldar 不接受您的回答。我已经彻底改变了这个问题的要求。请参阅更新部分。【参考方案3】:

我认为重申您应该这样做很重要。让异步方法支持取消令牌总是更好的,然后取消就像您期望的那样立即。如果这是不可能的,我仍然建议在尝试此答案之前尝试其他答案之一。

话虽如此,如果您无法为异步方法添加取消支持,并且您确实需要立即终止 foreach,那么您可以破解您的绕过它。

一个技巧是使用带有两个参数的Task.WhenAny

    你从IAsyncEnumerator.MoveNextAsync()得到的任务 另一个确实支持取消的任务

这是简短的版本

// Start the 'await foreach' without the new syntax
// because we need access to the ValueTask returned by MoveNextAsync()
var enumerator = source.GetAsyncEnumerator(cancellationToken);

// Combine MoveNextAsync() with another Task that can be awaited indefinitely,
// until it throws OperationCanceledException
var untilCanceled = UntilCanceled(cancellationToken);
while (await await Task.WhenAny(enumerator.MoveNextAsync().AsTask(), untilCanceled))

    yield return enumerator.Current;

为了完整起见,带有ConfigureAwait(false)DisposeAsync() 的长版本应该可以在本地运行。

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncStreamHelper

    public static async IAsyncEnumerable<T> WithEnforcedCancellation<T>(this IAsyncEnumerable<T> source, [EnumeratorCancellation] CancellationToken cancellationToken)
    
        if (source == null)
            throw new ArgumentNullException(nameof(source));
        cancellationToken.ThrowIfCancellationRequested();

        // Start the 'await foreach' without the new syntax
        // because we need access to the ValueTask returned by MoveNextAsync()
        var enumerator = source.GetAsyncEnumerator(cancellationToken);
        Task<bool> moveNext = null;

        // Combine MoveNextAsync() with another Task that can be awaited indefinitely,
        // until it throws OperationCanceledException
        var untilCanceled = UntilCanceled(cancellationToken);
        try
        
            while (
                await (
                    await Task.WhenAny(
                        (
                            moveNext = enumerator.MoveNextAsync().AsTask()
                        ),
                        untilCanceled
                    ).ConfigureAwait(false)
                )
            )
            
                yield return enumerator.Current;
            
        
        finally
        
            if (moveNext != null && !moveNext.IsCompleted)
            
                // Disable warning CS4014 "Because this call is not awaited, execution of the current method continues before the call is completed"
#pragma warning disable 4014 // This is the behavior we want!

                moveNext.ContinueWith(async _ =>
                
                    await enumerator.DisposeAsync();
                , TaskScheduler.Default);
#pragma warning restore 4014
            
            else if (enumerator != null)
            
                await enumerator.DisposeAsync();
            
        
    

    private static Task<bool> UntilCanceled(CancellationToken cancellationToken)
    
        // This is just one possible implementation... feel free to swap out for something else
        return new Task<bool>(() => true, cancellationToken);
    


public class Program

    public static async Task Main()
    
        var cts = new CancellationTokenSource(500);
        var stopwatch = Stopwatch.StartNew();
        try
        
            await foreach (var i in GetSequence().WithEnforcedCancellation(cts.Token))
            
                Console.WriteLine($"stopwatch.Elapsed:m':'ss'.'fff > i");
            
        
        catch (OperationCanceledException)
        
            Console.WriteLine($"stopwatch.Elapsed:m':'ss'.'fff > Canceled");
        
    

    static async IAsyncEnumerable<int> GetSequence()
    
        for (int i = 1; i <= 10; i++)
        
            await Task.Delay(200);
            yield return i;
        
    

注意事项

枚举器返回一个 ValueTask 以提高性能(使用比常规任务更少的分配),但 ValueTask 不能与 Task.WhenAny() 一起使用,因此使用 AsTask() 会通过引入分配开销而降低性能。

只有在最近的MoveNextAsync() 完成后才能释放枚举器。当请求取消时,任务更有可能仍在运行。这就是为什么我在后续任务中添加了另一个对 DisposeAsync 的调用。

在这种情况下,当WithEnforcedCancellation() 方法退出时,枚举器还没有被释放。它将在枚举被放弃后的一段时间内被处理。如果DisposeAsync() 抛出异常,异常将丢失。它不能冒泡调用堆栈,因为没有调用堆栈。

【讨论】:

我用你的改进更新了我的答案,除了处理由于某种原因抛出 NotSupportedException 的枚举器。我真的不明白为什么。 ^是的,这个,见:github.com/dotnet/runtime/issues/51176#issuecomment-818866190 我在最近待处理的MoveNextAsync 任务的后续任务中添加了对DisposeAsync 的调用,我还在警告中添加了一个新注释。 现在看来各方面的实现都很完美。谢谢史蒂文! 我发现还有一个改进,可以在最近的任务已经完成时同步释放枚举器。我还添加了关于异常处理(或缺少异常处理)的注释。

以上是关于如何强制 IAsyncEnumerable 尊重 CancellationToken的主要内容,如果未能解决你的问题,请参考以下文章

如何强制 nrwl nx 尊重标签更新?

如何强制 CSS url 尊重 webpack 的 publicPath 设置?

如何强制响应式 iframe 目标尊重 iframe 的视口并做出响应,就好像它被直接访问一样?

如何使用 SqlDataReader 返回和使用 IAsyncEnumerable

迭代时如何突破 IAsyncEnumerable?

如何在存储库类中使用 IAsyncEnumerable