分区:如何在每个分区后添加等待

Posted

技术标签:

【中文标题】分区:如何在每个分区后添加等待【英文标题】:Partition: How to add a wait after every partition 【发布时间】:2021-04-25 18:29:00 【问题描述】:

我有一个 API 每分钟接受 20 个请求,之后,我需要等待 1 分钟才能查询它。我有一个项目列表(通常 1000 多个),我需要从 API 查询其详细信息,我的想法是我可以使用 Partitioner 将我的列表划分为 20 个项目/请求,但很快我意识到 Partitioner 不起作用像这样,我的第二个想法是在分区中添加一个delay,但这也是一个坏主意,据我了解,它会在每个不需要的请求之后增加一个延迟,相反,我需要在每个Partition 之后延迟一个。以下是我的代码:

public static async Task<IEnumerable<V>> ForEachAsync<T, V>(this IEnumerable<T> source,
    int degreeOfParallelism, Func<T, Task<V>> body, CancellationToken token,
    [Optional] int delay)

    var whenAll = await Task.WhenAll(
        from partition in Partitioner.Create(source).GetPartitions(degreeOfParallelism)
        select Task.Run(async delegate 
            var allResponses = new List<V>();
            using (partition)
                while (partition.MoveNext())
                
                    allResponses.Add(await body(partition.Current));
                    await Task.Delay(TimeSpan.FromSeconds(delay));
                
            return allResponses;
        , token));
    return whenAll.SelectMany(x => x);

有谁知道我如何做到这一点?

【问题讨论】:

相关:How to execute tasks in parallel but not more than N tasks per T seconds? 请求的持续时间是否计入每分钟请求数策略?换句话说,您是否允许每分钟 开始 20 个请求(与它们的持续时间无关),或者您必须在之前的 20 个请求完成之后等待一分钟? @TheodorZoulias 不,它没有,重要的是;每分钟 20 个电话。 相关:Add delay to parallel API call (Polly) 附带说明,请注意您当前的ForEachAsync 实现(可能是this 文章中最后一个ForEachAsync 的修改版本),以非理想方式处理异常大大地。原因在this答案的cmets中有说明。 【参考方案1】:

这是一个RateLimiter 类,您可以使用它来限制异步操作​​的频率。它是RateLimiter 类的更简单实现,可在this 答案中找到。

/// <summary>
/// Limits the number of workflows that can access a resource during the
/// specified time span.
/// </summary>
public class RateLimiter : IDisposable

    private readonly SemaphoreSlim _semaphore;
    private readonly TimeSpan _timeUnit;
    private readonly CancellationTokenSource _disposeCts;
    private readonly CancellationToken _disposeToken;
    private bool _disposed;

    public RateLimiter(int maxActionsPerTimeUnit, TimeSpan timeUnit)
    
        if (maxActionsPerTimeUnit < 1)
            throw new ArgumentOutOfRangeException(nameof(maxActionsPerTimeUnit));
        if (timeUnit < TimeSpan.Zero || timeUnit.TotalMilliseconds > Int32.MaxValue)
            throw new ArgumentOutOfRangeException(nameof(timeUnit));
        _semaphore = new SemaphoreSlim(maxActionsPerTimeUnit, maxActionsPerTimeUnit);
        _timeUnit = timeUnit;
        _disposeCts = new CancellationTokenSource();
        _disposeToken = _disposeCts.Token;
    

    public async Task WaitAsync(CancellationToken cancellationToken = default)
    
        await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
        ScheduleSemaphoreRelease();
    

    private async void ScheduleSemaphoreRelease()
    
        try  await Task.Delay(_timeUnit, _disposeToken).ConfigureAwait(false); 
        catch (OperationCanceledException)   // Ignore
        lock (_semaphore)  if (!_disposed) _semaphore.Release(); 
    

    /// <summary>Call Dispose when you are finished using the RateLimiter.</summary>
    public void Dispose()
    
        lock (_semaphore)
        
            if (_disposed) return;
            _semaphore.Dispose();
            _disposed = true;
            _disposeCts.Cancel();
            _disposeCts.Dispose();
        
    

使用示例:

List<string> urls = GetUrls();

using var rateLimiter = new RateLimiter(20, TimeSpan.FromMinutes(1.0));

string[] documents = await Task.WhenAll(urls.Select(async url =>

    await rateLimiter.WaitAsync();
    return await _httpClient.GetStringAsync(url);
));

注意:我添加了一个Dispose方法,这样可以取消RateLimiter类内部发起的异步操作。这 使用完RateLimiter 后应该调用方法,否则挂起的异步操作将阻止 除了消耗与活动的Task.Delay 任务相关的资源之外,RateLimiter 还会被及时的垃圾收集。 原始的非常简单但有漏洞的实现可以在这个答案的2nd revision 中找到。


我正在添加一个更复杂的RateLimiter 类的替代实现,它基于Stopwatch 而不是SemaphoreSlim。它的优点是不需要一次性的,因为它不会在后台启动隐藏的异步操作。缺点是WaitAsync 方法不支持CancellationToken 参数,并且由于复杂性,出现错误的概率更高。

public class RateLimiter

    private readonly Stopwatch _stopwatch;
    private readonly Queue<TimeSpan> _queue;
    private readonly int _maxActionsPerTimeUnit;
    private readonly TimeSpan _timeUnit;

    public RateLimiter(int maxActionsPerTimeUnit, TimeSpan timeUnit)
    
        // Arguments validation omitted
        _stopwatch = Stopwatch.StartNew();
        _queue = new Queue<TimeSpan>();
        _maxActionsPerTimeUnit = maxActionsPerTimeUnit;
        _timeUnit = timeUnit;
    

    public Task WaitAsync()
    
        var delay = TimeSpan.Zero;
        lock (_stopwatch)
        
            var currentTimestamp = _stopwatch.Elapsed;
            while (_queue.Count > 0 && _queue.Peek() < currentTimestamp)
            
                _queue.Dequeue();
            
            if (_queue.Count >= _maxActionsPerTimeUnit)
            
                var refTimestamp = _queue
                    .Skip(_queue.Count - _maxActionsPerTimeUnit).First();
                delay = refTimestamp - currentTimestamp;
                Debug.Assert(delay >= TimeSpan.Zero);
                if (delay < TimeSpan.Zero) delay = TimeSpan.Zero; // Just in case
            
            _queue.Enqueue(currentTimestamp + delay + _timeUnit);
        
        if (delay == TimeSpan.Zero) return Task.CompletedTask;
        return Task.Delay(delay);
    

【讨论】:

这非常好用并且简化了我的工作,不需要分区。谢谢。

以上是关于分区:如何在每个分区后添加等待的主要内容,如果未能解决你的问题,请参考以下文章

如何调整磁盘分区大小

如何调整磁盘大小

linux添加新硬盘怎么做 要如何分区

win7一体机分区不见的文件如何恢复

怎样向esp分区添加引导文件?

如何根据 SQL 中的分区对行求和?