分区:如何在每个分区后添加等待
Posted
技术标签:
【中文标题】分区:如何在每个分区后添加等待【英文标题】:Partition: How to add a wait after every partition 【发布时间】:2021-04-25 18:29:00 【问题描述】:我有一个 API 每分钟接受 20 个请求,之后,我需要等待 1 分钟才能查询它。我有一个项目列表(通常 1000 多个),我需要从 API 查询其详细信息,我的想法是我可以使用 Partitioner
将我的列表划分为 20 个项目/请求,但很快我意识到 Partitioner
不起作用像这样,我的第二个想法是在分区中添加一个delay
,但这也是一个坏主意,据我了解,它会在每个不需要的请求之后增加一个延迟,相反,我需要在每个Partition
之后延迟一个。以下是我的代码:
public static async Task<IEnumerable<V>> ForEachAsync<T, V>(this IEnumerable<T> source,
int degreeOfParallelism, Func<T, Task<V>> body, CancellationToken token,
[Optional] int delay)
var whenAll = await Task.WhenAll(
from partition in Partitioner.Create(source).GetPartitions(degreeOfParallelism)
select Task.Run(async delegate
var allResponses = new List<V>();
using (partition)
while (partition.MoveNext())
allResponses.Add(await body(partition.Current));
await Task.Delay(TimeSpan.FromSeconds(delay));
return allResponses;
, token));
return whenAll.SelectMany(x => x);
有谁知道我如何做到这一点?
【问题讨论】:
相关:How to execute tasks in parallel but not more than N tasks per T seconds? 请求的持续时间是否计入每分钟请求数策略?换句话说,您是否允许每分钟 开始 20 个请求(与它们的持续时间无关),或者您必须在之前的 20 个请求完成之后等待一分钟? @TheodorZoulias 不,它没有,重要的是;每分钟 20 个电话。 相关:Add delay to parallel API call (Polly) 附带说明,请注意您当前的ForEachAsync
实现(可能是this 文章中最后一个ForEachAsync
的修改版本),以非理想方式处理异常大大地。原因在this答案的cmets中有说明。
【参考方案1】:
这是一个RateLimiter
类,您可以使用它来限制异步操作的频率。它是RateLimiter
类的更简单实现,可在this 答案中找到。
/// <summary>
/// Limits the number of workflows that can access a resource during the
/// specified time span.
/// </summary>
public class RateLimiter : IDisposable
private readonly SemaphoreSlim _semaphore;
private readonly TimeSpan _timeUnit;
private readonly CancellationTokenSource _disposeCts;
private readonly CancellationToken _disposeToken;
private bool _disposed;
public RateLimiter(int maxActionsPerTimeUnit, TimeSpan timeUnit)
if (maxActionsPerTimeUnit < 1)
throw new ArgumentOutOfRangeException(nameof(maxActionsPerTimeUnit));
if (timeUnit < TimeSpan.Zero || timeUnit.TotalMilliseconds > Int32.MaxValue)
throw new ArgumentOutOfRangeException(nameof(timeUnit));
_semaphore = new SemaphoreSlim(maxActionsPerTimeUnit, maxActionsPerTimeUnit);
_timeUnit = timeUnit;
_disposeCts = new CancellationTokenSource();
_disposeToken = _disposeCts.Token;
public async Task WaitAsync(CancellationToken cancellationToken = default)
await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
ScheduleSemaphoreRelease();
private async void ScheduleSemaphoreRelease()
try await Task.Delay(_timeUnit, _disposeToken).ConfigureAwait(false);
catch (OperationCanceledException) // Ignore
lock (_semaphore) if (!_disposed) _semaphore.Release();
/// <summary>Call Dispose when you are finished using the RateLimiter.</summary>
public void Dispose()
lock (_semaphore)
if (_disposed) return;
_semaphore.Dispose();
_disposed = true;
_disposeCts.Cancel();
_disposeCts.Dispose();
使用示例:
List<string> urls = GetUrls();
using var rateLimiter = new RateLimiter(20, TimeSpan.FromMinutes(1.0));
string[] documents = await Task.WhenAll(urls.Select(async url =>
await rateLimiter.WaitAsync();
return await _httpClient.GetStringAsync(url);
));
注意:我添加了一个Dispose
方法,这样可以取消RateLimiter
类内部发起的异步操作。这
使用完RateLimiter
后应该调用方法,否则挂起的异步操作将阻止
除了消耗与活动的Task.Delay
任务相关的资源之外,RateLimiter
还会被及时的垃圾收集。
原始的非常简单但有漏洞的实现可以在这个答案的2nd revision 中找到。
我正在添加一个更复杂的RateLimiter
类的替代实现,它基于Stopwatch
而不是SemaphoreSlim
。它的优点是不需要一次性的,因为它不会在后台启动隐藏的异步操作。缺点是WaitAsync
方法不支持CancellationToken
参数,并且由于复杂性,出现错误的概率更高。
public class RateLimiter
private readonly Stopwatch _stopwatch;
private readonly Queue<TimeSpan> _queue;
private readonly int _maxActionsPerTimeUnit;
private readonly TimeSpan _timeUnit;
public RateLimiter(int maxActionsPerTimeUnit, TimeSpan timeUnit)
// Arguments validation omitted
_stopwatch = Stopwatch.StartNew();
_queue = new Queue<TimeSpan>();
_maxActionsPerTimeUnit = maxActionsPerTimeUnit;
_timeUnit = timeUnit;
public Task WaitAsync()
var delay = TimeSpan.Zero;
lock (_stopwatch)
var currentTimestamp = _stopwatch.Elapsed;
while (_queue.Count > 0 && _queue.Peek() < currentTimestamp)
_queue.Dequeue();
if (_queue.Count >= _maxActionsPerTimeUnit)
var refTimestamp = _queue
.Skip(_queue.Count - _maxActionsPerTimeUnit).First();
delay = refTimestamp - currentTimestamp;
Debug.Assert(delay >= TimeSpan.Zero);
if (delay < TimeSpan.Zero) delay = TimeSpan.Zero; // Just in case
_queue.Enqueue(currentTimestamp + delay + _timeUnit);
if (delay == TimeSpan.Zero) return Task.CompletedTask;
return Task.Delay(delay);
【讨论】:
这非常好用并且简化了我的工作,不需要分区。谢谢。以上是关于分区:如何在每个分区后添加等待的主要内容,如果未能解决你的问题,请参考以下文章