Java并行流:如何等待线程完成并行流?
Posted
技术标签:
【中文标题】Java并行流:如何等待线程完成并行流?【英文标题】:Java parallel stream: how to wait for threads for a parallel stream to finish? 【发布时间】:2018-06-21 13:46:40 【问题描述】:所以我有一个列表,我可以从中获取并行流来填写地图,如下所示:
Map<Integer, TreeNode> map = new HashMap<>();
List<NodeData> list = some_filled_list;
//Putting data from the list into the map
list.parallelStream().forEach(d ->
TreeNode node = new TreeNode(d);
map.put(node.getId(), node);
);
//print out map
map.entrySet().stream().forEach(entry ->
System.out.println("Processing node with ID = " + entry.getValue().getId());
);
这段代码的问题是,当“放置数据”过程仍在进行时,地图正在被打印出来(因为它是并行的),因此,地图还没有从列表中接收到所有元素.当然,在我的真实代码中,它不仅仅是打印出地图;我使用地图来利用 O(1) 查找时间。
我的问题是:
如何让主线程等待,以便“放数据”在地图打印出来之前完成?我试图将“放置数据”放在线程 t 中,然后执行 t.start()
和 t.join()
,但这没有帮助。
也许在这种情况下我不应该使用并行流?清单很长,我只是想利用并行性来提高效率。
【问题讨论】:
通过其构造函数new TreeNode(d)
创建TreeNode
实例的计算成本是否很高?我的意思是如果它有沉重的CPU负载。如果答案是否定的,那么使用并行流将一无所获。并行流仅对执行非常昂贵的 CPU 绑定任务的极少数应用程序有用。并行流不适用于只需要并发的场景(例如调用远程服务而不依次等待每个服务的结果)。
【参考方案1】:
使用此 list.parallelStream().forEach
,您违反了 Stream 文档中明确规定的 side-effects
属性。
另外,当您说 此代码是在“放置数据”过程仍在进行时正在打印地图(因为它是并行的),这是不正确的,因为 forEach
是一个终端操作,它将等待完成,直到它可以进入下一行。您可能会看到这样,因为您正在收集到非线程安全的HashMap
并且某些条目可能不在该映射中...考虑其他方式,如果您会将来自多个线程的多个条目放在HashMap
中吗?好吧,很多事情都可能会中断,比如丢失条目、在不正确/不一致的地图上等等。
当然,将其更改为 ConcurrentHashMap
会起作用,因为它是线程安全的,但您仍然违反了副作用属性,尽管是以“安全”的方式。
正确的做法是直接将collect
转换为Map
,而不使用forEach
:
Map<Integer, TreeNode> map = list.parallelStream()
.collect(Collectors.toMap(
NodeData::getId,
TreeNode::new
));
这样,即使是并行处理,一切都会好起来的。请注意,您需要 lots(数万个元素)才能从并行处理中获得任何可衡量的性能提升。
【讨论】:
为提到并行流不是任何事情的圣杯而竖起大拇指 在现实生活中实际发生的HashMap
s 不一致的最有趣的例子之一是陷入无限循环,显然节点以循环方式链接,这是不可能的以顺序执行的心态查看代码时。哦,list.parallelStream() .collect(Collectors.toMap( NodeData::getId, TreeNode::new))
是几乎不可能从并行中获益的场景之一,无论元素数量如何,因为合并成本与任何潜在的节省相当。
@Holger 这个在我的长期清单上,总有一天我会找到时间。确实合并两张地图是相当昂贵的......
@Eugene 好吧,那么,如果你有时间,this one 可能是一个很好的读物(请记住,toMap(…)
没有合并功能不允许重复键)。跨度>
【参考方案2】:
对于并行和非并行实现,流操作将阻塞直到完成。
所以您看到的不是the "putting data" process is still going on
- 很可能只是数据损坏,因为HashMap
不是线程安全的。
尝试改用ConcurrentHashMap
。
【讨论】:
使用 ConcurrentHashMap 确实有帮助。刚测试过。但是,我仍然不清楚,为什么当我不使用并行流而使用顺序流时一切正常?那么,当我使用并行流时,我的常规 HashMap 中有什么损坏了?以及为什么它在与顺序流一起使用时不会损坏? 基本上put
方法读取HashMap
的表,然后向其中添加一些元素,然后将其写回内存。在串行(顺序)流中效果很好,但在并行世界中,两个线程可以读取一些值,然后独立修改它们并将结果写回内存。所以最后一次写入不会包含来自其他线程的更改,它们将被覆盖。
那是相当混乱的解释,所以你最好阅读一两篇关于线程安全主题的文章。这样的事情应该为你澄清一点:journaldev.com/1061/thread-safety-in-java【参考方案3】:
我猜如果流仍然可以处理,您可以尝试以下操作:
List<NodeData> list = new ArrayList<>();
//Putting data from the list into the map
Map<Integer, TreeNode> map = list.parallelStream()
.collect(Collectors.toMap(
n -> n.getId(),
n -> new TreeNode(n)
));
至少现在你在流上有一个终端。您可能会使用多个线程,并且映射肯定会完成。
【讨论】:
以上是关于Java并行流:如何等待线程完成并行流?的主要内容,如果未能解决你的问题,请参考以下文章