在返回带有取消的 IAsyncEnumerable 的函数中迭代 IAsyncEnumerable

Posted

技术标签:

【中文标题】在返回带有取消的 IAsyncEnumerable 的函数中迭代 IAsyncEnumerable【英文标题】:Iterating an IAsyncEnumerable in a function returning an IAsyncEnumerable with cancellation 【发布时间】:2020-03-04 13:43:27 【问题描述】:

正如标题所说,我必须遵循以下功能:

public async IAsyncEnumerable<Job> GetByPipeline(int pipelineId,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)

    await foreach (var job in context.Jobs.Where(job => job.Pipeline.Id == pipelineId)
        .AsAsyncEnumerable()
        .WithCancellation(cancellationToken)
        .ConfigureAwait(false))
    
        yield return job;
    

我很难理解取消令牌的去向,并且感觉我在太多地方使用它。

当您解构所有花哨的异步内容时,这里实际发生了什么?有没有更好的方法来编写这个函数?

【问题讨论】:

AsAsyncEnumerable() 已经返回IAsyncEnumerable&lt;Job&gt;。您不需要其余代码,只需返回即可,即return context.Jobs.Where(job =&gt; job.Pipeline.Id == pipelineId) .AsAsyncEnumerable() @PanagiotisKanavos 不过它不支持CancellationTokenWithCancellation 的结果不能转换为IAsyncEnumerable 这些调用只在调用者的站点上有意义。当您尝试迭代 IAsyncEnumerable 时,您必须再次添加它们 此外,在这种情况下,两者都没有太大影响。即使取消迭代也无法取消查询,而ConfigureAwait(false) 将在何处恢复执行的决定传递给调用者。 【参考方案1】:

对于初学者,这个方法可以简化为:

public IAsyncEnumerable<Job> GetByPipeline(int pipelineId)

    return context.Jobs
                  .Where(job => job.Pipeline.Id == pipelineId)
                  .AsAsyncEnumerable();

甚至

public IAsyncEnumerable<Job> GetByPipeline(int pipelineId)
    => context.Jobs
              .Where(job => job.Pipeline.Id == pipelineId)
              .AsAsyncEnumerable();

该方法对job 没有任何作用,因此不需要对其进行迭代。

取消

如果方法实际使用了job,应该在哪里使用取消令牌?

让我们稍微清理一下方法。相当于:

public async IAsyncEnumerable<Job> GetByPipeline(
      int pipelineId, 
      [EnumeratorCancellation] CancellationToken ct = default)

    //Just a query, doesn't execute anything
    var query =context.Jobs.Where(job => job.Pipeline.Id == pipelineId);

    //Executes the query and returns the *results* as soon as they arrive in an async stream
    var jobStream=query.AsAsyncEnumerable();

    //Process the results from the async stream as they arrive
    await foreach (var job in jobStream.WithCancellation(ct).ConfigureAwait(false))
    
        //Does *that* need cancelling?
        DoSometingExpensive(job);
    

IQueryable query 不运行任何东西,它代表查询。它不需要取消。

AsAsyncEnumerable()AsEnumerable()ToList()执行查询并返回一些结果。 ToList() 等消耗所有结果,而 As...Enumerable() 方法仅在请求时产生结果。查询无法取消,As_Enumerable() 方法不会返回任何内容,除非被要求,因此它们不需要取消。

await foreach 将遍历整个异步流,因此如果我们希望能够中止它,我们确实需要传递取消令牌。

最后,DoSometingExpensive(job); 需要取消吗?如果花费太长时间,我们是否希望能够摆脱它?或者我们可以等到它完成后再退出循环吗?如果它需要取消,它也需要 CancellationToken。

配置等待

最后,ConfigureAwait(false) 不参与取消,可能根本不需要。没有它,在每次await 执行后返回到原始同步上下文。在桌面应用程序中,这意味着 UI 线程。这就是允许我们在异步事件处理程序中修改 UI 的原因。

如果GetByPipeline 在桌面应用程序上运行并想要修改 UI,则必须删除 ConfugureAwait

await foreach (var job in jobStream.WithCancellation(ct))

        //Update the UI
        toolStripProgressBar.Increment(1);
        toolStripStatusLabel.Text=job.Name;
        //Do the actual job
        DoSometingExpensive(job);

使用ConfigureAwait(false),在线程池线程上继续执行,我们无法触摸 UI。

库代码不应影响执行的恢复方式,因此大多数库使用ConfigureAwait(false) 并将最终决定权留给 UI 开发人员。

如果GetByPipeline 是库方法,请使用ConfigureAwait(false)

【讨论】:

【参考方案2】:

想象一下,在 Entity Framework 深处的某个地方是方法 GetJobs,它从数据库中检索 Job 对象:

private static async IAsyncEnumerable<Job> GetJobs(DbDataReader dataReader,
    [EnumeratorCancellation]CancellationToken cancellationToken = default)

    while (await dataReader.ReadAsync(cancellationToken))
    
        yield return new Job()
        
            Id = (int)dataReader["Id"],
            Data = (byte[])dataReader["Data"]
        ;
    

现在假设Data 属性包含一个巨大的字节数组,其中包含与Job 相关的数据。检索每个 Job 的数组可能需要一些不小的时间。在这种情况下,打破循环次迭代之间是不够的,因为在调用Cancel 方法和提升OperationCanceledException 之间会有明显的延迟。这就是为什么DbDataReader.ReadAsync 方法需要CancellationToken,以便可以立即取消查询。

现在的挑战是如何将客户端代码传递的CancellationToken 传递给GetJobs 方法,此时像context.Jobs 这样的属性正在沿途。解决方案是 WithCancellation 扩展方法,它存储令牌并将其更深地传递给接受用 EnumeratorCancellation 属性修饰的参数的方法。

因此,在您的情况下,您已正确完成所有操作。您在 IAsyncEnumerable 返回方法中包含了 cancellationToken 参数,这是推荐的做法。这样后续WithCancellation 链接到您的GetByPipeline 方法将不会被浪费。然后,您在方法中将WithCancellation 链接到AsAsyncEnumerable 之后,这也是正确的。否则,CancellationToken 将无法到达其最终目的地,GetJobs 方法。

【讨论】:

以上是关于在返回带有取消的 IAsyncEnumerable 的函数中迭代 IAsyncEnumerable的主要内容,如果未能解决你的问题,请参考以下文章

在 C#8 IAsyncEnumerable<T> 中并行化收益返回

如何在实际迭代发生之前验证 IAsyncEnumerable 返回方法的参数?

如何使用 SqlDataReader 返回和使用 IAsyncEnumerable

如何在存储库类中使用 IAsyncEnumerable

为啥使用 IAsyncEnumerable 比返回 async/await Task<T> 慢?

gRPC 服务器流是不是可以将流返回到 Blazor Wasm 而不是 IAsyncEnumerable<T>?