如何限制数据库的异步IO任务数量?
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何限制数据库的异步IO任务数量?相关的知识,希望对你有一定的参考价值。
我有一个列表 id's
而我想获得这些数据中的每一个。id
并行从数据库中获取。我的下面 ExecuteAsync
方法是在非常高的吞吐量下被调用的,对于每个请求,我们有大约的 500 ids
我需要提取数据。
所以我得到了下面的代码,在这个代码中,我围绕着列表中的 ids
并为每一个这样的对象进行异步调用 id
并行,而且工作正常。
private async Task<List<T>> ExecuteAsync<T>(IList<int> ids, IPollyPolicy policy,
Func<CancellationToken, int, Task<T>> mapper) where T : class
var tasks = new List<Task<T>>(ids.Count);
// invoking multiple id in parallel to get data for each id from database
for (int i = 0; i < ids.Count; i++)
tasks.Add(Execute(policy, ct => mapper(ct, ids[i])));
// wait for all id response to come back
var responses = await Task.WhenAll(tasks);
var excludeNull = new List<T>(ids.Count);
for (int i = 0; i < responses.Length; i++)
var response = responses[i];
if (response != null)
excludeNull.Add(response);
return excludeNull;
private async Task<T> Execute<T>(IPollyPolicy policy,
Func<CancellationToken, Task<T>> requestExecuter) where T : class
var response = await policy.Policy.ExecuteAndCaptureAsync(
ct => requestExecuter(ct), CancellationToken.None);
if (response.Outcome == OutcomeType.Failure)
if (response.FinalException != null)
// log error
throw response.FinalException;
return response?.Result;
问题。
现在你可以看到我在循环所有的... ids
并为每一个人并行地对数据库进行一堆异步调用 id
这可能会给数据库带来很大的负载(取决于有多少请求到来)。所以我想限制我们对数据库的异步调用次数。我修改了 ExecuteAsync
使用 Semaphore
如下图所示,但它看起来不像我想做的那样。
private async Task<List<T>> ExecuteAsync<T>(IList<int> ids, IPollyPolicy policy,
Func<CancellationToken, int, Task<T>> mapper) where T : class
var throttler = new SemaphoreSlim(250);
var tasks = new List<Task<T>>(ids.Count);
// invoking multiple id in parallel to get data for each id from database
for (int i = 0; i < ids.Count; i++)
await throttler.WaitAsync().ConfigureAwait(false);
try
tasks.Add(Execute(policy, ct => mapper(ct, ids[i])));
finally
throttler.Release();
// wait for all id response to come back
var responses = await Task.WhenAll(tasks);
// same excludeNull code check here
return excludeNull;
Semaphore是否可以在 Threads
或 Tasks
读它 此处 看起来Semaphore是用于线程,SemaphoreSlim是用于任务。
这是正确的吗?如果是,那么有什么最好的方法来解决这个问题,并限制我们对数据库的异步IO任务的数量。
Task是对线程的一种抽象,并不一定要创建一个新的线程。Semaphore限制了可以访问该for循环的线程数量。Execute返回一个不是线程的Task。如果只有1个请求,那么这个for循环里面就只有1个线程,即使它请求的是500个id。这1个线程自己会发送掉所有的异步IO任务。
算是吧。我想说的是,任务和线程完全没有关系。其实任务有两种:一种是委托任务(算是线程的抽象),一种是 承诺 任务(与线程无关)。
关于 SemaphoreSlim
它确实限制了一个代码块的并发量(不是线程)。
我最近开始玩C#,所以我的理解是不正确的,看起来像w.r.t Threads和Tasks。
我建议阅读我的 async
介绍 和 最佳做法. 后续行动 没有线 如果你对线程如何不真正参与感兴趣的话。
我修改了ExecuteAsync来使用Semaphore,如下图所示,但它看起来并没有达到我想要的效果。
目前的代码只是对添加任务到列表的过程进行节制,反正每次只做一个。你要做的是节制执行本身。
private async Task<List<T>> ExecuteAsync<T>(IList<int> ids, IPollyPolicy policy, Func<CancellationToken, int, Task<T>> mapper) where T : class
var throttler = new SemaphoreSlim(250);
var tasks = new List<Task<T>>(ids.Count);
// invoking multiple id in parallel to get data for each id from database
for (int i = 0; i < ids.Count; i++)
tasks.Add(ThrottledExecute(ids[i]));
// wait for all id response to come back
var responses = await Task.WhenAll(tasks);
// same excludeNull code check here
return excludeNull;
async Task<T> ThrottledExecute(int id)
await throttler.WaitAsync().ConfigureAwait(false);
try
return await Execute(policy, ct => mapper(ct, id)).ConfigureAwait(false);
finally
throttler.Release();
你的同事可能想到的是 Semaphore
类,它确实是一个以线程为中心的节制器,没有异步能力。
限制了线程可以并发访问一个资源或资源池的数量。
该 SemaphoreSlim
类是一个轻量级的替代 Semaphore
,其中包括异步方法 WaitAsync
这就是世界上所有的不同。这 WaitAsync
并不阻止一个线程,而是阻止一个异步工作流。异步工作流很便宜(通常每个工作流不到1000字节)。你可以在任何特定的时刻同时 "运行 "数以百万计的工作流。而线程则不是这样,由于 1 MB 的内存,每个线程为其栈保留的内存。
至于 ExecuteAsync
方法,下面是如何通过使用LINQ方法重构它的方法 Select
, Where
, ToArray
和 ToList
:
最新情况: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 负责人: 波利 图书馆 支持 捕捉并继续当前的同步上下文,所以我添加了个 bool executeOnCurrentContext
参数的API。我还把异步的 Execute
办法 ExecuteAsync
媲美 准则.
private async Task<List<T>> ExecuteAsync<T>(IList<int> ids, IPollyPolicy policy,
Func<CancellationToken, int, Task<T>> mapper,
int concurrencyLevel = 1, bool executeOnCurrentContext = false) where T : class
var throttler = new SemaphoreSlim(concurrencyLevel);
Task<T>[] tasks = ids.Select(async id =>
await throttler.WaitAsync().ConfigureAwait(executeOnCurrentContext);
try
return await ExecuteAsync(policy, ct => mapper(ct, id),
executeOnCurrentContext).ConfigureAwait(false);
finally
throttler.Release();
).ToArray();
T[] results = await Task.WhenAll(tasks).ConfigureAwait(false);
return results.Where(r => r != null).ToList();
private async Task<T> ExecuteAsync<T>(IPollyPolicy policy,
Func<CancellationToken, Task<T>> function,
bool executeOnCurrentContext = false) where T : class
var response = await policy.Policy.ExecuteAndCaptureAsync(
ct => executeOnCurrentContext ? function(ct) : Task.Run(() => function(ct)),
CancellationToken.None, continueOnCapturedContext: executeOnCurrentContext)
.ConfigureAwait(executeOnCurrentContext);
if (response.Outcome == OutcomeType.Failure)
if (response.FinalException != null)
ExceptionDispatchInfo.Throw(response.FinalException);
return response?.Result;
您正在节制向列表添加任务的速度。你不是在节制任务的执行速度。要做到这一点,你可能必须在命令行中实现信号函数调用。Execute
方法本身。
如果你不能修改 Execute
,另一种方法是轮询完成任务,有点像这样。
for (int i = 0; i < ids.Count; i++)
var pendingCount = tasks.Count( t => !t.IsCompleted );
while (pendingCount >= 500) await Task.Yield();
tasks.Add(Execute(policy, ct => mapper(ct, ids[i])));
await Task.WhenAll( tasks );
其实TPL是可以控制任务的执行和限制并发量的,你可以测试多少个并行任务适合你的用例。你可以测试多少个并行任务适合你的用例。不需要考虑线程,TPL会帮你管理好一切。
要使用有限并发,请看这个答案,功劳归@panagiotis-kanavos所有。
示例代码是(即使使用不同的优先级,你也可以剥离)。
QueuedTaskScheduler qts = new QueuedTaskScheduler(TaskScheduler.Default,4);
TaskScheduler pri0 = qts.ActivateNewQueue(priority: 0);
TaskScheduler pri1 = qts.ActivateNewQueue(priority: 1);
Task.Factory.StartNew(()=> ,
CancellationToken.None,
TaskCreationOptions.None,
pri0);
只要把所有的任务都扔到队列中,然后用... Task.WhenAll
你可以等到一切都完成了。
以上是关于如何限制数据库的异步IO任务数量?的主要内容,如果未能解决你的问题,请参考以下文章