如何使用 asp.net Web API 2(使用 TPL/Reactive)缓冲和批处理 REST 获取查询?
Posted
技术标签:
【中文标题】如何使用 asp.net Web API 2(使用 TPL/Reactive)缓冲和批处理 REST 获取查询?【英文标题】:How to buffer and batch REST get queries with asp.net web API 2 (using TPL/Reactive)? 【发布时间】:2021-12-27 15:45:51 【问题描述】:我正在运行一个目前几乎无法扩展的面向公众的 api。此 API 建立在 Asp.Net Web API 2 (.Net 4.7.2) 之上。这个
我遇到的问题是,从这个入口点,服务必须多次调用其他内部服务来暴露它们的 Rest 接口。 我们做了一些优化:
使所有这些调用异步 (async/await)。 所有服务(面向公众的服务和内部服务)都是负载平衡的,我们已经部署了 4 台具有高内存 ram/cpu(64 gb,每台 8cpu)的服务器但是当我们突然爆发负载,或者当我们进行一些压力测试时,我们发现我们很难扩大规模:响应时间开始增加,我们无法达到超过 150 req/s 平均响应时间为 2.5 秒,而所有时间似乎都花在等待每个内部服务响应的网络延迟上...... 所以我在想是否可以缓冲一堆请求并批量调用内部 api 以获取详细信息以组合然后应答调用者。
我的想法是使用一种特殊类型的静态 httpClient,它带有一个异步方法,可以缓冲调用,并在有特殊缓冲的调用计数或何时发出请求 已经过了几毫秒的限制:这样,当我们处于负载状态时,我们的 API 可以进行很少的网络调用并且响应速度更快...... 我知道有些人也在使用妈妈/公共汽车,例如卡夫卡,但在我看来,这样做只会让我们有更多的并行调用来处理,但没有真正的收获 关于速度..(我可能错了)
为了说明我的想法:
您认为这可以使用 Reactive(观察所花费的延迟或缓冲的消息计数)/TPL 数据流(以填充块然后进行批处理调用)来完成吗? 我有这个想法,但我不知道这是否是个好主意,以及如何让它发挥作用......
更新: 在这里找到 Theodor Zoulias 提供的有用的示例代码:
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
static class Program
public static async Task Main()
var execution = new BatchExecution<int, int>(async (int[] inputs) =>
Print($"Processing [String.Join(", ", inputs)]");
await Task.Yield() ;
return inputs.Select(x => x * 10).ToArray();
, batchSize: 3);
Task[] workers = Enumerable.Range(1, 10).Select(id => Task.Run(async () =>
//await Task.Delay(id * 50);
Print($"Before InvokeAsync(id)");
var result = await execution.InvokeAsync(id);
Print($"After await InvokeAsync(id), result is result");
)).ToArray();
await Task.WhenAll(workers);
static void Print(string line)
Console.WriteLine($@"DateTime.Now:HH:mm:ss.fff [Thread.CurrentThread
.ManagedThreadId] > line");
public class BatchExecution<TInput, TOutput>
private class AsyncOperation : TaskCompletionSource<TOutput>
public AsyncOperation() :
base(TaskCreationOptions.RunContinuationsAsynchronously)
public TInput Input get; init;
private readonly BatchBlock<AsyncOperation> _batchBlock;
private readonly ActionBlock<AsyncOperation[]> _actionBlock;
public BatchExecution(
Func<TInput[], Task<TOutput[]>> batchAction,
int batchSize,
int maximumConcurrency = DataflowBlockOptions.Unbounded)
_batchBlock = new BatchBlock<AsyncOperation>(batchSize);
_actionBlock = new ActionBlock<AsyncOperation[]>(async operations =>
try
TInput[] inputs = operations.Select(x => x.Input).ToArray();
TOutput[] results = await batchAction(inputs);
if (results.Length != inputs.Length)
throw new InvalidOperationException("Results count mismatch.");
for (int i = 0; i < operations.Length; i++)
operations[i].SetResult(results[i]);
catch (OperationCanceledException oce)
Array.ForEach(operations, x => x.TrySetCanceled(oce.CancellationToken));
catch (Exception ex)
Array.ForEach(operations, x => x.TrySetException(ex));
, new() MaxDegreeOfParallelism = maximumConcurrency );
_batchBlock.LinkTo(_actionBlock, new() PropagateCompletion = true );
public Task<TOutput> InvokeAsync(TInput input)
var operation = new AsyncOperation() Input = input ;
bool accepted = _batchBlock.Post(operation);
if (!accepted) throw new InvalidOperationException(
"The component has been marked as complete.");
return operation.Task;
public void Complete() => _batchBlock.Complete();
public Task Completion => _actionBlock.Completion;
我需要一些关于我这样做的方式的反馈/建议:是否可以使用 Reactive/TPL 和 httpClient 做我所追求的事情,或者有更好的方法吗?
【问题讨论】:
我无法评论您正在尝试做的事情是否理智,但对于技术 TPL 数据流表现,您可以查看此处:Batching on duration or threshold using TPL Dataflow 这是我得到 IPropagatorBlock 想法的地方...。但我不知道如何等待让我们说 10 个 url 在发出请求之前排队并得到响应一并在异步工作中响应调用者... 您可以将冷的Task
s 作为消息传递给BatchBlock
,并将其链接到ActionBlock<Task>
,该ActionBlock<Task>
将通过调用RunSynchronously
方法来运行任务。你可以看到一个例子here。如果您要调用的操作是异步的,则需要嵌套任务 (Task<Task>
)。
其实上面的想法和提到的例子可能与你想要达到的目标相去甚远。如果您可以提供所需功能的玩具/最小示例,这可能会有所帮助,以便我们有更切实的评论。
感谢您的反馈:我正在尝试制作另一个更高级的玩具示例...是的,我希望调用者异步使用它,就像使用纯 httpclient 一样。在主要时间,我发现了一个与我想做的非常相似的例子:***.com/questions/50120304/…。与我不同的是,我确实想将所有要请求的 id 分组并以批处理模式进行一次调用,而上面的示例是为了限制对同一服务的调用...
【参考方案1】:
这是一个BatchExecution
类,它接受单个请求,并在存储的请求数达到指定数量(batchSize
)时调用批处理操作。批处理操作的结果会传播到相关的单个请求:
public class BatchExecution<TInput, TOutput>
private class AsyncOperation : TaskCompletionSource<TOutput>
public AsyncOperation() :
base(TaskCreationOptions.RunContinuationsAsynchronously)
public TInput Input get; init;
private readonly BatchBlock<AsyncOperation> _batchBlock;
private readonly ActionBlock<AsyncOperation[]> _actionBlock;
public BatchExecution(
Func<TInput[], Task<TOutput[]>> batchAction,
int batchSize,
int maximumConcurrency = DataflowBlockOptions.Unbounded)
_batchBlock = new BatchBlock<AsyncOperation>(batchSize);
_actionBlock = new ActionBlock<AsyncOperation[]>(async operations =>
try
TInput[] inputs = operations.Select(x => x.Input).ToArray();
TOutput[] results = await batchAction(inputs);
if (results.Length != inputs.Length)
throw new InvalidOperationException("Results count mismatch.");
for (int i = 0; i < operations.Length; i++)
operations[i].SetResult(results[i]);
catch (OperationCanceledException oce)
Array.ForEach(operations, x => x.TrySetCanceled(oce.CancellationToken));
catch (Exception ex)
Array.ForEach(operations, x => x.TrySetException(ex));
, new() MaxDegreeOfParallelism = maximumConcurrency );
_batchBlock.LinkTo(_actionBlock, new() PropagateCompletion = true );
public Task<TOutput> InvokeAsync(TInput input)
var operation = new AsyncOperation() Input = input ;
bool accepted = _batchBlock.Post(operation);
if (!accepted) throw new InvalidOperationException(
"The component has been marked as complete.");
return operation.Task;
public void Complete() => _batchBlock.Complete();
public Task Completion => _actionBlock.Completion;
使用示例。让我们假设存在这个内部服务 API:
Task<string[]> GetCityNamesAsync(int[] ids);
然后可以像这样初始化和使用BatchExecution
:
var batchExecution = new BatchExecution<int, string>(async (int[] ids) =>
return await GetCityNamesAsync(ids);
, batchSize: 10);
//...
string cityName = await batchExecution.InvokeAsync(13);
您可以考虑通过将标准 BatchBlock<AsyncOperation>
替换为自定义时间感知 BatchBlock
来自定义类,就像在 this 问题中找到的那样。
【讨论】:
非常感谢。但是您的版本仅在batchsize = 1时才有效,一旦我使用超过一个项目的bathsize进行测试,我就会陷入僵局:脚本在第一次调用时一直在等待(等待batchExecution.InvokeAsync(xx)。我有在控制台应用程序中尝试了示例,因此可能是一种死锁,因为批处理块在完成之前等待下一个元素,而主程序正在等待批处理块...我不确定,但仍在尝试使其工作. @Dypso 上面的实现是基于标准的BatchBlock<T>
,它不是时间感知的,所以一个独立的InvokeAsync
永远不会完成。这是因为永远不会创建包含 batchSize
操作的批处理。在BatchExecution
类的实现中,您需要使用多个worker(比batchSize
更多的worker)对其进行测试,或者您必须将标准BatchBlock<T>
替换为自定义时间感知的。
对不起,“工人”是什么意思?我已经进行了测试,我的 InvokeAsync 至少比 batchSize 多,但它不起作用并且似乎锁定了第一个 invokeAsync 调用。我还用时间意识替换了标准 BatchBlock以上是关于如何使用 asp.net Web API 2(使用 TPL/Reactive)缓冲和批处理 REST 获取查询?的主要内容,如果未能解决你的问题,请参考以下文章
如何在 ASP.NET Web API 5.2.3 中使用 FromForm 属性
如何使用 asp.net Web API 2(使用 TPL/Reactive)缓冲和批处理 REST 获取查询?
Asp.Net Web API 2第十一课——在Web API中使用Dependency Resolver
Web API系列教程2.1 — ASP.NET Web API中的路由机制
如何使用 fetch 将 FormData 从 javascript 发送到 ASP.NET Core 2.1 Web API