在返回带有取消的 IAsyncEnumerable 的函数中迭代 IAsyncEnumerable
Posted
技术标签:
【中文标题】在返回带有取消的 IAsyncEnumerable 的函数中迭代 IAsyncEnumerable【英文标题】:Iterating an IAsyncEnumerable in a function returning an IAsyncEnumerable with cancellation 【发布时间】:2020-03-04 13:43:27 【问题描述】:正如标题所说,我必须遵循以下功能:
public async IAsyncEnumerable<Job> GetByPipeline(int pipelineId,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
await foreach (var job in context.Jobs.Where(job => job.Pipeline.Id == pipelineId)
.AsAsyncEnumerable()
.WithCancellation(cancellationToken)
.ConfigureAwait(false))
yield return job;
我很难理解取消令牌的去向,并且感觉我在太多地方使用它。
当您解构所有花哨的异步内容时,这里实际发生了什么?有没有更好的方法来编写这个函数?
【问题讨论】:
AsAsyncEnumerable()
已经返回IAsyncEnumerable<Job>
。您不需要其余代码,只需返回即可,即return context.Jobs.Where(job => job.Pipeline.Id == pipelineId) .AsAsyncEnumerable()
@PanagiotisKanavos 不过它不支持CancellationToken
,WithCancellation
的结果不能转换为IAsyncEnumerable
这些调用只在调用者的站点上有意义。当您尝试迭代 IAsyncEnumerable 时,您必须再次添加它们
此外,在这种情况下,两者都没有太大影响。即使取消迭代也无法取消查询,而ConfigureAwait(false)
将在何处恢复执行的决定传递给调用者。
【参考方案1】:
对于初学者,这个方法可以简化为:
public IAsyncEnumerable<Job> GetByPipeline(int pipelineId)
return context.Jobs
.Where(job => job.Pipeline.Id == pipelineId)
.AsAsyncEnumerable();
甚至
public IAsyncEnumerable<Job> GetByPipeline(int pipelineId)
=> context.Jobs
.Where(job => job.Pipeline.Id == pipelineId)
.AsAsyncEnumerable();
该方法对job
没有任何作用,因此不需要对其进行迭代。
取消
如果方法实际使用了job
,应该在哪里使用取消令牌?
让我们稍微清理一下方法。相当于:
public async IAsyncEnumerable<Job> GetByPipeline(
int pipelineId,
[EnumeratorCancellation] CancellationToken ct = default)
//Just a query, doesn't execute anything
var query =context.Jobs.Where(job => job.Pipeline.Id == pipelineId);
//Executes the query and returns the *results* as soon as they arrive in an async stream
var jobStream=query.AsAsyncEnumerable();
//Process the results from the async stream as they arrive
await foreach (var job in jobStream.WithCancellation(ct).ConfigureAwait(false))
//Does *that* need cancelling?
DoSometingExpensive(job);
IQueryable query
不运行任何东西,它代表查询。它不需要取消。
AsAsyncEnumerable()
、AsEnumerable()
、ToList()
等执行查询并返回一些结果。 ToList()
等消耗所有结果,而 As...Enumerable()
方法仅在请求时产生结果。查询无法取消,As_Enumerable()
方法不会返回任何内容,除非被要求,因此它们不需要取消。
await foreach
将遍历整个异步流,因此如果我们希望能够中止它,我们确实需要传递取消令牌。
最后,DoSometingExpensive(job);
需要取消吗?如果花费太长时间,我们是否希望能够摆脱它?或者我们可以等到它完成后再退出循环吗?如果它需要取消,它也需要 CancellationToken。
配置等待
最后,ConfigureAwait(false)
不参与取消,可能根本不需要。没有它,在每次await
执行后返回到原始同步上下文。在桌面应用程序中,这意味着 UI 线程。这就是允许我们在异步事件处理程序中修改 UI 的原因。
如果GetByPipeline
在桌面应用程序上运行并想要修改 UI,则必须删除 ConfugureAwait
:
await foreach (var job in jobStream.WithCancellation(ct))
//Update the UI
toolStripProgressBar.Increment(1);
toolStripStatusLabel.Text=job.Name;
//Do the actual job
DoSometingExpensive(job);
使用ConfigureAwait(false)
,在线程池线程上继续执行,我们无法触摸 UI。
库代码不应影响执行的恢复方式,因此大多数库使用ConfigureAwait(false)
并将最终决定权留给 UI 开发人员。
如果GetByPipeline
是库方法,请使用ConfigureAwait(false)
。
【讨论】:
【参考方案2】:想象一下,在 Entity Framework 深处的某个地方是方法 GetJobs
,它从数据库中检索 Job
对象:
private static async IAsyncEnumerable<Job> GetJobs(DbDataReader dataReader,
[EnumeratorCancellation]CancellationToken cancellationToken = default)
while (await dataReader.ReadAsync(cancellationToken))
yield return new Job()
Id = (int)dataReader["Id"],
Data = (byte[])dataReader["Data"]
;
现在假设Data
属性包含一个巨大的字节数组,其中包含与Job
相关的数据。检索每个 Job
的数组可能需要一些不小的时间。在这种情况下,打破循环在次迭代之间是不够的,因为在调用Cancel
方法和提升OperationCanceledException
之间会有明显的延迟。这就是为什么DbDataReader.ReadAsync
方法需要CancellationToken
,以便可以立即取消查询。
现在的挑战是如何将客户端代码传递的CancellationToken
传递给GetJobs
方法,此时像context.Jobs
这样的属性正在沿途。解决方案是 WithCancellation
扩展方法,它存储令牌并将其更深地传递给接受用 EnumeratorCancellation
属性修饰的参数的方法。
因此,在您的情况下,您已正确完成所有操作。您在 IAsyncEnumerable
返回方法中包含了 cancellationToken
参数,这是推荐的做法。这样后续WithCancellation
链接到您的GetByPipeline
方法将不会被浪费。然后,您在方法中将WithCancellation
链接到AsAsyncEnumerable
之后,这也是正确的。否则,CancellationToken
将无法到达其最终目的地,GetJobs
方法。
【讨论】:
以上是关于在返回带有取消的 IAsyncEnumerable 的函数中迭代 IAsyncEnumerable的主要内容,如果未能解决你的问题,请参考以下文章
在 C#8 IAsyncEnumerable<T> 中并行化收益返回
如何在实际迭代发生之前验证 IAsyncEnumerable 返回方法的参数?
如何使用 SqlDataReader 返回和使用 IAsyncEnumerable