异步/等待和多处理

Posted

技术标签:

【中文标题】异步/等待和多处理【英文标题】:async/await and multiprocessing 【发布时间】:2015-02-27 11:12:57 【问题描述】:

我有包含 50 000 000 个唯一域名的 txt 文件,起初我试图只打开每个站点。我正在使用异步 HttpClient 并尝试了 3 种不同的方法来拆分任务:

1

IEnumerable<string> lines = File.ReadLines("file.txt");
try

    DataSet allData;
    var downloadData = new TransformBlock<string,byte[]>(
    async line =>
    
        HttpClientHandler httpClientHandler = new HttpClientHandler();
        HttpClient client = new HttpClient(httpClientHandler);
        try
        
            HttpResponseMessage responseMessage =
            await client.GetAsync(line).ConfigureAwait(false);
            return
            await responseMessage.Content.ReadAsByteArrayAsync().ConfigureAwait(false);
        
        catch (Exception ex)
        
            //catch all to reduce code for testing
            return null;
        
        finally
        
            Interlocked.Increment(ref finishedUrls);
        
    ,
    new ExecutionDataflowBlockOptions
    
        MaxDegreeOfParallelism = 500,
    );
    foreach (var line in lines)
    downloadData.Post(line);
    downloadData.Complete();
    await downloadData.Completion;

2

List<Task> allTasks = new List<Task>();
SemaphoreSlim throttler = new SemaphoreSlim(initialCount: DataflowBlockOptions.Unbounded);
foreach (var line in lines)


    await throttler.WaitAsync().ConfigureAwait(false);
    allTasks.Add(Task.Run(async () =>
    

            try
            
                HttpClientHandler httpClientHandler = new HttpClientHandler();
                HttpClient client = new HttpClient(httpClientHandler);
                try
                
                    HttpResponseMessage responseMessage = await client.GetAsync(line).ConfigureAwait(false);
                    var byteArray = await responseMessage.Content.ReadAsByteArrayAsync().ConfigureAwait(false);
                
                catch (Exception ex)
                
                
                Interlocked.Increment(ref finishedUrls);
            
            catch (Exception ex)
            
            
        
        finally
        
            throttler.Release();
        
    ));

await Task.WhenAll(allTasks);

3

await lines.ForEachAsync(500,cancellationToken,async line =>

    HttpClientHandler httpClientHandler = new HttpClientHandler();
    HttpClient client = new HttpClient(httpClientHandler);
    try
    
        HttpResponseMessage responseMessage = await client.GetAsync(line).ConfigureAwait(false);
        var byteArray = await responseMessage.Content.ReadAsByteArrayAsync().ConfigureAwait(false);
    
    catch (Exception ex)
    
    
    Interlocked.Increment(ref finishedUrls);

);

public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, CancellationToken cancellationToken,
            Func<T, Task> body)
        
            return Task.WhenAll(
                from partition in Partitioner.Create(source).GetPartitions(dop).AsParallel()
                select Task.Run(async delegate
                
                    using (partition)
                        while (partition.MoveNext())
                            await body(partition.Current).ConfigureAwait(false);
                , cancellationToken));
        

我从 #3 解决方案中获得的最佳速度结果 - 在性能监视器中建立了大约 12 000 个 url/分钟和 10 000 个连接 - cpu 使用了 ~1%

但是,当我将 txt 文件拆分为 5 个部分 = 每个文件中有 10 000 000 个 url 并运行我的程序的 5 个实例时,汇总速度为 25 000 个 url / 分钟并建立了 30 000 个连接 - cpu 使用率为 3%。我正在玩将分区数从 500 增加到更多,但这并没有带来很大的变化。所以我的问题是 - 如何运行一个可以处理 25 000 个 URL / 分钟的程序实例?如何划分异步作业以获得尽可能高的速度?

对进程有任何 .NET 限制吗?

程序在64位windows server 2012上运行 500Mb 网络、64GB RAM、SSD 磁盘、E5-1620-v2 CPU

更新 1 不同“dop”和 4 个实例的速度结果同时: http://pastebin.com/ab3UQPAC

【问题讨论】:

这在很大程度上取决于它运行的硬件。您希望我们如何回答这个问题? 我写过硬件可以在 5 个实例中轻松处理摘要 25 000 urls/min 我知道你提到了分区,但是你把它增加到了 5 倍吗? 是的,我尝试了 2500 个分区 - 甚至 5000 个分区 - 速度并没有增加太多 - 仅达到 13000-14000 urls/min 与 500 相比,2500/5000 的 CPU 配置文件是什么 【参考方案1】:

删除外部任务可能会有所帮助?

有点离题(减去异常处理?)

List<Task> allTasks = new List<Task>();
foreach (var line in lines)

            HttpClientHandler httpClientHandler = new HttpClientHandler();
            HttpClient client = new HttpClient(httpClientHandler);
            try
            
            allTasks.Add(client.GetAsync(line).
            ContinueWith(t => t.Result.Content.ReadAsByteArrayAsync(), TaskContinuationOptions.OnlyOnRanToCompletion));
            
            catch
            
            

await Task.WhenAll(allTasks);

如果有一个外部任务正在等待响应,是否可以想象您正在消耗过多的 ThreadPool 资源?不确定调度程序将如何处理此问题的具体细节,但外部任务对我来说似乎是多余的。

【讨论】:

为什么停止了?你可能仍然需要限制它。例如,获得 15,000 个网址需要多长时间。此外,您可能希望在其中保留上下文切换选项(例如 ConfigureAwait(false)。基本上遵循选项 2,但删除 Task.Run?并改用 ContinueWith? 是不是因为缺少异常处理而崩溃?也许它之前跳过了 URLS。我添加了一个 try catch【参考方案2】:

尝试将System.Net.ServicePointManager.DefaultConnectionLimit 设置为一个非常高的数字,例如int.MaxValue

【讨论】:

@schglurps 之前发布了相同的答案 - 限制是每个域【参考方案3】:

我认为您的问题与此有关:Limit of outgoing connections for one process (.Net) 尝试将最大连接数增加到您同时运行的任务数(可能是内核数)。

【讨论】:

每个域的连接数是有限的 - 正如我所说的,我有“唯一”域列表 好的,没看懂,抱歉

以上是关于异步/等待和多处理的主要内容,如果未能解决你的问题,请参考以下文章

异步编程和多线程有啥区别?

java同步异步和多线程编程

线程也疯狂-----异步编程

http请求和多线程

ajax的同步 和 异步

EF 数据上下文 - 异步/等待和多线程