由于异步问题,并行 HttpClient 请求超时?
Posted
技术标签:
【中文标题】由于异步问题,并行 HttpClient 请求超时?【英文标题】:Parallel HttpClient requests timing out due to async problem? 【发布时间】:2021-09-28 11:20:31 【问题描述】:我正在使用System.Threading.Tasks.Parallel.ForEach
并行同步运行一个方法。在方法结束时,它需要发出几十个 HTTP POST
请求,这些请求相互不依赖。由于我使用的是 .NET Framework 4.6.2,System.Net.Http.HttpClient
完全是异步的,所以我使用Nito.AsyncEx.AsyncContext
来避免死锁,格式如下:
public static void MakeMultipleRequests(IEnumerable<MyClass> enumerable)
AsyncContext.Run(async () => await Task.WhenAll(enumerable.Select(async c =>
await getResultsFor(c).ConfigureAwait(false))));
getResultsFor(MyClass c)
方法然后创建一个HttpRequestMessage
并使用以下方式发送它:
await httpClient.SendAsync(request);
然后解析响应并在 MyClass 的实例上设置相关字段。
我的理解是同步线程会阻塞在AsyncContext.Run(...)
,而一些任务是由AsyncContext
拥有的单个AsyncContextThread
异步执行的。当它们都完成后,同步线程将解除阻塞。
这适用于几百个请求,但是当它在五分钟内扩展到几千个时,一些请求开始从服务器返回 HTTP 408 Request Timeout
错误。我的日志表明这些超时发生在峰值负载时,发送的请求最多,并且在收到许多其他请求很久之后才发生超时。
我认为问题在于任务是await
HttpClient
内的服务器握手,但它们没有按 FIFO 顺序继续,所以当它们继续时,握手已经过期。但是,除了使用System.Threading.SemaphoreSlim
强制一次只能执行一个任务await httpClient.SendAsync(...)
之外,我想不出任何方法来处理这个问题。
我的应用程序非常大,将其完全转换为异步是不可行的。
【问题讨论】:
不只是黑白。一次限制到 1 几乎会破坏您的并行方法。但是需要某种限制。您现在所做的基本上是 DoS 攻击。 该错误具体是 客户端 403 超时,而不是服务器端 503 错误。在高峰期,我在五分钟内发出几千个请求,这完全在服务器容量范围内。我有单独的错误处理代码供客户端处理服务器端问题。 是的,你可以 DoS 你自己的(本地)网络堆栈:) 解决方案是一样的:驯服你的马。也许在开始之前构建批次或给出一些抖动偏移时间......或者只是确保同时打开的请求少于 X 个。不过,理想情况下应该是什么 X 可能取决于客户端系统。 您可能想阅读以下内容:makolyte.com/… @Fildor “确保同时打开的请求少于 X 个” - 除非我误解了某些东西,否则我认为这不能解决我的问题(除非 X == 1) .假设有 2 个插槽;什么是阻止请求#2 到 #200 通过插槽 B 而请求 #1 位于插槽 A 中等待继续,所以当它恢复时它立即超时? 【参考方案1】:这不是在阻塞之前包装任务可以完成的事情。对于初学者,如果请求通过,您最终可能会攻击服务器。现在你正在攻击客户端。 .NET Framework 中每个域有 2 个并发请求的限制可以放宽,但如果您将其设置得太高,您最终可能会破坏服务器。
您可以通过在管道中使用 DataFlow 块以固定并行度执行请求然后解析它们来解决此问题。假设您有一个名为 MyPayload
的类,其属性中有很多 Items
:
ServicePointManager.DefaultConnectionLimit = 1000;
var options=new ExecutionDataflowBlockOptions
MaxDegreeOfParallelism = 10
;
var downloader=new TransformBlock<string,MyPayload>(async url=>
var json=await _client.GetStringAsync(url);
var data=JsonConvert.DeserializeObject<MyPayload>(json);
return data;
,options);
var importer=new ActionBlock<MyPayload>(async data=>
var items=data.Items;
using(var connection=new SqlConnection(connectionString))
using(var bcp=new SqlBulkCopy(connection))
using(var reader=ObjectReader.Create(items))
bcp.DestinationTableName = destination;
connection.Open();
await bcp.WriteToServerAsync(reader);
);
downloader.LinkTo(importer,new DataflowLinkOptions
PropagateCompletion=true
);
我正在使用 FastMember's ObjectReader 将项目包装在 DbDataReader 中,该 DbDataReader 可用于将记录批量插入数据库。
一旦你有了这个管道,你就可以开始将 URL 发布到 head 块 downloader
:
foreach(var url in hugeList)
downloader.Post(url);
downloader.Complete();
所有 URL 发布后,您告诉 donwloader
完成并等待管道中的最后一个块完成:
await importer.Completion;
【讨论】:
根据问题,我已经尝试使用SemaphoreSlim
对请求并行性实施限制。我认为这个解决方案不会解决无序异步延续导致超时的问题 - 除非强制并行度为 1。
@SimonW 确实如此,在过去的 6-7 年里,我每天使用它下载 100K 机票记录。这不是强制并行。它创建了 10 个工作任务来处理发布到块的所有 URL。这与启动 100 个任务并阻止其中的 90 个任务完全不同。此外,在 .NET Framework 中,一次只允许 2 个并发请求。通过更改 DefaultConnectionLimit
最多可以发出 1000 个请求。 MaxDOP=10
虽然确保一次只处理 10 个 URL
@SimonW 还可以通过在 worker 方法中添加 await Task.Delay()
来进一步限制请求。为了避免分波发送请求,延迟可以随机化。
据我了解,这将尝试异步处理 10 个请求;发送#1,然后在等待返回时发送#2,以此类推。不能保证请求 #1 在其服务器握手返回后很快就会继续,而不是单个可用线程选择首先继续在请求 #2 到 #10 中遇到的各种等待 - 当请求 #1 最终继续时,它会计时出去。使用 Dataflow 解决此问题的唯一方法是一次只允许一个请求,这与使用信号量进行限制相同。我对此的理解不正确吗?
@SimonW 没有。它将创建 10 个工作任务,每个任务将处理一条输入消息。一次将有 10 个线程处理消息。为什么会有超时?即使有,它也只会影响当前任务并且可以使用try/catch
块来处理。其他工人不会受到影响。我一直在使用它来下载包含数千条记录的机票销售报告,对其进行解析,将票号转发到检索单个票记录的下一步。【参考方案2】:
首先,Nito.AsyncEx.AsyncContext
将在线程池线程上执行;如documentation 中所述,要避免以所述方式出现死锁,需要Nito.AsyncEx.AsyncContextThread
的实例。
有两种可能的原因:
.NET Framework 4.6.2 中System.Net.Http.HttpClient
中的一个错误
问题中概述的继续优先级问题,其中个别请求没有足够及时地继续,因此超时。
如this answer and its comments, from a similar question 所述,可能可以使用自定义TaskScheduler
处理优先级问题,但使用信号量限制并发请求的数量可能是最好的答案:
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Nito.AsyncEx;
public class MyClass
private static readonly AsyncContextThread asyncContextThread
= new AsyncContextThread();
private static readonly HttpClient httpClient = new HttpClient();
private static readonly SemaphoreSlim semaphore = new SemaphoreSlim(10);
public HttpRequestMessage Request get; set;
public HttpResponseMessage Response get; private set;
private async Task GetResponseAsync()
await semaphore.WaitAsync();
try
Response = await httpClient.SendAsync(Request);
finally
semaphore.Release();
public static void MakeMultipleRequests(IEnumerable<MyClass> enumerable)
Task.WaitAll(enumerable.Select(c =>
asyncContextThread.Factory.Run(() =>
c.GetResponseAsync())).ToArray());
已按预期使用 AsyncContextThread
在非线程池线程上执行异步代码。 AsyncContext
自己不会这样做。
【讨论】:
你不需要async c => await c.GetResponseAsync()
。你可以只写AsyncContext.Run(Task.WhenAll(enumerable.Select(c =>c.GetResponseAsync())));
。虽然所有这些代码都不比Task.WaitAll(enumerable.Select(c =>c.GetResponseAsync())
好,但假设GetResponseAsync
内部使用ConfigureAwait(false)
链接的问题说use Dataflow
,只提到SemaphoreSlim
作为替代
链接的问题和答案专门关于优先考虑延续;答案 及其 cmets (如本答案所述)建议最好的解决方案是使用数据流或信号量进行节流。这个答案使用信号量,因为它要简单得多,而且这个问题不需要 Dataflow 的任何功能。
AsyncContext.Run
采用 Func<Task>
而不是 Task
。
AFAIK AsyncContext.Run
使用当前线程作为上下文,它不关心当前线程是否由 ThreadPool
拥有。以上是关于由于异步问题,并行 HttpClient 请求超时?的主要内容,如果未能解决你的问题,请参考以下文章
从 HttpClient SendAsync 请求获取响应时出现无法解释的超时和延迟