如何使用 asp.net Web API 2(使用 TPL/Reactive)缓冲和批处理 REST 获取查询?

Posted

技术标签:

【中文标题】如何使用 asp.net Web API 2(使用 TPL/Reactive)缓冲和批处理 REST 获取查询?【英文标题】:How to buffer and batch REST get queries with asp.net web API 2 (using TPL/Reactive)? 【发布时间】:2021-12-27 15:45:51 【问题描述】:

我正在运行一个目前几乎无法扩展的面向公众的 api。此 API 建立在 Asp.Net Web API 2 (.Net 4.7.2) 之上。这个

我遇到的问题是,从这个入口点,服务必须多次调用其他内部服务来暴露它们的 Rest 接口。 我们做了一些优化:

使所有这些调用异步 (async/await)。 所有服务(面向公众的服务和内部服务)都是负载平衡的,我们已经部署了 4 台具有高内存 ram/cpu(64 gb,每台 8cpu)的服务器

但是当我们突然爆发负载,或者当我们进行一些压力测试时,我们发现我们很难扩大规模:响应时间开始增加,我们无法达到超过 150 req/s 平均响应时间为 2.5 秒,而所有时间似乎都花在等待每个内部服务响应的网络延迟上...... 所以我在想是否可以缓冲一堆请求并批量调用内部 api 以获取详细信息以组合然后应答调用者。

我的想法是使用一种特殊类型的静态 httpClient,它带有一个异步方法,可以缓冲调用,并在有特殊缓冲的调用计数或何时发出请求 已经过了几毫秒的限制:这样,当我们处于负载状态时,我们的 API 可以进行很少的网络调用并且响应速度更快...... 我知道有些人也在使用妈妈/公共汽车,例如卡夫卡,但在我看来,这样做只会让我们有更多的并行调用来处理,但没有真正的收获 关于速度..(我可能错了)

为了说明我的想法:

您认为这可以使用 Reactive(观察所花费的延迟或缓冲的消息计数)/TPL 数据流(以填充块然后进行批处理调用)来完成吗? 我有这个想法,但我不知道这是否是个好主意,以及如何让它发挥作用......

更新: 在这里找到 Theodor Zoulias 提供的有用的示例代码:

using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class Program

    public static async Task Main()
    
        var execution = new BatchExecution<int, int>(async (int[] inputs) =>
        
            Print($"Processing [String.Join(", ", inputs)]");
            await Task.Yield() ;
            return inputs.Select(x => x * 10).ToArray();
        , batchSize: 3);

        Task[] workers = Enumerable.Range(1, 10).Select(id => Task.Run(async () =>
        
            //await Task.Delay(id * 50);
            Print($"Before InvokeAsync(id)");
            var result = await execution.InvokeAsync(id);
            Print($"After await InvokeAsync(id), result is result");
        )).ToArray();

        await Task.WhenAll(workers);
    

    static void Print(string line)
    
        Console.WriteLine($@"DateTime.Now:HH:mm:ss.fff [Thread.CurrentThread
            .ManagedThreadId] > line");
    

    public class BatchExecution<TInput, TOutput>
    
        private class AsyncOperation : TaskCompletionSource<TOutput>
        
            public AsyncOperation() :
                base(TaskCreationOptions.RunContinuationsAsynchronously)
             
            public TInput Input  get; init; 
        

        private readonly BatchBlock<AsyncOperation> _batchBlock;
        private readonly ActionBlock<AsyncOperation[]> _actionBlock;

        public BatchExecution(
            Func<TInput[], Task<TOutput[]>> batchAction,
            int batchSize,
            int maximumConcurrency = DataflowBlockOptions.Unbounded)
        
            _batchBlock = new BatchBlock<AsyncOperation>(batchSize);
            _actionBlock = new ActionBlock<AsyncOperation[]>(async operations =>
            
                try
                
                    TInput[] inputs = operations.Select(x => x.Input).ToArray();
                    TOutput[] results = await batchAction(inputs);
                    if (results.Length != inputs.Length)
                        throw new InvalidOperationException("Results count mismatch.");
                    for (int i = 0; i < operations.Length; i++)
                        operations[i].SetResult(results[i]);
                
                catch (OperationCanceledException oce)
                
                    Array.ForEach(operations, x => x.TrySetCanceled(oce.CancellationToken));
                
                catch (Exception ex)
                
                    Array.ForEach(operations, x => x.TrySetException(ex));
                
            , new()  MaxDegreeOfParallelism = maximumConcurrency );
            _batchBlock.LinkTo(_actionBlock, new()  PropagateCompletion = true );
        

        public Task<TOutput> InvokeAsync(TInput input)
        
            var operation = new AsyncOperation()  Input = input ;
            bool accepted = _batchBlock.Post(operation);
            if (!accepted) throw new InvalidOperationException(
                "The component has been marked as complete.");
            return operation.Task;
        

        public void Complete() => _batchBlock.Complete();
        public Task Completion => _actionBlock.Completion;
    

我需要一些关于我这样做的方式的反馈/建议:是否可以使用 Reactive/TPL 和 httpClient 做我所追求的事情,或者有更好的方法吗?

【问题讨论】:

我无法评论您正在尝试做的事情是否理智,但对于技术 TPL 数据流表现,您可以查看此处:Batching on duration or threshold using TPL Dataflow 这是我得到 IPropagatorBlock 想法的地方...。但我不知道如何等待让我们说 10 个 url 在发出请求之前排队并得到响应一并在异步工作中响应调用者... 您可以将冷的Tasks 作为消息传递给BatchBlock,并将其链接到ActionBlock&lt;Task&gt;,该ActionBlock&lt;Task&gt; 将通过调用RunSynchronously 方法来运行任务。你可以看到一个例子here。如果您要调用的操作是异步的,则需要嵌套任务 (Task&lt;Task&gt;)。 其实上面的想法和提到的例子可能与你想要达到的目标相去甚远。如果您可以提供所需功能的玩具/最小示例,这可能会有所帮助,以便我们有更切实的评论。 感谢您的反馈:我正在尝试制作另一个更高级的玩具示例...是的,我希望调用者异步使用它,就像使用纯 httpclient 一样。在主要时间,我发现了一个与我想做的非常相似的例子:***.com/questions/50120304/…。与我不同的是,我确实想将所有要请求的 id 分组并以批处理模式进行一次调用,而上面的示例是为了限制对同一服务的调用... 【参考方案1】:

这是一个BatchExecution 类,它接受单个请求,并在存储的请求数达到指定数量(batchSize)时调用批处理操作。批处理操作的结果会传播到相关的单个请求:

public class BatchExecution<TInput, TOutput>

    private class AsyncOperation : TaskCompletionSource<TOutput>
    
        public AsyncOperation() :
            base(TaskCreationOptions.RunContinuationsAsynchronously)  
        public TInput Input  get; init; 
    

    private readonly BatchBlock<AsyncOperation> _batchBlock;
    private readonly ActionBlock<AsyncOperation[]> _actionBlock;

    public BatchExecution(
        Func<TInput[], Task<TOutput[]>> batchAction,
        int batchSize,
        int maximumConcurrency = DataflowBlockOptions.Unbounded)
    
        _batchBlock = new BatchBlock<AsyncOperation>(batchSize);
        _actionBlock = new ActionBlock<AsyncOperation[]>(async operations =>
        
            try
            
                TInput[] inputs = operations.Select(x => x.Input).ToArray();
                TOutput[] results = await batchAction(inputs);
                if (results.Length != inputs.Length)
                    throw new InvalidOperationException("Results count mismatch.");
                for (int i = 0; i < operations.Length; i++)
                    operations[i].SetResult(results[i]);
            
            catch (OperationCanceledException oce)
            
                Array.ForEach(operations, x => x.TrySetCanceled(oce.CancellationToken));
            
            catch (Exception ex)
            
                Array.ForEach(operations, x => x.TrySetException(ex));
            
        , new()  MaxDegreeOfParallelism = maximumConcurrency );
        _batchBlock.LinkTo(_actionBlock, new()  PropagateCompletion = true );
    

    public Task<TOutput> InvokeAsync(TInput input)
    
        var operation = new AsyncOperation()  Input = input ;
        bool accepted = _batchBlock.Post(operation);
        if (!accepted) throw new InvalidOperationException(
            "The component has been marked as complete.");
        return operation.Task;
    

    public void Complete() => _batchBlock.Complete();
    public Task Completion => _actionBlock.Completion;

使用示例。让我们假设存在这个内部服务 API:

Task<string[]> GetCityNamesAsync(int[] ids);

然后可以像这样初始化和使用BatchExecution

var batchExecution = new BatchExecution<int, string>(async (int[] ids) =>

    return await GetCityNamesAsync(ids);
, batchSize: 10);

//...
string cityName = await batchExecution.InvokeAsync(13);

您可以考虑通过将标准 BatchBlock&lt;AsyncOperation&gt; 替换为自定义时间感知 BatchBlock 来自定义类,就像在 this 问题中找到的那样。

【讨论】:

非常感谢。但是您的版本仅在batchsize = 1时才有效,一旦我使用超过一个项目的bathsize进行测试,我就会陷入僵局:脚本在第一次调用时一直在等待(等待batchExecution.InvokeAsync(xx)。我有在控制台应用程序中尝试了示例,因此可能是一种死锁,因为批处理块在完成之前等待下一个元素,而主程序正在等待批处理块...我不确定,但仍在尝试使其工作. @Dypso 上面的实现是基于标准的BatchBlock&lt;T&gt;,它不是时间感知的,所以一个独立的InvokeAsync 永远不会完成。这是因为永远不会创建包含 batchSize 操作的批处理。在BatchExecution 类的实现中,您需要使用多个worker(比batchSize 更多的worker)对其进行测试,或者您必须将标准BatchBlock&lt;T&gt; 替换为自定义时间感知的。 对不起,“工人”是什么意思?我已经进行了测试,我的 InvokeAsync 至少比 batchSize 多,但它不起作用并且似乎锁定了第一个 invokeAsync 调用。我还用时间意识替换了标准 BatchBlock :在这种情况下,每次计时器触发它时我至少有一次执行......我将使用最新的源代码更新我的问题中的代码,以便您可以看到我做了什么。 @Dypso 查看this 小提琴演示。我所说的工人是指彼此同时运行的异步方法。有时它们也被称为“异步工作流”。

以上是关于如何使用 asp.net Web API 2(使用 TPL/Reactive)缓冲和批处理 REST 获取查询?的主要内容,如果未能解决你的问题,请参考以下文章

如何在 ASP.NET Web API 5.2.3 中使用 FromForm 属性

如何使用 asp.net Web API 2(使用 TPL/Reactive)缓冲和批处理 REST 获取查询?

Asp.Net Web API 2第十一课——在Web API中使用Dependency Resolver

Web API系列教程2.1 — ASP.NET Web API中的路由机制

如何使用 fetch 将 FormData 从 javascript 发送到 ASP.NET Core 2.1 Web API

如何在使用 JWT 的 asp.net 核心 web 应用和 web api 中使用谷歌身份验证