通过 IEnumerable 和 TPL 数据流传输数据
Posted
技术标签:
【中文标题】通过 IEnumerable 和 TPL 数据流传输数据【英文标题】:Streaming data via IEnumerable & TPL Dataflow 【发布时间】:2020-02-21 22:29:01 【问题描述】:我从一个非常慢的上游 API 获取项目。我尝试通过使用 TPL 数据流创建多个连接并将它们组合在一起来加快速度,就像这样;
class Stuff
int Id get;
async Task<Stuff> GetStuffById(int id) => throw new NotImplementedException();
async Task<IEnumerable<Stuff>> GetLotsOfStuff(IEnumerable<int> ids)
var bagOfStuff = new ConcurrentBag<Stuff>();
var options = new ExecutionDataflowBlockOptions
MaxDegreeOfParallelism = 5
;
var processor = new ActionBlock<int>(async id =>
bagOfStuff.Add(await GetStuffById(id));
, options);
foreach (int id in ids)
processor.Post(id);
processor.Complete();
await processor.Completion;
return bagOfStuff.ToArray();
问题是我必须等到我查询完Stuff
的整个集合之后才能将它返回给调用者。我更喜欢的是,每当多个并行查询中的任何一个返回一个项目时,我都会以yield return
方式返回该项目。因此我不需要返回sync Task<IEnumerable<Stuff>>
,我可以只返回IEnumerable<Stuff>
,只要有任何项目返回,调用者就会推进迭代。
我试过这样做;
IEnumerable<Stuff> GetLotsOfStuff(IEnumerable<int> ids)
var options = new ExecutionDataflowBlockOptions
MaxDegreeOfParallelism = 5
;
var processor = new ActionBlock<int>(async id =>
yield return await GetStuffById(id);
, options);
foreach (int id in ids)
processor.Post(id);
processor.Complete();
processor.Completion.Wait();
yield break;
但我得到一个错误
yield 语句不能在匿名方法或 lambda 表达式中使用
如何重组我的代码?
【问题讨论】:
【参考方案1】:您可以返回IEnumerable
,但要这样做,您必须阻止当前线程。您需要一个 TransformBlock
来处理 id,以及一个 feeder-task 将异步提供带有 id 的 TransformBlock
。最后当前线程会进入一个阻塞循环,等待产出的东西产出:
static IEnumerable<Stuff> GetLotsOfStuff(IEnumerable<int> ids)
using var completionCTS = new CancellationTokenSource();
var processor = new TransformBlock<int, Stuff>(async id =>
return await GetStuffById(id);
, new ExecutionDataflowBlockOptions
MaxDegreeOfParallelism = 5,
BoundedCapacity = 50, // Avoid buffering millions of ids
CancellationToken = completionCTS.Token
);
var feederTask = Task.Run(async () =>
try
foreach (int id in ids)
if (!await processor.SendAsync(id)) break;
finally processor.Complete();
);
try
while (processor.OutputAvailableAsync().Result)
while (processor.TryReceive(out var stuff))
yield return stuff;
finally // This runs when the caller exits the foreach loop
completionCTS.Cancel(); // Cancel the TransformBlock if it's still running
Task.WaitAll(feederTask, processor.Completion); // Propagate all exceptions
不需要ConcurrentBag
,因为TransformBlock
有一个内部输出缓冲区。棘手的部分是处理调用者将通过提前中断或被异常阻碍而放弃枚举IEnumerable<Stuff>
的情况。在这种情况下,您不希望 feeder-task 一直使用 id 抽取 IEnumerable<int>
直到结束。幸运的是there is a solution。在 try/finally 块中包含 yielding 循环允许接收此事件的通知,以便可以及时终止 feeder-task。
另一种实现可以通过在一个循环中组合泵送 id、馈送块和产生东西来消除对馈送任务的需要。在这种情况下,您会希望在抽水和产出之间有一个滞后。要实现它,MoreLinq 的Lag
(或Lead
)扩展方法可能很方便。
更新:这是一个不同的实现,它在同一个循环中枚举和产生。为了达到期望的滞后,源可枚举用一些虚拟元素右填充,数量与并发程度相等。
此实现接受泛型类型,而不是 int
和 Stuff
。
public static IEnumerable<TResult> Transform<TSource, TResult>(
IEnumerable<TSource> source, Func<TSource, Task<TResult>> taskFactory,
int degreeOfConcurrency)
var processor = new TransformBlock<TSource, TResult>(async item =>
return await taskFactory(item);
, new ExecutionDataflowBlockOptions
MaxDegreeOfParallelism = degreeOfConcurrency
);
var paddedSource = source.Select(item => (item, true))
.Concat(Enumerable.Repeat((default(TSource), false), degreeOfConcurrency));
int index = -1;
bool completed = false;
foreach (var (item, hasValue) in paddedSource)
index++;
if (hasValue) processor.Post(item);
else if (!completed) processor.Complete(); completed = true;
if (index >= degreeOfConcurrency)
if (!processor.OutputAvailableAsync().Result) break; // Blocking call
if (!processor.TryReceive(out var result))
throw new InvalidOperationException(); // Should never happen
yield return result;
processor.Completion.Wait();
使用示例:
IEnumerable<Stuff> lotsOfStuff = Transform(ids, GetStuffById, 5);
可以简单地修改这两个实现以返回 IAsyncEnumerable
而不是 IEnumerable
,以避免阻塞调用线程。
【讨论】:
我添加了一个替代实现。【参考方案2】:根据您的具体用例,您可能有几种不同的方法可以处理此问题。但是要处理通过 TPL 数据流的项目,您需要将源块更改为 TransformBlock<,>
并将项目流到另一个块以处理您的项目。请注意,现在您可以摆脱收集ConcurrentBag
并确保将EnsureOrdered
设置为false
,如果您不关心收到物品的顺序。还链接块并传播完成以确保您的管道完成一旦所有项目都被检索并随后处理。
class Stuff
int Id get;
public class GetStuff
async Task<Stuff> GetStuffById(int id) => throw new NotImplementedException();
async Task GetLotsOfStuff(IEnumerable<int> ids)
//var bagOfStuff = new ConcurrentBag<Stuff>();
var options = new ExecutionDataflowBlockOptions
MaxDegreeOfParallelism = 5,
EnsureOrdered = false
;
var processor = new TransformBlock<int, Stuff>(id => GetStuffById(id), options);
var handler = new ActionBlock<Stuff>(s => throw new NotImplementedException());
processor.LinkTo(handler, new DataflowLinkOptions() PropagateCompletion = true );
foreach (int id in ids)
processor.Post(id);
processor.Complete();
await handler.Completion;
其他选项可以使您的方法成为从TransformBlock
流出的可观察流,或使用IAsyncEnumerable
到yield return
和异步get 方法。
【讨论】:
对!所以我可以有效地将处理程序的委托作为参数添加到GetLotsOfStuff
。由于调用者知道它想对每个项目做什么,它可以只传递委托。
是的,如果调用者知道如何处理每个项目,那可以工作以上是关于通过 IEnumerable 和 TPL 数据流传输数据的主要内容,如果未能解决你的问题,请参考以下文章
将生成 IEnumerable<T> 的 TransformBlock 链接到接收 T 的块