通过 IEnumerable 和 TPL 数据流传输数据

Posted

技术标签:

【中文标题】通过 IEnumerable 和 TPL 数据流传输数据【英文标题】:Streaming data via IEnumerable & TPL Dataflow 【发布时间】:2020-02-21 22:29:01 【问题描述】:

我从一个非常慢的上游 API 获取项目。我尝试通过使用 TPL 数据流创建多个连接并将它们组合在一起来加快速度,就像这样;

class Stuff

    int Id  get; 


async Task<Stuff> GetStuffById(int id) => throw new NotImplementedException();

async Task<IEnumerable<Stuff>> GetLotsOfStuff(IEnumerable<int> ids)

    var bagOfStuff = new ConcurrentBag<Stuff>();

    var options = new ExecutionDataflowBlockOptions
    
        MaxDegreeOfParallelism = 5
    ;

    var processor = new ActionBlock<int>(async id =>
    
        bagOfStuff.Add(await GetStuffById(id));
    , options);

    foreach (int id in ids)
    
        processor.Post(id);
    

    processor.Complete();
    await processor.Completion;

    return bagOfStuff.ToArray();

问题是我必须等到我查询完Stuff 的整个集合之后才能将它返回给调用者。我更喜欢的是,每当多个并行查询中的任何一个返回一个项目时,我都会以yield return 方式返回该项目。因此我不需要返回sync Task&lt;IEnumerable&lt;Stuff&gt;&gt;,我可以只返回IEnumerable&lt;Stuff&gt;,只要有任何项目返回,调用者就会推进迭代。

我试过这样做;

IEnumerable<Stuff> GetLotsOfStuff(IEnumerable<int> ids)

    var options = new ExecutionDataflowBlockOptions
    
        MaxDegreeOfParallelism = 5
    ;

    var processor = new ActionBlock<int>(async id =>
    
        yield return await GetStuffById(id);
    , options);

    foreach (int id in ids)
    
        processor.Post(id);
    

    processor.Complete();
    processor.Completion.Wait();

    yield break;

但我得到一个错误

yield 语句不能在匿名方法或 lambda 表达式中使用

如何重组我的代码?

【问题讨论】:

【参考方案1】:

您可以返回IEnumerable,但要这样做,您必须阻止当前线程。您需要一个 TransformBlock 来处理 id,以及一个 feeder-task 将异步提供带有 id 的 TransformBlock。最后当前线程会进入一个阻塞循环,等待产出的东西产出:

static IEnumerable<Stuff> GetLotsOfStuff(IEnumerable<int> ids)

    using var completionCTS = new CancellationTokenSource();

    var processor = new TransformBlock<int, Stuff>(async id =>
    
        return await GetStuffById(id);
    , new ExecutionDataflowBlockOptions
    
        MaxDegreeOfParallelism = 5,
        BoundedCapacity = 50, // Avoid buffering millions of ids
        CancellationToken = completionCTS.Token
    );

    var feederTask = Task.Run(async () =>
    
        try
        
            foreach (int id in ids)
                if (!await processor.SendAsync(id)) break;
        
        finally  processor.Complete(); 
    );

    try
    
        while (processor.OutputAvailableAsync().Result)
            while (processor.TryReceive(out var stuff))
                yield return stuff;
    
    finally // This runs when the caller exits the foreach loop
    
        completionCTS.Cancel(); // Cancel the TransformBlock if it's still running
    

    Task.WaitAll(feederTask, processor.Completion); // Propagate all exceptions

不需要ConcurrentBag,因为TransformBlock 有一个内部输出缓冲区。棘手的部分是处理调用者将通过提前中断或被异常阻碍而放弃枚举IEnumerable&lt;Stuff&gt; 的情况。在这种情况下,您不希望 feeder-task 一直使用 id 抽取 IEnumerable&lt;int&gt; 直到结束。幸运的是there is a solution。在 try/finally 块中包含 yielding 循环允许接收此事件的通知,以便可以及时终止 feeder-task。

另一种实现可以通过在一个循环中组合泵送 id、馈送块和产生东西来消除对馈送任务的需要。在这种情况下,您会希望在抽水和产出之间有一个滞后。要实现它,MoreLinq 的Lag(或Lead)扩展方法可能很方便。


更新:这是一个不同的实现,它在同一个循环中枚举和产生。为了达到期望的滞后,源可枚举用一些虚拟元素右填充,数量与并发程度相等。

此实现接受泛型类型,而不是 intStuff

public static IEnumerable<TResult> Transform<TSource, TResult>(
    IEnumerable<TSource> source, Func<TSource, Task<TResult>> taskFactory,
    int degreeOfConcurrency)

    var processor = new TransformBlock<TSource, TResult>(async item =>
    
        return await taskFactory(item);
    , new ExecutionDataflowBlockOptions
    
        MaxDegreeOfParallelism = degreeOfConcurrency
    );

    var paddedSource = source.Select(item => (item, true))
        .Concat(Enumerable.Repeat((default(TSource), false), degreeOfConcurrency));
    int index = -1;
    bool completed = false;
    foreach (var (item, hasValue) in paddedSource)
    
        index++;
        if (hasValue)  processor.Post(item); 
        else if (!completed)  processor.Complete(); completed = true; 
        if (index >= degreeOfConcurrency)
        
            if (!processor.OutputAvailableAsync().Result) break; // Blocking call
            if (!processor.TryReceive(out var result))
                throw new InvalidOperationException(); // Should never happen
            yield return result;
        
    
    processor.Completion.Wait();

使用示例:

IEnumerable<Stuff> lotsOfStuff = Transform(ids, GetStuffById, 5);

可以简单地修改这两个实现以返回 IAsyncEnumerable 而不是 IEnumerable,以避免阻塞调用线程。

【讨论】:

我添加了一个替代实现。【参考方案2】:

根据您的具体用例,您可能有几种不同的方法可以处理此问题。但是要处理通过 TPL 数据流的项目,您需要将源块更改为 TransformBlock&lt;,&gt; 并将项目流到另一个块以处理您的项目。请注意,现在您可以摆脱收集ConcurrentBag 并确保将EnsureOrdered 设置为false,如果您不关心收到物品的顺序。还链接块并传播完成以确保您的管道完成一旦所有项目都被检索并随后处理。

class Stuff

    int Id  get; 


public class GetStuff

    async Task<Stuff> GetStuffById(int id) => throw new NotImplementedException();

    async Task GetLotsOfStuff(IEnumerable<int> ids)
    
        //var bagOfStuff = new ConcurrentBag<Stuff>();

        var options = new ExecutionDataflowBlockOptions
        
            MaxDegreeOfParallelism = 5,
            EnsureOrdered = false
        ;

        var processor = new TransformBlock<int, Stuff>(id => GetStuffById(id), options);

        var handler = new ActionBlock<Stuff>(s => throw new NotImplementedException());

        processor.LinkTo(handler, new DataflowLinkOptions()  PropagateCompletion = true );

        foreach (int id in ids)
        
            processor.Post(id);
        

        processor.Complete();
        await handler.Completion;
    

其他选项可以使您的方法成为从TransformBlock 流出的可观察流,或使用IAsyncEnumerableyield return 和异步get 方法。

【讨论】:

对!所以我可以有效地将处理程序的委托作为参数添加到GetLotsOfStuff。由于调用者知道它想对每个项目做什么,它可以只传递委托。 是的,如果调用者知道如何处理每个项目,那可以工作

以上是关于通过 IEnumerable 和 TPL 数据流传输数据的主要内容,如果未能解决你的问题,请参考以下文章

将生成 IEnumerable<T> 的 TransformBlock 链接到接收 T 的块

通过 BufferBlock 的背压不起作用。 (C# TPL 数据流)

TPL 数据流与普通信号量

带有循环链接的 TPL 块?

TPL Dataflow 如何与“全局”数据同步

TPL 数据流 - 与事件流一起使用