将每个异步结果存储在其自己的数组元素中

Posted

技术标签:

【中文标题】将每个异步结果存储在其自己的数组元素中【英文标题】:Storing each async result in its own array element 【发布时间】:2021-04-15 23:48:35 【问题描述】:

假设我想从网站下载 1000 个食谱。这些网站最多接受 10 个并发连接。 每个配方都应该存储在一个数组中,在其对应的索引处。(我不想将数组发送到DownloadRecipe 方法。)

从技术上讲,我已经解决了这个问题,但我想知道是否有更简洁的方法来使用 async/await 或其他方法来实现它?

    static async Task MainAsync()
    
        int recipeCount = 1000;
        int connectionCount = 10;
        string[] recipes = new string[recipeCount];
        Task<string>[] tasks = new Task<string>[connectionCount];
        int r = 0;

        while (r < recipeCount)
        
            for (int t = 0; t < tasks.Length; t++)
            
                tasks[t] = Task.Run(async () => recipes[r] = await DownloadRecipe(r));
                r++;
            

            await Task.WhenAll(tasks);
        
    

    static async Task<string> DownloadRecipe(int index)
    
        // ... await calls to download recipe
    

此外,此解决方案不是最佳解决方案,因为在所有 10 次运行下载完成之前,它不会开始新的下载。有什么我们可以在那里改进而不会使代码过于臃肿的地方吗?线程池限制为 10 个线程?

【问题讨论】:

我不确定它是否会对您有所帮助,但是从 C# 8.0 开始,您可以使用异步迭代和异步 foreach 循环来处理 docs.microsoft.com/en-us/archive/msdn-magazine/2019/november/… 也许它会帮助您更轻松地实现您想要的东西跨度> 我认为您的解决方案之所以有效,是因为 recipeCount 可以被 connectionCount 整除。否则会在此处发生参数超出范围异常:recipes[r] = await...。您的解决方案可能也容易受到此问题的影响:Captured variable in a loop in C#,关于变量 r @TheodorZoulias 是的,你是对的。我稍微重写了解决方案以使其更短并清楚地显示我想要改进的代码。对我来说幸运的是,这些问题都不存在于我的较长解决方案中。谢谢指出! 【参考方案1】:

有很多方法可以做到这一点。一种方法是使用ActionBlock,它使您可以相当轻松地访问MaxDegreeOfParallelism,并且可以很好地与async 方法配合使用

static async Task MainAsync()

   var recipeCount = 1000;
   var connectionCount = 10;
   var recipes = new string[recipeCount];

   async Task Action(int i) => recipes[i] = await DownloadRecipe(i);
   
   var processor = new ActionBlock<int>(Action, new ExecutionDataflowBlockOptions()
   
      MaxDegreeOfParallelism = connectionCount,
      SingleProducerConstrained = true
   );

   for (var i = 0; i < recipeCount; i++)
      await processor.SendAsync(i);

   processor.Complete();
   await processor.Completion;


static async Task<string> DownloadRecipe(int index)

   ...

另一种方法可能是使用SemaphoreSlim

var slim = new SemaphoreSlim(connectionCount, connectionCount);

var tasks = Enumerable
   .Range(0, recipeCount)
   .Select(Selector);
   
async Task<string> Selector(int i)

   await slim.WaitAsync()
   try
   
      return await DownloadRecipe(i)
   
   finally
   
      slim.Release();
   


var recipes = await Task.WhenAll(tasks);

另一组方法是使用响应式扩展 (Rx)... 再次,有很多方法可以做到这一点,这只是一种 等待 方法(考虑到所有事情可能会更好)

var results = await Enumerable
        .Range(0, recipeCount)
        .ToObservable()
        .Select(i => Observable.FromAsync(() => DownloadRecipe(i)))
        .Merge(connectionCount)
        .ToArray()
        .ToTask();

【讨论】:

@00110001 为了简化问题,我没有包括我的“食谱”数组实际上是多维的这一事实。我喜欢 ActionBlock 解决方案,但是否可以将它与 async Task Action(int i, int j) =&gt; recipes[i, j] = await DownloadRecipe(i, j); 之类的东西一起使用 我设法用Tuple 解决了上面的多参数问题。但我还有一些在我的问题中忘记的额外要求:每下载 100 个食谱后,我想运行一些将数组序列化为文件的自定义代码。有没有办法在每 100 次下载时获得另一个方法调用? @DanielJohansson 第一个问题是肯定的,是的,元组将是最简单的解决方案。您可以使用 TPL 数据流创建更大的管道。即你可以设想从一个transformBlock 到一个BatchBlock 再到一个ActionBlock。我很乐意回答,但它有点超出了这个问题的范围。如果你问另一个人,请联系我 @DanielJohansson 有很多 TPL 资源,看看这个docs.microsoft.com/en-us/dotnet/standard/parallel-programming/… 虽然相对而言,一旦你知道它们很容易组合在一起的概念【参考方案2】:

拥有 10 个“池”的替代方法将“同时”加载数据。

您不需要使用单独的线程来包装 IO 操作。使用单独的线程进行 IO 操作只是浪费资源。 请注意,下载数据的线程什么都不做,只是等待响应。这就是async-await 方法非常方便的地方 - 我们可以发送多个请求而无需等待它们完成并且不会浪费线程。

static async Task MainAsync()

    var requests = Enumerable.Range(0, 1000).ToArray();
    var maxConnections = 10;
    var pools = requests
        .GroupBy(i => i % maxConnections)
        .Select(group => DownloadRecipesFor(group.ToArray()))
        .ToArray();

    await Task.WhenAll(pools);

    var recipes = pools.SelectMany(pool => pool.Result).ToArray();


static async Task<IEnumerable<string>> DownLoadRecipesFor(params int[] requests)

    var recipes = new List<string>();
    foreach (var request in requests)
    
        var recipe = await DownloadRecipe(request);
        recipes.Add(recipe);
    

    return recipes;

因为在池中(DownloadRecipesFor 方法),我们会一一下载结果 - 我们确保我们一直有不超过 10 个活动请求。

这比原来的更有效一点,因为我们不会等到 10 个任务完成后才开始下一个“一堆”。 这并不理想,因为如果最后一个“池”提前结束,那么其他“池”将无法获取下一个要处理的请求。

最终结果会有相应的索引,因为我们将按照我们创建它们的顺序处理“池”和内部请求。

【讨论】:

@TheodorZoulias,理想的方法是使用SemaphoreSlim,因为这是我们有 10 个活动任务的方式,当其中一个任务完成时,我们开始另一个。

以上是关于将每个异步结果存储在其自己的数组元素中的主要内容,如果未能解决你的问题,请参考以下文章

如何在Javascript变量中存储脚本url的结果

从异步回调函数将对象推送到本地数组

从 SQL 查询结果构建父/子数组菜单结构

如何将 POST 元素存储在数组中

数据结构——1数组

在THREE.js中围绕自己的中心旋转每个网格项目