异步/等待和多处理
Posted
技术标签:
【中文标题】异步/等待和多处理【英文标题】:async/await and multiprocessing 【发布时间】:2015-02-27 11:12:57 【问题描述】:我有包含 50 000 000 个唯一域名的 txt 文件,起初我试图只打开每个站点。我正在使用异步 HttpClient 并尝试了 3 种不同的方法来拆分任务:
1
IEnumerable<string> lines = File.ReadLines("file.txt");
try
DataSet allData;
var downloadData = new TransformBlock<string,byte[]>(
async line =>
HttpClientHandler httpClientHandler = new HttpClientHandler();
HttpClient client = new HttpClient(httpClientHandler);
try
HttpResponseMessage responseMessage =
await client.GetAsync(line).ConfigureAwait(false);
return
await responseMessage.Content.ReadAsByteArrayAsync().ConfigureAwait(false);
catch (Exception ex)
//catch all to reduce code for testing
return null;
finally
Interlocked.Increment(ref finishedUrls);
,
new ExecutionDataflowBlockOptions
MaxDegreeOfParallelism = 500,
);
foreach (var line in lines)
downloadData.Post(line);
downloadData.Complete();
await downloadData.Completion;
2
List<Task> allTasks = new List<Task>();
SemaphoreSlim throttler = new SemaphoreSlim(initialCount: DataflowBlockOptions.Unbounded);
foreach (var line in lines)
await throttler.WaitAsync().ConfigureAwait(false);
allTasks.Add(Task.Run(async () =>
try
HttpClientHandler httpClientHandler = new HttpClientHandler();
HttpClient client = new HttpClient(httpClientHandler);
try
HttpResponseMessage responseMessage = await client.GetAsync(line).ConfigureAwait(false);
var byteArray = await responseMessage.Content.ReadAsByteArrayAsync().ConfigureAwait(false);
catch (Exception ex)
Interlocked.Increment(ref finishedUrls);
catch (Exception ex)
finally
throttler.Release();
));
await Task.WhenAll(allTasks);
3
await lines.ForEachAsync(500,cancellationToken,async line =>
HttpClientHandler httpClientHandler = new HttpClientHandler();
HttpClient client = new HttpClient(httpClientHandler);
try
HttpResponseMessage responseMessage = await client.GetAsync(line).ConfigureAwait(false);
var byteArray = await responseMessage.Content.ReadAsByteArrayAsync().ConfigureAwait(false);
catch (Exception ex)
Interlocked.Increment(ref finishedUrls);
);
public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, CancellationToken cancellationToken,
Func<T, Task> body)
return Task.WhenAll(
from partition in Partitioner.Create(source).GetPartitions(dop).AsParallel()
select Task.Run(async delegate
using (partition)
while (partition.MoveNext())
await body(partition.Current).ConfigureAwait(false);
, cancellationToken));
我从 #3 解决方案中获得的最佳速度结果 - 在性能监视器中建立了大约 12 000 个 url/分钟和 10 000 个连接 - cpu 使用了 ~1%
但是,当我将 txt 文件拆分为 5 个部分 = 每个文件中有 10 000 000 个 url 并运行我的程序的 5 个实例时,汇总速度为 25 000 个 url / 分钟并建立了 30 000 个连接 - cpu 使用率为 3%。我正在玩将分区数从 500 增加到更多,但这并没有带来很大的变化。所以我的问题是 - 如何运行一个可以处理 25 000 个 URL / 分钟的程序实例?如何划分异步作业以获得尽可能高的速度?
对进程有任何 .NET 限制吗?
程序在64位windows server 2012上运行 500Mb 网络、64GB RAM、SSD 磁盘、E5-1620-v2 CPU
更新 1 不同“dop”和 4 个实例的速度结果同时: http://pastebin.com/ab3UQPAC
【问题讨论】:
这在很大程度上取决于它运行的硬件。您希望我们如何回答这个问题? 我写过硬件可以在 5 个实例中轻松处理摘要 25 000 urls/min 我知道你提到了分区,但是你把它增加到了 5 倍吗? 是的,我尝试了 2500 个分区 - 甚至 5000 个分区 - 速度并没有增加太多 - 仅达到 13000-14000 urls/min 与 500 相比,2500/5000 的 CPU 配置文件是什么 【参考方案1】:删除外部任务可能会有所帮助?
有点离题(减去异常处理?)
List<Task> allTasks = new List<Task>();
foreach (var line in lines)
HttpClientHandler httpClientHandler = new HttpClientHandler();
HttpClient client = new HttpClient(httpClientHandler);
try
allTasks.Add(client.GetAsync(line).
ContinueWith(t => t.Result.Content.ReadAsByteArrayAsync(), TaskContinuationOptions.OnlyOnRanToCompletion));
catch
await Task.WhenAll(allTasks);
如果有一个外部任务正在等待响应,是否可以想象您正在消耗过多的 ThreadPool 资源?不确定调度程序将如何处理此问题的具体细节,但外部任务对我来说似乎是多余的。
【讨论】:
为什么停止了?你可能仍然需要限制它。例如,获得 15,000 个网址需要多长时间。此外,您可能希望在其中保留上下文切换选项(例如 ConfigureAwait(false)。基本上遵循选项 2,但删除 Task.Run?并改用 ContinueWith? 是不是因为缺少异常处理而崩溃?也许它之前跳过了 URLS。我添加了一个 try catch【参考方案2】:尝试将System.Net.ServicePointManager.DefaultConnectionLimit
设置为一个非常高的数字,例如int.MaxValue
。
【讨论】:
@schglurps 之前发布了相同的答案 - 限制是每个域【参考方案3】:我认为您的问题与此有关:Limit of outgoing connections for one process (.Net) 尝试将最大连接数增加到您同时运行的任务数(可能是内核数)。
【讨论】:
每个域的连接数是有限的 - 正如我所说的,我有“唯一”域列表 好的,没看懂,抱歉以上是关于异步/等待和多处理的主要内容,如果未能解决你的问题,请参考以下文章