如何在 .net Core API 项目中跨多个线程限制对 HttpClient 的所有传出异步调用
Posted
技术标签:
【中文标题】如何在 .net Core API 项目中跨多个线程限制对 HttpClient 的所有传出异步调用【英文标题】:How to Throttle all outgoing asynchronous calls to HttpClient across multiple threads in .net Core API project 【发布时间】:2018-08-27 17:22:27 【问题描述】:我正在设计一个使用我无法控制的外部 api 的 .net 核心 web api。我找到了一些关于堆栈溢出的优秀答案,这些答案允许我在使用 semaphoreslim 的同一线程中限制我对这个外部 API 的请求。我想知道如何最好地将这种限制扩展到应用程序范围,而不仅仅是限制特定的任务列表。我一直在学习 HttpMessageHandlers,这似乎是一种拦截所有传出消息并应用节流的可能方法。但我担心线程安全和我可能不理解的锁定问题。我包括我当前的节流代码,并希望这可能有助于理解我正在尝试做的事情,但跨越多个线程,并且不断添加任务而不是预定义的任务列表。
private static async Task<List<iMISPagedResultResponse>> GetAsyncThrottled(List<int> pages, int throttle, IiMISClient client, string url, int limit)
var rtn = new List<PagedResultResponse>();
var allTasks = new List<Task>();
var throttler = new SemaphoreSlim(initialCount: throttle);
foreach (var page in pages)
await throttler.WaitAsync();
allTasks.Add(
Task.Run(async () =>
try
var result = await GetPagedResult(client, url, page);
return result;
finally
throttler.Release();
));
await Task.WhenAll(allTasks);
foreach (var task in allTasks)
var result = ((Task<PagedResultResponse>)task).Result;
rtn.Add(result);
return rtn;
【问题讨论】:
您可以围绕您的 HttpClient 创建一个包装类并将其注册到单例范围内。然后在里面,你会使用像Queue<T>
这样的东西。查看docs.microsoft.com/en-us/dotnet/standard/…。此外,在后台服务的上下文中,这可能也很有趣:docs.microsoft.com/en-us/aspnet/core/fundamentals/host/…
将HttpClient
用作单例是有问题的,因为它does not honour DNS updates。 HttpClientFactory 值得使用,因为它为您管理 HttpClient
生命周期问题。
在开始Task
s 时在我的答案中添加了一个附加部分
【参考方案1】:
概念性问题
SemaphoreSlim
是线程安全的,因此在将其用作跨多个线程的并行节流阀时不存在线程安全或锁定问题。
HttpMessageHandler
s 确实是 outbound middleware mechanism to intercept calls placed through HttpClient
。因此,它们是使用 SemaphoreSlim
对 Http 调用应用并行限制的理想方式。
简单实现
所以ThrottlingDelegatingHandler
可能看起来像这样:
public class ThrottlingDelegatingHandler : DelegatingHandler
private SemaphoreSlim _throttler;
public ThrottlingDelegatingHandler(SemaphoreSlim throttler)
_throttler = throttler ?? throw new ArgumentNullException(nameof(throttler));
protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
if (request == null) throw new ArgumentNullException(nameof(request));
await _throttler.WaitAsync(cancellationToken);
try
return await base.SendAsync(request, cancellationToken);
finally
_throttler.Release();
创建和维护一个实例作为单例:
int maxParallelism = 10;
var throttle = new ThrottlingDelegatingHandler(new SemaphoreSlim(maxParallelism));
将 DelegatingHandler
应用于您希望通过其并行限制调用的所有 HttpClient
实例:
HttpClient throttledClient = new HttpClient(throttle);
HttpClient
不需要是单例:只有 throttle
实例可以。
为简洁起见,我省略了 Dot Net Core DI 代码,但您可以在 .Net Core 的容器中注册单例 ThrottlingDelegatingHandler
实例,在使用点通过 DI 获取该单例,然后在 @987654348 中使用它@s 你构造如上图。
但是:
更好的实现:使用 HttpClientFactory (.NET Core 2.1+)
以上内容仍然引出了你将如何管理HttpClient
生命周期的问题:
HttpClient
s do not pick up DNS updates。您的应用将不知道 DNS 更新,除非您杀死并重新启动它(可能不受欢迎)。
另一方面,using (HttpClient client = )
是一种频繁创建和处理的模式,can cause socket exhaustion。
HttpClientFactory
的设计目标之一是管理HttpClient
实例及其委托处理程序的生命周期,以避免这些问题。
在 .NET Core 2.1 中,您可以使用 HttpClientFactory
将其全部连接到 ConfigureServices(IServiceCollection services)
的 Startup
类中,如下所示:
int maxParallelism = 10;
services.AddSingleton<ThrottlingDelegatingHandler>(new ThrottlingDelegatingHandler(new SemaphoreSlim(maxParallelism)));
services.AddHttpClient("MyThrottledClient")
.AddHttpMessageHandler<ThrottlingDelegatingHandler>();
("MyThrottledClient" 这里是一个named-client approach 只是为了保持这个例子简短;typed clients 避免字符串命名。)
在使用点,通过 DI (reference) 获取 IHttpClientFactory
,然后调用
var client = _clientFactory.CreateClient("MyThrottledClient");
获取预先配置了单例ThrottlingDelegatingHandler
的HttpClient
实例。
通过这种方式获得的HttpClient
实例的所有调用都将被限制(通常在整个应用程序中)到最初配置的int maxParallelism
。
HttpClientFactory 神奇地处理了所有HttpClient
的生命周期问题。
更好的实现:使用 Polly 和 IHttpClientFactory 来获得所有这些“开箱即用”
Polly 是deeply integrated with IHttpClientFactory,Polly 还提供Bulkhead policy,其中works as a parallelism throttle by an identical SemaphoreSlim mechanism。
因此,作为手动滚动ThrottlingDelegatingHandler
的替代方法,您还可以使用开箱即用的 IHttpClientFactory 的 Polly Bulkhead 策略。在您的 Startup
课程中,只需:
int maxParallelism = 10;
var throttler = Policy.BulkheadAsync<HttpResponseMessage>(maxParallelism, Int32.MaxValue);
services.AddHttpClient("MyThrottledClient")
.AddPolicyHandler(throttler);
如前所述,从 HttpClientFactory 获取预配置的 HttpClient
实例。和以前一样,通过这种“MyThrottledClient”HttpClient
实例的所有调用都将被并行限制到配置的maxParallelism
。
Polly Bulkhead 策略还提供了配置您希望同时允许多少个操作“排队”为主信号量中的执行槽的功能。所以,例如:
var throttler = Policy.BulkheadAsync<HttpResponseMessage>(10, 100);
当如上配置为HttpClient
时,将允许10 个并行http 调用,以及最多100 个对执行槽的“队列”的http 调用。这可以通过防止下游系统出现故障导致上游排队调用的资源过度膨胀,从而为高吞吐量系统提供额外的弹性。
要将 Polly 选项与 HttpClientFactory 一起使用,请拉入 Microsoft.Extensions.Http.Polly
和 Polly
nuget 包。
参考:Polly deep doco on Polly and IHttpClientFactory; Bulkhead policy.
关于任务的附录
问题使用Task.Run(...)
并提到:
使用外部 api 的 .net 核心 web api
和:
不断添加任务,而不是预先定义的任务列表。
如果您的 .net 核心 web api 仅使用 .net 核心 web api 处理的每个请求一次的外部 API,并且您采用本答案其余部分中讨论的方法,卸载下游外部http 调用带有Task.Run(...)
的新Task
将是不必要的,只会在额外的Task
实例和线程切换中产生开销。点网核心已经在线程池的多个线程上运行传入的请求。
【讨论】:
这是一个很棒的回应 - 谢谢!只有 14 票?耻辱 将 ThrottlingDelegatingHandler 分配给 HttpClient 只会导致一切失败,并在 VS 调试器中显示大量错误,如下所示:抛出异常:System.Net.Http.dll 中的“System.InvalidOperationException”抛出异常: mscorlib.dll 中的“System.InvalidOperationException” 很好的答案。看到@JeffreyLeCours 的评论,我立即投了赞成票。现在是 29 岁。 很好的答案,正是我所需要的。谢谢你 请注意,无论是“工匠”(手工制作)还是波莉,我都看到了受限制线程的TaskCanceledException
s 问题。这是因为HttpClient.SendAsync()
使用cancellationToken
到honor HttpClient.Timeout
并且cts 设置之前 DH 被执行。要重现 TaskCanceledException
s,请在 httpClient 管道中添加 DelayDelegatingHandler
以延迟超过 HttpClient.Timeout
值的时间跨度。以上是关于如何在 .net Core API 项目中跨多个线程限制对 HttpClient 的所有传出异步调用的主要内容,如果未能解决你的问题,请参考以下文章