提高性能异步 Parallel.Foreach

Posted

技术标签:

【中文标题】提高性能异步 Parallel.Foreach【英文标题】:Increase performance async Parallel.Foreach 【发布时间】:2019-08-06 19:40:54 【问题描述】:

我有一个包含超过 10k 个项目的 deviceList,并希望通过调用另一个方法来发送数据。

我尝试使用 Parallel.Foreach,但我不确定这是不是正确的方法。

我已经在 azure 上发布了这个 webapp,我已经测试了 100 次它可以正常工作,但是对于 10k 它出现了超时问题。我的实现是否需要任何调整,谢谢

private List<Task> taskEventList = new List<Task>();
public async Task ProcessStart()

    string messageData = "\"name\":\"DemoData\",\"no\":\"111\"";
    RegistryManager registryManager;

    Parallel.ForEach(deviceList, async (device) =>
    
        // get details for each device and use key to send message
        device = await registryManager.GetDeviceAsync(device.DeviceId);
        SendMessages(device.DeviceId, device.Key, messageData);
    );

    if (taskEventList.Count > 0)
    
        await Task.WhenAll(taskEventList);
    


private void SendMessages(string deviceId, string Key, string messageData)

    DeviceClient deviceClient = DeviceClient.Create(hostName, new DeviceAuthenticationWithRegistrySymmetricKey(deviceId, deviceKey), Microsoft.Azure.Devices.Client.TransportType.Mqtt);
    //created separate Task
    var taskEvents = Task.Run(() => ProcessMessages(deviceId, string messageData));
    taskEventList.Add(taskEvents);


private async Task ProcessMessages(string deviceId, string messageData)

    var startTime = DateTime.UtcNow;
    while (DateTime.UtcNow - startTime < TimeSpan.FromMinutes(15))
    
        await deviceClient.SendEventAsync(messageData);
    

【问题讨论】:

请以合理的方式格式化您的代码,以便我们阅读 @JoePhillips 以明智的方式更新了它:) 请看看 该代码不会生成。 messageData 字符串需要转义双引号。此外,您的格式仍然有点尴尬。有些地方你打算有 2 个空格,有些地方有 8 个空格。真的很难花精力读这篇文章 请注意,Parallel.ForEach 不适用于异步 IO 任务,它适用于 CPU 绑定操作。要启动并行 IO 任务,请使用 SelectTask.WhenAll SemaphoreSlim 是您要在此处使用的节流机制,而不是 Parallel.Foreach 【参考方案1】:

至少肯定存在竞争条件。 Parallel用于同步代码,而不是异步代码。

据我所知,您不需要 ParallelTask.Run(它们都是 ASP.NET 服务的反模式):

public async Task ProcessStart()

  string messageData = "\"name\":\"DemoData\",\"no\":\"111\"";
  RegistryManager registryManager;

  var tasks = deviceList.Select(async device =>
  
    // get details for each device and use key to send message
    device = await registryManager.GetDeviceAsync(device.DeviceId);
    await SendMessagesAsync(device.DeviceId, device.Key, messageData);
  ).ToList();

  await Task.WhenAll(tasks);


private async Task SendMessagesAsync(string deviceId, string Key, string messageData)

  DeviceClient deviceClient = DeviceClient.Create(hostName, new DeviceAuthenticationWithRegistrySymmetricKey(deviceId, deviceKey), Microsoft.Azure.Devices.Client.TransportType.Mqtt);
  await ProcessMessagesAsync(deviceId, string messageData);


private async Task ProcessMessagesAsync(string deviceId, string messageData)

  var startTime = DateTime.UtcNow;
  while (DateTime.UtcNow - startTime < TimeSpan.FromMinutes(15))
  
    await deviceClient.SendEventAsync(messageData);
  

10k 时出现超时问题。

15 分钟对于 HTTP 请求来说是一个时间。我认为值得退后一步,看看是否有更好的方法来构建整个系统。

【讨论】:

非常感谢您的回答,是的,15 分钟对于 HTTP 请求来说是很长的时间,但是如何解决这种情况呢?我还需要在 30 和 60 分钟内执行相同的代码 :( 你能在这种情况下提供更多帮助吗?提前谢谢 @Neo 一种常见的方法是让 HTTP 请求进入(持久)队列中要完成的工作并返回该工作的 ID。然后有一个单独的后台进程来处理该队列中的工作。同时,前端将轮询(不同的)HTTP 端点以检查该 ID 的状态,以便知道何时完成以及是否有任何错误。 所以你的意思是说我可以在后端使用 azure 函数,它会持续更新操作 15 分钟.... 是的,Azure Function 是后台进程的不错选择。 错误与 DeviceClient 相关,仅出现异常`"I/O error occurred."` 我将发布详细信息另一个问号作为答案。

以上是关于提高性能异步 Parallel.Foreach的主要内容,如果未能解决你的问题,请参考以下文章

在一些 .NET Parallel.ForEach() 代码中做一些异步/等待可以吗?

通过将长时间运行的任务分成单独的进程来提高程序性能

由于 HttpClient 请求缓慢,Task.Result 在 Parallel.ForEach 内阻塞

在 Parallel.foreach 中等待 [重复]

通过将长时间运行的任务拆分为单独的进程来提高程序性能

如何限制 Parallel.ForEach?