如何限制数据库的异步IO任务数量?

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何限制数据库的异步IO任务数量?相关的知识,希望对你有一定的参考价值。

我有一个列表 id's 而我想获得这些数据中的每一个。id 并行从数据库中获取。我的下面 ExecuteAsync 方法是在非常高的吞吐量下被调用的,对于每个请求,我们有大约的 500 ids 我需要提取数据。

所以我得到了下面的代码,在这个代码中,我围绕着列表中的 ids 并为每一个这样的对象进行异步调用 id 并行,而且工作正常。

private async Task<List<T>> ExecuteAsync<T>(IList<int> ids, IPollyPolicy policy,
    Func<CancellationToken, int, Task<T>> mapper) where T : class

    var tasks = new List<Task<T>>(ids.Count);
    // invoking multiple id in parallel to get data for each id from database
    for (int i = 0; i < ids.Count; i++)
    
        tasks.Add(Execute(policy, ct => mapper(ct, ids[i])));
    

    // wait for all id response to come back
    var responses = await Task.WhenAll(tasks);

    var excludeNull = new List<T>(ids.Count);
    for (int i = 0; i < responses.Length; i++)
    
        var response = responses[i];
        if (response != null)
        
            excludeNull.Add(response);
        
    
    return excludeNull;


private async Task<T> Execute<T>(IPollyPolicy policy,
    Func<CancellationToken, Task<T>> requestExecuter) where T : class

    var response = await policy.Policy.ExecuteAndCaptureAsync(
        ct => requestExecuter(ct), CancellationToken.None);
    if (response.Outcome == OutcomeType.Failure)
    
        if (response.FinalException != null)
        
            // log error
            throw response.FinalException;
        
    

    return response?.Result;

问题。

现在你可以看到我在循环所有的... ids 并为每一个人并行地对数据库进行一堆异步调用 id 这可能会给数据库带来很大的负载(取决于有多少请求到来)。所以我想限制我们对数据库的异步调用次数。我修改了 ExecuteAsync 使用 Semaphore 如下图所示,但它看起来不像我想做的那样。

private async Task<List<T>> ExecuteAsync<T>(IList<int> ids, IPollyPolicy policy,
    Func<CancellationToken, int, Task<T>> mapper) where T : class

    var throttler = new SemaphoreSlim(250);
    var tasks = new List<Task<T>>(ids.Count);
    // invoking multiple id in parallel to get data for each id from database
    for (int i = 0; i < ids.Count; i++)
    
        await throttler.WaitAsync().ConfigureAwait(false);
        try
        
            tasks.Add(Execute(policy, ct => mapper(ct, ids[i])));
        
        finally
        
            throttler.Release();
        
    

    // wait for all id response to come back
    var responses = await Task.WhenAll(tasks);

    // same excludeNull code check here

    return excludeNull;

Semaphore是否可以在 ThreadsTasks 读它 此处 看起来Semaphore是用于线程,SemaphoreSlim是用于任务。

这是正确的吗?如果是,那么有什么最好的方法来解决这个问题,并限制我们对数据库的异步IO任务的数量。

答案

Task是对线程的一种抽象,并不一定要创建一个新的线程。Semaphore限制了可以访问该for循环的线程数量。Execute返回一个不是线程的Task。如果只有1个请求,那么这个for循环里面就只有1个线程,即使它请求的是500个id。这1个线程自己会发送掉所有的异步IO任务。

算是吧。我想说的是,任务和线程完全没有关系。其实任务有两种:一种是委托任务(算是线程的抽象),一种是 承诺 任务(与线程无关)。

关于 SemaphoreSlim它确实限制了一个代码块的并发量(不是线程)。

我最近开始玩C#,所以我的理解是不正确的,看起来像w.r.t Threads和Tasks。

我建议阅读我的 async 介绍最佳做法. 后续行动 没有线 如果你对线程如何不真正参与感兴趣的话。

我修改了ExecuteAsync来使用Semaphore,如下图所示,但它看起来并没有达到我想要的效果。

目前的代码只是对添加任务到列表的过程进行节制,反正每次只做一个。你要做的是节制执行本身。

private async Task<List<T>> ExecuteAsync<T>(IList<int> ids, IPollyPolicy policy, Func<CancellationToken, int, Task<T>> mapper) where T : class

  var throttler = new SemaphoreSlim(250);
  var tasks = new List<Task<T>>(ids.Count);

  // invoking multiple id in parallel to get data for each id from database
  for (int i = 0; i < ids.Count; i++)
    tasks.Add(ThrottledExecute(ids[i]));

  // wait for all id response to come back
  var responses = await Task.WhenAll(tasks);

  // same excludeNull code check here
  return excludeNull;

  async Task<T> ThrottledExecute(int id)
  
    await throttler.WaitAsync().ConfigureAwait(false);
    try 
      return await Execute(policy, ct => mapper(ct, id)).ConfigureAwait(false);
     finally 
      throttler.Release();
    
  

另一答案

你的同事可能想到的是 Semaphore 类,它确实是一个以线程为中心的节制器,没有异步能力。

限制了线程可以并发访问一个资源或资源池的数量。

SemaphoreSlim 类是一个轻量级的替代 Semaphore,其中包括异步方法 WaitAsync这就是世界上所有的不同。这 WaitAsync 并不阻止一个线程,而是阻止一个异步工作流。异步工作流很便宜(通常每个工作流不到1000字节)。你可以在任何特定的时刻同时 "运行 "数以百万计的工作流。而线程则不是这样,由于 1 MB 的内存,每个线程为其栈保留的内存。

至于 ExecuteAsync 方法,下面是如何通过使用LINQ方法重构它的方法 Select, Where, ToArrayToList:


最新情况: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 波利 图书馆 支持 捕捉并继续当前的同步上下文,所以我添加了个 bool executeOnCurrentContext参数的API。我还把异步的 Execute 办法 ExecuteAsync媲美 准则.

private async Task<List<T>> ExecuteAsync<T>(IList<int> ids, IPollyPolicy policy,
    Func<CancellationToken, int, Task<T>> mapper,
    int concurrencyLevel = 1, bool executeOnCurrentContext = false) where T : class

    var throttler = new SemaphoreSlim(concurrencyLevel);
    Task<T>[] tasks = ids.Select(async id =>
    
        await throttler.WaitAsync().ConfigureAwait(executeOnCurrentContext);
        try
        
            return await ExecuteAsync(policy, ct => mapper(ct, id),
                executeOnCurrentContext).ConfigureAwait(false);
        
        finally
        
            throttler.Release();
        
    ).ToArray();

    T[] results = await Task.WhenAll(tasks).ConfigureAwait(false);

    return results.Where(r => r != null).ToList();


private async Task<T> ExecuteAsync<T>(IPollyPolicy policy,
    Func<CancellationToken, Task<T>> function,
    bool executeOnCurrentContext = false) where T : class

    var response = await policy.Policy.ExecuteAndCaptureAsync(
        ct => executeOnCurrentContext ? function(ct) : Task.Run(() => function(ct)),
        CancellationToken.None, continueOnCapturedContext: executeOnCurrentContext)
        .ConfigureAwait(executeOnCurrentContext);
    if (response.Outcome == OutcomeType.Failure)
    
        if (response.FinalException != null)
        
            ExceptionDispatchInfo.Throw(response.FinalException);
        
    
    return response?.Result;

另一答案

您正在节制向列表添加任务的速度。你不是在节制任务的执行速度。要做到这一点,你可能必须在命令行中实现信号函数调用。Execute 方法本身。

如果你不能修改 Execute,另一种方法是轮询完成任务,有点像这样。

for (int i = 0; i < ids.Count; i++)

    var pendingCount = tasks.Count( t => !t.IsCompleted );
    while (pendingCount >= 500) await Task.Yield();
    tasks.Add(Execute(policy, ct => mapper(ct, ids[i])));

await Task.WhenAll( tasks );
另一答案

其实TPL是可以控制任务的执行和限制并发量的,你可以测试多少个并行任务适合你的用例。你可以测试多少个并行任务适合你的用例。不需要考虑线程,TPL会帮你管理好一切。

要使用有限并发,请看这个答案,功劳归@panagiotis-kanavos所有。

.Net TPL:有限并发级任务调度器与任务优先级?

示例代码是(即使使用不同的优先级,你也可以剥离)。

QueuedTaskScheduler qts = new QueuedTaskScheduler(TaskScheduler.Default,4);
TaskScheduler pri0 = qts.ActivateNewQueue(priority: 0);
TaskScheduler pri1 = qts.ActivateNewQueue(priority: 1);

Task.Factory.StartNew(()=> , 
                  CancellationToken.None, 
                  TaskCreationOptions.None, 
                  pri0);

只要把所有的任务都扔到队列中,然后用... Task.WhenAll 你可以等到一切都完成了。

以上是关于如何限制数据库的异步IO任务数量?的主要内容,如果未能解决你的问题,请参考以下文章

js实现"线程池"限制异步任务数量

如何限制并发异步 I/O 操作的数量?

如何按名称限制运行 Celery 任务的最大数量

如何限制运行的并行任务数量? [关闭]

Python 异步IO

Spark-sql读取hive分区表限制分区过滤条件及限制分区数量