如何从 Parallel.ForEach 收集返回值?

Posted

技术标签:

【中文标题】如何从 Parallel.ForEach 收集返回值?【英文标题】:How do I collect return values from Parallel.ForEach? 【发布时间】:2012-09-18 14:50:35 【问题描述】:

我正在并行调用慢速 Web 服务。事情很好,直到我意识到我需要从服务中获取一些信息。但我看不到从哪里取回这些值。我无法写入数据库,HttpContext.Current 在使用 Parallel.ForEach 调用的方法中似乎为空

下面是一个示例程序(在您的脑海中,请想象一个缓慢的网络服务而不是字符串连接)

using System;
using System.Threading.Tasks;

class Program

    static void Main(string[] args)
    
        WordMaker m = new WordMaker();
        m.MakeIt();
    
    public class WordMaker
    
        public void MakeIt()
        
            string[] words =  "ack", "ook" ;
            ParallelLoopResult result = Parallel.ForEach(words, word => AddB(word));
            Console.WriteLine("Where did my results go?");
            Console.ReadKey();
        
        public string AddB(string word)
        
            return "b" + word;
        
    


【问题讨论】:

Parallel.ForEach 的不同重载可能是您想要的:msdn.microsoft.com/en-us/library/dd991486.aspx 不幸的是,这不是你能做到的。 Parallel.Foreach() 并不是为跟踪退货而设计的。但是,我建议在您的 AddB 函数中使用 ref 参数。这可能会做到。 @PhillipSchmidt:无论如何都不是示例中使用的重载... @AustinSalonen 还有什么超载可以工作的?我不是在争论,我只是不知道有什么可以做他想做的事。 @PhillipSchmidt 我第一条评论中的链接明确处理Func<...> 而不是Action<...> 【参考方案1】:

你已经把它丢在这里了。

ParallelLoopResult result = Parallel.ForEach(words, word => AddB(word));

你可能想要类似的东西,

ParallelLoopResult result = Parallel.ForEach(words, word =>

    string result = AddB(word);
    // do something with result
);

如果您想要在此结束时进行某种集合,请考虑使用System.Collections.Concurrent 下的集合之一,例如ConcurrentBag

ConcurrentBag<string> resultCollection = new ConcurrentBag<string>();
ParallelLoopResult result = Parallel.ForEach(words, word =>

    resultCollection.Add(AddB(word));
);

// Do something with the result

【讨论】:

我认为 ParallelLoopResult 在这里没有任何用处。 +1 虽然 .AsParallel() in LINQ 会好很多 @eranotzap 这就是使用 ConcurrentBag 而不是 List 解决的问题。 @NineTails 所以这是个问题。只添加不添加和删除。仅将并发问题添加到列表中。也许它是以原子方式完成的。这意味着支持数组中的索引可能会自动递增 @PatrickfromNDependteam 并不是说​​您已将您的想法发布为an answer,认为这是一个比这更好的答案的人会开始支持您的答案,并将其移至顶部。每次点赞都会获得声望点数,所以这是一个双赢的局面。【参考方案2】:

您可以考虑使用IEnumerableAsParallel 扩展方法,它会为您处理并发并收集结果。

words.AsParallel().Select(AddB).ToArray()

同步(例如锁或使用锁的并发集合)通常是并发算法的瓶颈。最好的办法是尽可能避免同步。我猜AsParallel 使用了一些更聪明的方法,比如将单线程上生成的所有项目放入本地非并发集合中,然后在最后组合它们。

【讨论】:

这明显更好。 当我尝试这个时,select 中的代码似乎没有在不同的线程中运行。但是 Parallel.Foreach 做到了。 您确定您使用的是 AsParallel 吗?是什么让您认为它没有使用更多线程? (可选)您也可以在.AsParallel() 之后添加.AsOrdered(),以按输入值的顺序接收结果。【参考方案3】:

不要使用ConcurrentBag 来收集结果,因为它比较慢。 请改用本地锁。

var resultCollection = new List<string>();
object localLockObject = new object();

Parallel.ForEach<string, List<string>>(
      words,
      () =>  return new List<string>(); ,
      (word, state, localList) =>
      
         localList.Add(AddB(word));
         return localList;
      ,
      (finalResult) =>  lock (localLockObject) resultCollection.AddRange(finalResult); 
); 

// Do something with resultCollection here

【讨论】:

你有任何统计数据表明 ConcurrentBag 比使用我们自己的对象锁要慢吗?我只想知道它有多慢,因为它使我的代码看起来比使用对象锁更干净。 @dineshygv 恕我直言,差异可以忽略不计***.com/questions/2950955/… 或者根本不使用任何锁定;-)【参考方案4】:

这看起来安全、快速、简单:

    public string[] MakeIt() 
        string[] words =  "ack", "ook" ;
        string[] results = new string[words.Length];
        ParallelLoopResult result =
            Parallel.For(0, words.Length, i => results[i] = AddB(words[i]));
        return results;
    

【讨论】:

这可能会导致缓存乒乓,尽管仍然比并发收集好很多。 非常非常明智的做法! @overlord-zurg【参考方案5】:

这样的事情怎么样:

public class WordContainer

    public WordContainer(string word)
    
        Word = word;
    

    public string Word  get; private set; 
    public string Result  get; set; 


public class WordMaker

    public void MakeIt()
    
        string[] words =  "ack", "ook" ;
        List<WordContainer> containers = words.Select(w => new WordContainer(w)).ToList();

        Parallel.ForEach(containers, AddB);

        //containers.ForEach(c => Console.WriteLine(c.Result));
        foreach (var container in containers)
        
            Console.WriteLine(container.Result);
        

        Console.ReadKey();
    

    public void AddB(WordContainer container)
    
        container.Result = "b" + container.Word;
    

我相信锁定或并发对象不是必需的,除非您需要结果相互交互(例如计算总和或组合所有单词)。在这种情况下,ForEach 巧妙地分解了您的原始列表,并为每个线程提供了自己的对象,它可以随意操作它想要的一切,而不必担心干扰其他线程。

【讨论】:

是的,这适用于控制台应用程序,但对于控制台应用程序的事件,您可能希望首先将它们聚合到集合中,否则您会在控制台窗口中获得交错的结果。 Console.WriteLine 命令在主线程上同步运行,在 Parallel.ForEach 处理完所有列表项并返回后,它将按照在原始 List 中定义的顺序打印结果。如果我从 Parallel.ForEach 中调用 WriteLine,那么是的,结果将是交错的。【参考方案6】:

在预先知道集合大小的特定情况下(实践中经常出现这种情况)可以使用数组来代替昂贵的并发集合。由于每个循环都访问ouputs 数组中自己的槽,因此不存在冲突风险。作为奖励输出的存储顺序与输入相同:

const int NB_WORDS = 1000;
var inputs = new string[NB_WORDS];
for(var i= 0; i < NB_WORDS; i++)  inputs[i] = i.ToString(); 

var outputs = new string[NB_WORDS];

Parallel.For(0, NB_WORDS, index => 
   string word = inputs[index];
   string result = word + word; // Operation on word
   outputs[index] = result; // No need of a concurrent collection to store the result!
);

Debug.Assert(outputs.All(result => !string.IsNullOrEmpty(result)));

【讨论】:

不是用Parallel.ForEach枚举Enumerable.Range,用Parallel.For方法不是更简单吗? @TheodorZoulias 非常感谢

以上是关于如何从 Parallel.ForEach 收集返回值?的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 Parallel.ForEach 正确写入文件?

C# - 用于服务调用的 Parallel.Foreach()

C# Parallel.ForEach 和 Task.WhenAll 有时返回的值比预期的少

如何限制 Parallel.ForEach?

从 Parallel.ForEach 抛出未处理的 OperationCanceledException

如何将此 foreach 代码转换为 Parallel.ForEach?