是否可以在 .NET 6.0 中限制 Parallel.ForEachAsync 以避免速率限制?
Posted
技术标签:
【中文标题】是否可以在 .NET 6.0 中限制 Parallel.ForEachAsync 以避免速率限制?【英文标题】:Is it possible to throttle Parallel.ForEachAsync in .NET 6.0 to avoid rate limiting? 【发布时间】:2022-01-14 09:18:50 【问题描述】:我对编程很陌生(
我的团队正在开发与第三方系统的集成,但第三方端点之一缺乏有意义的方式来获取与条件匹配的实体列表。
我们一直在通过循环请求集合来获取这些实体,并将每个等待调用的结果添加到列表中。这工作得很好,但是获取实体比从其他端点获取实体花费的时间要长得多,后者允许我们通过提供 id 列表来获取实体列表。
.NET 6.0 引入了Parallel.ForEachAsync(),它让我们可以并行异步执行多个等待任务。
例如:
public async Task<List<TEntity>> GetEntitiesInParallelAsync<TEntity>(List<IRestRequest> requests)
where TEntity : IEntity
var entities = new ConcurrentBag<TEntity>();
// Create a function that takes a RestRequest and returns the
// result of the request's execution, for each request
var requestExecutionTasks = requests.Select(i =>
new Func<Task<TEntity>>(() => GetAsync<TEntity>(i)));
// Execute each of the functions asynchronously in parallel,
// and add the results to the aggregate as they come in
await Parallel.ForEachAsync(requestExecutionTasks, new ParallelOptions
// This lets us limit the number of threads to use. -1 is unlimited
MaxDegreeOfParallelism = -1
, async (func, _) => entities.Add(await func()));
return entities.ToList();
使用此代码而不是简单的 foreach 循环,将在我的测试实例上获取约 30 个实体所需的时间平均缩短了 91%。棒极了。但是,我们担心在可能有数千个实体的客户端系统上使用它时可能会出现速率限制。我们有一个系统可以从他们的 API 中检测到“您的速率受限”消息,并在重试之前提示请求一秒钟左右,但这并不是一个好的解决方案,因为它是一种安全措施。
如果我们只是循环请求,我们可以通过在循环的每次迭代中执行await Task.Delay(minimumDelay)
之类的操作来限制调用。如果我错了,请纠正我,但据我了解,这在并行 foreach 执行请求时实际上不起作用,因为它会使所有请求在执行前等待相同的时间。有没有办法让每个单独的请求在执行前等待一定的时间,只有当我们接近速率限制时?如果可能的话,我希望在不限制要使用的线程数的情况下这样做。
【问题讨论】:
您是否可以在对 GetAsync 的调用中引入此延迟,即您传入一些参数以指示应该有延迟?你怎么知道你需要限制速率? 您可以在this 问题中找到RateLimiter
类。速率限制意味着您要限制在任何时间跨度内可以启动的操作数量。不考虑每个操作的持续时间。相反,如果您想对同时进行的操作数量施加限制,请使用MaxDegreeOfParallelism
选项。
@auburg 是的,这是可能的,但我更喜欢在创建要执行的任务时配置延迟。我知道第三方的 API 以前对我们的调用进行了速率限制。我不记得是哪个了,但他们的限制是每秒 60 或 100 次调用。
正如@TheodorZoulias 所暗示的那样,Parallel.ForEachAsync 对此有点太基本了。如果您添加延迟,这不会停止 foreach 将任务添加到线程池,它们都会有延迟(yuk)。您需要一个“真正的”速率限制器,当达到限制时停止添加新任务
附带说明,使用具有无限并行性的Parallel.ForEachAsync
API 与一次启动所有任务并使用await Task.WhenAll(tasks)
等待它们没有太大区别。可以说使用这种更简单的方法可能会更好,因为Task.WhenAll
API 自然地以原始顺序返回结果,并且它不需要像ConcurrentQueue<T>
或ConcurrentBag<T>
这样的并发集合。跨度>
【参考方案1】:
我的建议是放弃Parallel.ForEachAsync
方法,而使用新的Chunk
LINQ 运算符与Task.WhenAll
方法结合使用。您可以像这样每秒启动 100 个异步操作:
public async Task<List<TEntity>> GetEntitiesInParallelAsync<TEntity>(
List<IRestRequest> requests) where TEntity : IEntity
var tasks = new List<Task<TEntity>>();
foreach (var chunk in requests.Chunk(100))
tasks.AddRange(chunk.Select(request => GetAsync<TEntity>(request)));
await Task.Delay(TimeSpan.FromSeconds(1.0));
return (await Task.WhenAll(tasks)).ToList();
假设启动异步操作(调用GetAsync
方法)所需的时间可以忽略不计。
这种方法有一个固有的缺点,即在发生异常的情况下,在所有操作完成之前不会传播失败。作为比较,Parallel.ForEachAsync
方法在检测到第一个故障后停止调用异步委托并尽快完成。
【讨论】:
以上是关于是否可以在 .NET 6.0 中限制 Parallel.ForEachAsync 以避免速率限制?的主要内容,如果未能解决你的问题,请参考以下文章