由于异步问题,并行 HttpClient 请求超时?

Posted

技术标签:

【中文标题】由于异步问题,并行 HttpClient 请求超时?【英文标题】:Parallel HttpClient requests timing out due to async problem? 【发布时间】:2021-09-28 11:20:31 【问题描述】:

我正在使用System.Threading.Tasks.Parallel.ForEach 并行同步运行一个方法。在方法结束时,它需要发出几十个 HTTP POST 请求,这些请求相互不依赖。由于我使用的是 .NET Framework 4.6.2,System.Net.Http.HttpClient 完全是异步的,所以我使用Nito.AsyncEx.AsyncContext 来避免死锁,格式如下:

public static void MakeMultipleRequests(IEnumerable<MyClass> enumerable)

    AsyncContext.Run(async () => await Task.WhenAll(enumerable.Select(async c => 
        await getResultsFor(c).ConfigureAwait(false))));

getResultsFor(MyClass c) 方法然后创建一个HttpRequestMessage 并使用以下方式发送它:

await httpClient.SendAsync(request);

然后解析响应并在 MyClass 的实例上设置相关字段。

我的理解是同步线程会阻塞在AsyncContext.Run(...),而一些任务是由AsyncContext拥有的单个AsyncContextThread异步执行的。当它们都完成后,同步线程将解除阻塞。

这适用于几百个请求,但是当它在五分钟内扩展到几千个时,一些请求开始从服务器返回 HTTP 408 Request Timeout 错误。我的日志表明这些超时发生在峰值负载时,发送的请求最多,并且在收到许多其他请求很久之后才发生超时。

我认为问题在于任务是awaitHttpClient 内的服务器握手,但它们没有按 FIFO 顺序继续,所以当它们继续时,握手已经过期。但是,除了使用System.Threading.SemaphoreSlim 强制一次只能执行一个任务await httpClient.SendAsync(...) 之外,我想不出任何方法来处理这个问题。

我的应用程序非常大,将其完全转换为异步是不可行的。

【问题讨论】:

不只是黑白。一次限制到 1 几乎会破坏您的并行方法。但是需要某种限制。您现在所做的基本上是 DoS 攻击。 该错误具体是 客户端 403 超时,而不是服务器端 503 错误。在高峰期,我在五分钟内发出几千个请求,这完全在服务器容量范围内。我有单独的错误处理代码供客户端处理服务器端问题。 是的,你可以 DoS 你自己的(本地)网络堆栈:) 解决方案是一样的:驯服你的马。也许在开始之前构建批次或给出一些抖动偏移时间......或者只是确保同时打开的请求少于 X 个。不过,理想情况下应该是什么 X 可能取决于客户端系统。 您可能想阅读以下内容:makolyte.com/… @Fildor “确保同时打开的请求少于 X 个” - 除非我误解了某些东西,否则我认为这不能解决我的问题(除非 X == 1) .假设有 2 个插槽;什么是阻止请求#2 到 #200 通过插槽 B 而请求 #1 位于插槽 A 中等待继续,所以当它恢复时它立即超时? 【参考方案1】:

这不是在阻塞之前包装任务可以完成的事情。对于初学者,如果请求通过,您最终可能会攻击服务器。现在你正在攻击客户端。 .NET Framework 中每个域有 2 个并发请求的限制可以放宽,但如果您将其设置得太高,您最终可能会破坏服务器。

您可以通过在管道中使用 DataFlow 块以固定并行度执行请求然后解析它们来解决此问题。假设您有一个名为 MyPayload 的类,其属性中有很多 Items

ServicePointManager.DefaultConnectionLimit = 1000;

var options=new ExecutionDataflowBlockOptions

    MaxDegreeOfParallelism = 10
;

var downloader=new TransformBlock<string,MyPayload>(async url=>
    var json=await _client.GetStringAsync(url);
    var data=JsonConvert.DeserializeObject<MyPayload>(json);
    return data;
,options);

var importer=new ActionBlock<MyPayload>(async data=>

    var items=data.Items;
    
    using(var connection=new SqlConnection(connectionString))
    using(var bcp=new SqlBulkCopy(connection))
    using(var reader=ObjectReader.Create(items))
    
        bcp.DestinationTableName = destination;
        connection.Open();

        await bcp.WriteToServerAsync(reader);
    
);


downloader.LinkTo(importer,new DataflowLinkOptions  
    PropagateCompletion=true
);

我正在使用 FastMember's ObjectReader 将项目包装在 DbDataReader 中,该 DbDataReader 可用于将记录批量插入数据库。

一旦你有了这个管道,你就可以开始将 URL 发布到 head 块 downloader

foreach(var url in hugeList)

    downloader.Post(url);

downloader.Complete();

所有 URL 发布后,您告诉 donwloader 完成并等待管道中的最后一个块完成:

await importer.Completion;

【讨论】:

根据问题,我已经尝试使用SemaphoreSlim 对请求并行性实施限制。我认为这个解决方案不会解决无序异步延续导致超时的问题 - 除非强制并行度为 1。 @SimonW 确实如此,在过去的 6-7 年里,我每天使用它下载 100K 机票记录。这不是强制并行。它创建了 10 个工作任务来处理发布到块的所有 URL。这与启动 100 个任务并阻止其中的 90 个任务完全不同。此外,在 .NET Framework 中,一次只允许 2 个并发请求。通过更改 DefaultConnectionLimit 最多可以发出 1000 个请求。 MaxDOP=10 虽然确保一次只处理 10 个 URL @SimonW 还可以通过在 worker 方法中添加 await Task.Delay() 来进一步限制请求。为了避免分波发送请求,延迟可以随机化。 据我了解,这将尝试异步处理 10 个请求;发送#1,然后在等待返回时发送#2,以此类推。不能保证请求 #1 在其服务器握手返回后很快就会继续,而不是单个可用线程选择首先继续在请求 #2 到 #10 中遇到的各种等待 - 当请求 #1 最终继续时,它会计时出去。使用 Dataflow 解决此问题的唯一方法是一次只允许一个请求,这与使用信号量进行限制相同。我对此的理解不正确吗? @SimonW 没有。它将创建 10 个工作任务,每个任务将处理一条输入消息。一次将有 10 个线程处理消息。为什么会有超时?即使有,它也只会影响当前任务并且可以使用try/catch 块来处理。其他工人不会受到影响。我一直在使用它来下载包含数千条记录的机票销售报告,对其进行解析,将票号转发到检索单个票记录的下一步。【参考方案2】:

首先,Nito.AsyncEx.AsyncContext 将在线程池线程上执行;如documentation 中所述,要避免以所述方式出现死锁,需要Nito.AsyncEx.AsyncContextThread 的实例。

有两种可能的原因:

.NET Framework 4.6.2 中 System.Net.Http.HttpClient 中的一个错误 问题中概述的继续优先级问题,其中个别请求没有足够及时地继续,因此超时。

如this answer and its comments, from a similar question 所述,可能可以使用自定义TaskScheduler 处理优先级问题,但使用信号量限制并发请求的数量可能是最好的答案:

using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Nito.AsyncEx;

public class MyClass 

    private static readonly AsyncContextThread asyncContextThread
        = new AsyncContextThread();
    private static readonly HttpClient httpClient = new HttpClient();
    private static readonly SemaphoreSlim semaphore = new SemaphoreSlim(10);

    public HttpRequestMessage Request  get; set; 
    public HttpResponseMessage Response  get; private set; 
        
    private async Task GetResponseAsync()
    
        await semaphore.WaitAsync();
        try
        
            Response = await httpClient.SendAsync(Request);
        
        finally
        
            semaphore.Release();
        
    

    public static void MakeMultipleRequests(IEnumerable<MyClass> enumerable)
    
        Task.WaitAll(enumerable.Select(c =>
            asyncContextThread.Factory.Run(() =>
                c.GetResponseAsync())).ToArray());
    


已按预期使用 AsyncContextThread 在非线程池线程上执行异步代码。 AsyncContext 自己不会这样做。

【讨论】:

你不需要async c =&gt; await c.GetResponseAsync()。你可以只写AsyncContext.Run(Task.WhenAll(enumerable.Select(c =&gt;c.GetResponseAsync())));。虽然所有这些代码都不比Task.WaitAll(enumerable.Select(c =&gt;c.GetResponseAsync())好,但假设GetResponseAsync内部使用ConfigureAwait(false) 链接的问题说use Dataflow,只提到SemaphoreSlim作为替代 链接的问题和答案专门关于优先考虑延续;答案 及其 cmets (如本答案所述)建议最好的解决方案是使用数据流或信号量进行节流。这个答案使用信号量,因为它要简单得多,而且这个问题不需要 Dataflow 的任何功能。 AsyncContext.Run 采用 Func&lt;Task&gt; 而不是 Task AFAIK AsyncContext.Run 使用当前线程作为上下文,它不关心当前线程是否由 ThreadPool 拥有。

以上是关于由于异步问题,并行 HttpClient 请求超时?的主要内容,如果未能解决你的问题,请参考以下文章

从 HttpClient SendAsync 请求获取响应时出现无法解释的超时和延迟

2018-08-22 异步httpclient(httpasyncclient)的使用与总结

TPL异步并行编程之任务超时

同步并发 HttpClient 使用

多个 HTTP 请求触发 HTTP Client 超时

区分用户取消超时