将每个异步结果存储在其自己的数组元素中
Posted
技术标签:
【中文标题】将每个异步结果存储在其自己的数组元素中【英文标题】:Storing each async result in its own array element 【发布时间】:2021-04-15 23:48:35 【问题描述】:假设我想从网站下载 1000 个食谱。这些网站最多接受 10 个并发连接。 每个配方都应该存储在一个数组中,在其对应的索引处。(我不想将数组发送到DownloadRecipe
方法。)
从技术上讲,我已经解决了这个问题,但我想知道是否有更简洁的方法来使用 async/await 或其他方法来实现它?
static async Task MainAsync()
int recipeCount = 1000;
int connectionCount = 10;
string[] recipes = new string[recipeCount];
Task<string>[] tasks = new Task<string>[connectionCount];
int r = 0;
while (r < recipeCount)
for (int t = 0; t < tasks.Length; t++)
tasks[t] = Task.Run(async () => recipes[r] = await DownloadRecipe(r));
r++;
await Task.WhenAll(tasks);
static async Task<string> DownloadRecipe(int index)
// ... await calls to download recipe
此外,此解决方案不是最佳解决方案,因为在所有 10 次运行下载完成之前,它不会开始新的下载。有什么我们可以在那里改进而不会使代码过于臃肿的地方吗?线程池限制为 10 个线程?
【问题讨论】:
我不确定它是否会对您有所帮助,但是从 C# 8.0 开始,您可以使用异步迭代和异步 foreach 循环来处理 docs.microsoft.com/en-us/archive/msdn-magazine/2019/november/… 也许它会帮助您更轻松地实现您想要的东西跨度> 我认为您的解决方案之所以有效,是因为recipeCount
可以被 connectionCount
整除。否则会在此处发生参数超出范围异常:recipes[r] = await...
。您的解决方案可能也容易受到此问题的影响:Captured variable in a loop in C#,关于变量 r
。
@TheodorZoulias 是的,你是对的。我稍微重写了解决方案以使其更短并清楚地显示我想要改进的代码。对我来说幸运的是,这些问题都不存在于我的较长解决方案中。谢谢指出!
【参考方案1】:
有很多方法可以做到这一点。一种方法是使用ActionBlock
,它使您可以相当轻松地访问MaxDegreeOfParallelism
,并且可以很好地与async
方法配合使用
static async Task MainAsync()
var recipeCount = 1000;
var connectionCount = 10;
var recipes = new string[recipeCount];
async Task Action(int i) => recipes[i] = await DownloadRecipe(i);
var processor = new ActionBlock<int>(Action, new ExecutionDataflowBlockOptions()
MaxDegreeOfParallelism = connectionCount,
SingleProducerConstrained = true
);
for (var i = 0; i < recipeCount; i++)
await processor.SendAsync(i);
processor.Complete();
await processor.Completion;
static async Task<string> DownloadRecipe(int index)
...
另一种方法可能是使用SemaphoreSlim
var slim = new SemaphoreSlim(connectionCount, connectionCount);
var tasks = Enumerable
.Range(0, recipeCount)
.Select(Selector);
async Task<string> Selector(int i)
await slim.WaitAsync()
try
return await DownloadRecipe(i)
finally
slim.Release();
var recipes = await Task.WhenAll(tasks);
另一组方法是使用响应式扩展 (Rx)... 再次,有很多方法可以做到这一点,这只是一种 等待 方法(考虑到所有事情可能会更好)
var results = await Enumerable
.Range(0, recipeCount)
.ToObservable()
.Select(i => Observable.FromAsync(() => DownloadRecipe(i)))
.Merge(connectionCount)
.ToArray()
.ToTask();
【讨论】:
@00110001 为了简化问题,我没有包括我的“食谱”数组实际上是多维的这一事实。我喜欢 ActionBlock 解决方案,但是否可以将它与async Task Action(int i, int j) => recipes[i, j] = await DownloadRecipe(i, j);
之类的东西一起使用
我设法用Tuple
解决了上面的多参数问题。但我还有一些在我的问题中忘记的额外要求:每下载 100 个食谱后,我想运行一些将数组序列化为文件的自定义代码。有没有办法在每 100 次下载时获得另一个方法调用?
@DanielJohansson 第一个问题是肯定的,是的,元组将是最简单的解决方案。您可以使用 TPL 数据流创建更大的管道。即你可以设想从一个transformBlock 到一个BatchBlock 再到一个ActionBlock。我很乐意回答,但它有点超出了这个问题的范围。如果你问另一个人,请联系我
@DanielJohansson 有很多 TPL 资源,看看这个docs.microsoft.com/en-us/dotnet/standard/parallel-programming/… 虽然相对而言,一旦你知道它们很容易组合在一起的概念【参考方案2】:
拥有 10 个“池”的替代方法将“同时”加载数据。
您不需要使用单独的线程来包装 IO 操作。使用单独的线程进行 IO 操作只是浪费资源。
请注意,下载数据的线程什么都不做,只是等待响应。这就是async-await
方法非常方便的地方 - 我们可以发送多个请求而无需等待它们完成并且不会浪费线程。
static async Task MainAsync()
var requests = Enumerable.Range(0, 1000).ToArray();
var maxConnections = 10;
var pools = requests
.GroupBy(i => i % maxConnections)
.Select(group => DownloadRecipesFor(group.ToArray()))
.ToArray();
await Task.WhenAll(pools);
var recipes = pools.SelectMany(pool => pool.Result).ToArray();
static async Task<IEnumerable<string>> DownLoadRecipesFor(params int[] requests)
var recipes = new List<string>();
foreach (var request in requests)
var recipe = await DownloadRecipe(request);
recipes.Add(recipe);
return recipes;
因为在池中(DownloadRecipesFor
方法),我们会一一下载结果 - 我们确保我们一直有不超过 10 个活动请求。
这比原来的更有效一点,因为我们不会等到 10 个任务完成后才开始下一个“一堆”。 这并不理想,因为如果最后一个“池”提前结束,那么其他“池”将无法获取下一个要处理的请求。
最终结果会有相应的索引,因为我们将按照我们创建它们的顺序处理“池”和内部请求。
【讨论】:
@TheodorZoulias,理想的方法是使用SemaphoreSlim
,因为这是我们有 10 个活动任务的方式,当其中一个任务完成时,我们开始另一个。以上是关于将每个异步结果存储在其自己的数组元素中的主要内容,如果未能解决你的问题,请参考以下文章