Java并行流:如何等待线程完成并行流?

Posted

技术标签:

【中文标题】Java并行流:如何等待线程完成并行流?【英文标题】:Java parallel stream: how to wait for threads for a parallel stream to finish? 【发布时间】:2018-06-21 13:46:40 【问题描述】:

所以我有一个列表,我可以从中获取并行流来填写地图,如下所示:

Map<Integer, TreeNode> map = new HashMap<>();
List<NodeData> list = some_filled_list;

//Putting data from the list into the map
list.parallelStream().forEach(d -> 
                TreeNode node = new TreeNode(d);
                map.put(node.getId(), node);
            );

//print out map
map.entrySet().stream().forEach(entry -> 
     System.out.println("Processing node with ID = " + entry.getValue().getId());
                );

这段代码的问题是,当“放置数据”过程仍在进行时,地图正在被打印出来(因为它是并行的),因此,地图还没有从列表中接收到所有元素.当然,在我的真实代码中,它不仅仅是打印出地图;我使用地图来利用 O(1) 查找时间。

我的问题是:

    如何让主线程等待,以便“放数据”在地图打印出来之前完成?我试图将“放置数据”放在线程 t 中,然后执行 t.start()t.join(),但这没有帮助。

    也许在这种情况下我不应该使用并行流?清单很长,我只是想利用并行性来提高效率。

【问题讨论】:

通过其构造函数new TreeNode(d) 创建TreeNode 实例的计算成本是否很高?我的意思是如果它有沉重的CPU负载。如果答案是否定的,那么使用并行流将一无所获。并行流仅对执行非常昂贵的 CPU 绑定任务的极少数应用程序有用。并行流不适用于只需要并发的场景(例如调用远程服务而不依次等待每个服务的结果)。 【参考方案1】:

使用此 list.parallelStream().forEach,您违反了 Stream 文档中明确规定的 side-effects 属性。

另外,当您说 此代码是在“放置数据”过程仍在进行时正在打印地图(因为它是并行的),这是不正确的,因为 forEach是一个终端操作,它将等待完成,直到它可以进入下一行。您可能会看到这样,因为您正在收集到非线程安全的HashMap 并且某些条目可能不在该映射中...考虑其他方式,如果您会将来自多个线程的多个条目放在HashMap 中吗?好吧,很多事情都可能会中断,比如丢失条目、在不正确/不一致的地图上等等。

当然,将其更改为 ConcurrentHashMap 会起作用,因为它是线程安全的,但您仍然违反了副作用属性,尽管是以“安全”的方式。

正确的做法是直接将collect 转换为Map,而不使用forEach

Map<Integer, TreeNode> map = list.parallelStream()
        .collect(Collectors.toMap(
                NodeData::getId,
                TreeNode::new
        ));

这样,即使是并行处理,一切都会好起来的。请注意,您需要 lots(数万个元素)才能从并行处理中获得任何可衡量的性能提升。

【讨论】:

为提到并行流不是任何事情的圣杯而竖起大拇指 在现实生活中实际发生的HashMaps 不一致的最有趣的例子之一是陷入无限循环,显然节点以循环方式链接,这是不可能的以顺序执行的心态查看代码时。哦,list.parallelStream() .collect(Collectors.toMap( NodeData::getId, TreeNode::new)) 是几乎不可能从并行中获益的场景之一,无论元素数量如何,因为合并成本与任何潜在的节省相当。 @Holger 这个在我的长期清单上,总有一天我会找到时间。确实合并两张地图是相当昂贵的...... @Eugene 好吧,那么,如果你有时间,this one 可能是一个很好的读物(请记住,toMap(…) 没有合并功能不允许重复键)。跨度> 【参考方案2】:

对于并行和非并行实现,流操作将阻塞直到完成。

所以您看到的不是the "putting data" process is still going on - 很可能只是数据损坏,因为HashMap 不是线程安全的。 尝试改用ConcurrentHashMap

【讨论】:

使用 ConcurrentHashMap 确实有帮助。刚测试过。但是,我仍然不清楚,为什么当我不使用并行流而使用顺序流时一切正常?那么,当我使用并行流时,我的常规 HashMap 中有什么损坏了?以及为什么它在与顺序流一起使用时不会损坏? 基本上put 方法读取HashMap 的表,然后向其中添加一些元素,然后将其写回内存。在串行(顺序)流中效果很好,但在并行世界中,两个线程可以读取一些值,然后独立修改它们并将结果写回内存。所以最后一次写入不会包含来自其他线程的更改,它们将被覆盖。 那是相当混乱的解释,所以你最好阅读一两篇关于线程安全主题的文章。这样的事情应该为你澄清一点:journaldev.com/1061/thread-safety-in-java【参考方案3】:

我猜如果流仍然可以处理,您可以尝试以下操作:

    List<NodeData> list = new ArrayList<>();

    //Putting data from the list into the map
    Map<Integer, TreeNode> map = list.parallelStream()
            .collect(Collectors.toMap(
                    n -> n.getId(),
                    n -> new TreeNode(n)
            ));

至少现在你在流上有一个终端。您可能会使用多个线程,并且映射肯定会完成。

【讨论】:

以上是关于Java并行流:如何等待线程完成并行流?的主要内容,如果未能解决你的问题,请参考以下文章

java8新特性——并行流与顺序流

对集合进行并行计算方法选择的建议:stream的并行流,CompletableFuture

啥?用了并行流还更慢了。。

Java8实战使用并行流

Stream并行流详解

并行流是不是以线程安全的方式处理上游迭代器?