并行流在不同的操作下可以正常工作吗?

Posted

技术标签:

【中文标题】并行流在不同的操作下可以正常工作吗?【英文标题】:Will parallel stream work fine with distinct operation? 【发布时间】:2019-05-07 18:34:57 【问题描述】:

我正在阅读有关无国籍状态的文章,并在 doc 中遇到了这个问题:

流管道结果可能是不确定的或不正确的,如果 流操作的行为参数是有状态的。一种 有状态的 lambda(或其他实现适当的对象 功能接口)是一个其结果取决于任何状态的接口 在流管道执行期间可能会发生变化。

现在,如果我有一个字符串列表(例如strList),然后尝试通过以下方式使用并行流从中删除重复的字符串:

List<String> resultOne = strList.parallelStream().distinct().collect(Collectors.toList());

或者如果我们不区分大小写:

List<String> result2 = strList.parallelStream().map(String::toLowerCase)
                       .distinct().collect(Collectors.toList());

此代码是否有任何问题,因为并行流会将输入拆分,并且在一个块中不同并不一定意味着在整个输入中不同?

编辑(以下答案的快速摘要)

distinct 是有状态的操作,在有状态的中间操作的情况下,并行流可能需要多次传递或大量缓冲开销。如果元素的顺序不相关,distinct 也可以更有效地实现。 同样按照doc:

对于有序流,不同元素的选择是稳定的(对于 重复元素,该元素首先出现在遭遇战中 顺序被保留。)对于无序流,没有稳定性保证 制作完成。

但在有序流并行运行的情况下,distinct 可能不稳定 - 意味着它将保留任意元素以防重复,否则不一定是distinct 预期的第一个元素。

来自link:

在内部,distinct() 操作保留了一个 Set,其中包含 以前见过的元素,但它被埋在 操作,我们无法从应用程序代码中获取它。

因此,在并行流的情况下,它可能会消耗整个流或可能使用 CHM(诸如 ConcurrentHashMap.newKeySet() 之类的东西)。对于已订购的,很可能会使用LinkedHashSet 或类似的结构。

【问题讨论】:

我能想到的唯一问题是字符串的顺序可能与strList中的初始顺序不同 Stream#distinct 的 apiNote 中有提示:"@apiNote:在并行管道中保持 distinct() 的稳定性相对昂贵(需要 操作作为一个完整的屏障,具有大量的缓冲开销)"。同样也可以询问归约操作(通过并行归约比distinct 操作更容易想象) 引用的文本是关于 lambdas 的,而distinct() 不带 lambda,所以引用的文本无关紧要。此外,如果您阅读文档,即distinct() 的javadoc,您将看到它完全解决了并行 管道中的方法行为。唯一的问题是性能。该方法保证功能,如javadoc所述。 【参考方案1】:

大致指出doc的相关部分(重点,我的):

中间操作又分为无状态和 有状态的操作。无状态操作,例如过滤器和映射, 处理新元素时不保留先前看到的元素的状态 element -- 每个元素都可以独立于操作进行处理 在其他元素上。 有状态的操作,例如 distinct 和 sorted, 处理时可能会合并来自先前看到的元素的状态 新元素

有状态的操作可能需要先处理整个输入 产生结果。例如,不能产生任何结果 对流进行排序,直到看到流的所有元素。 作为 结果,在并行计算下,一些包含有状态的管道 中间操作可能需要对数据进行多次传递,或者可能 需要缓冲重要数据。管道仅包含 无状态的中间操作可以一次性处理, 无论是顺序的还是并行的,都具有最少的数据缓冲

如果您进一步阅读(订购部分):

流可能有也可能没有定义的相遇顺序。不管与否 流的相遇顺序取决于源和 中间操作。 某些流源(例如 List 或 数组)本质上是有序的,而其他(例如 HashSet) 不是。一些中间操作,例如 sorted(),可能会强加一个 在其他无序的流上遇到顺序,其他人可能 无序渲染有序流,例如 BaseStream.unordered()。 此外,一些终端操作可能会忽略遇到顺序,例如 forEach().

...

对于并行流,有时可以放宽排序约束 实现更高效的执行。 某些聚合操作,例如 过滤重复项(distinct())或分组减少 (Collectors.groupingBy()) 可以更有效地实现,如果 元素的顺序不相关。同样,操作是 本质上与遇到顺序相关,例如 limit(),可能需要 缓冲以确保正确排序,破坏了 并行性。 如果流有遇到顺序,但 用户并不特别关心遇到的顺序,明确地 使用 unordered() 对流进行降序可能会提高并行性 一些有状态或终端操作的性能。然而,大多数 流管道,例如上面的“块权重总和”示例, 即使在排序约束下仍然有效地并行化。

总之,

distinct 可以很好地处理并行流,但您可能已经知道,它必须在继续之前消耗整个流,这可能会占用大量内存。 如果项目的来源是无序集合(如hashset)或流是unordered(),那么distinct不担心对输出进行排序,因此会很高效

如果您不担心顺序并希望看到更多性能,解决方案是将.unordered() 添加到流管道中。

List<String> result2 = strList.parallelStream()
                              .unordered()
                              .map(String::toLowerCase)
                              .distinct()
                              .collect(Collectors.toList());

唉,Java 中没有(可用的内置)并发哈希集(除非他们巧妙地使用了 ConcurrentHashMap),所以我只能给您留下不幸的可能性,即 distinct 是使用常规 Java 集以阻塞方式实现的。在这种情况下,我看不出执行并行 distinct 有什么好处。


编辑:我说得太早了。使用具有 distinct 的并行流可能会有一些好处。看起来distinct 的实现比我最初想象的更聪明。见@Eugene'sanswer。

【讨论】:

正在阅读该文档。因此,在并行计算下,一些包含有状态中间操作的管道可能需要多次传递数据或可能需要缓冲重要数据. 似乎是 OP 正在寻找的准确答案。 @smac 实际上一个内置的并发Set。它的工厂方法放在ConcurrentHashMap 并没有真正改变:newKeySet() 我目前正在查看的实现(java 8 版本)实际上将ConcurrentHashMap&lt;T, Boolean&gt; 直接用于“并行无序”情况,但对“并行”使用不同的(基于缩减的)方法订购”。 @Hulk 没错,正是我在回答中提到的,显然LinkedHashSet 将用于有序操作;但是当知道源被排序时,内部使用了一种更有趣的方法 @Hulk 您是如何发现实现使用 CHM 来处理并行无序情况的。我无法在源代码中找到它。【参考方案2】:

您提供的文档和实际示例中似乎遗漏了很多东西。

如果流操作的行为参数是有状态的,则流管道结果可能是不确定的或不正确的。

在您的示例中,您没有定义任何有状态操作。文档中的有状态是指您定义的那些,而不是由 jdk 本身实现的那些 - 就像您的示例中的 distinct 一样。但无论哪种方式,您都可以定义一个正确的有状态操作,甚至是Stuart Marks - working at Oracle/Java, provides such an example。

所以在你提供的例子中,无论是否平行,你都很好。

distinct(并行)的昂贵部分来自这样一个事实,即内部必须有一个线程安全的数据结构来保持不同的元素;在 jdk 的情况下,它是 ConcurrentHashMap 用于以防订单无关紧要,或者在订单重要时使用 LinkedHashSet 减少。

distinct btw 是一个非常聪明的实现,它会查看您的流源是否已经不同(在这种情况下它是无操作的),或者查看您的数据是否已排序,在这种情况下它将对源进行更智能的遍历(因为它知道如果你看到了一个元素,下一个元素要么是你刚刚看到的,要么是不同的),或者在内部使用ConcurrentHashMap,等等。

【讨论】:

我可以看到set 用于在distinct 操作的情况下维护不重复。在并行流的情况下,它可能会消耗整个流本身,而不是使用 CHM。想法? @i_am_zero 不确定我是否关注你,想扩展你的评论吗? 好的。我只是想了解在distinct 的情况下并行流将如何工作?它会一次性消耗整个输入(在这种情况下没有好处)还是会使用 CHM?我找不到任何官方参考来澄清这一点。 @i_am_zero 无论哪种方式,都不会立即消耗整个流。而且您不必查看实现即可理解这一点。 Stream.of(1,2,2,3,4).peek(System.out::println).distinct().peek(System.out::println).collect(Collectors.toList());【参考方案3】:

不会有问题(问题是错误的结果),但正如API 注释所说的那样

在并行管道中保持 distinct() 的稳定性相对昂贵

但是,如果性能受到关注并且stability 不是问题(即结果与它处理的集合有关的元素顺序不同),那么您遵循 API 的说明

使用 BaseStream.unordered() 删除排序约束可能 导致 distinct() 的执行效率显着提高 并行管道,

我想为什么不对distinct 并行和顺序流的性能进行基准测试

public static void main(String[] args) 
        List<String> strList = Arrays.asList("cat", "nat", "hat", "tat", "heart", "fat", "bat", "lad", "crab", "snob");

        List<String> words = new Vector<>();


        int wordCount = 1_000_000; // no. of words in the list words
        int avgIter = 10; // iterations to run to find average running time

        //populate a list randomly with the strings in `strList`
        for (int i = 0; i < wordCount; i++) 
            words.add(strList.get((int) Math.round(Math.random() * (strList.size() - 1))));





        //find out average running times
        long starttime, pod = 0, pud = 0, sod = 0;
        for (int i = 0; i < avgIter; i++) 
            starttime = System.currentTimeMillis();
            List<String> parallelOrderedDistinct = words.parallelStream().distinct().collect(Collectors.toList());
            pod += System.currentTimeMillis() - starttime;

            starttime = System.currentTimeMillis();
            List<String> parallelUnorderedDistinct =
                    words.parallelStream().unordered().distinct().collect(Collectors.toList());
            pud += System.currentTimeMillis() - starttime;

            starttime = System.currentTimeMillis();
            List<String> sequentialOrderedDistinct = words.stream().distinct().collect(Collectors.toList());
            sod += System.currentTimeMillis() - starttime;
        

        System.out.println("Parallel ordered time in ms: " + pod / avgIter);
        System.out.println("Parallel unordered time in ms: " + pud / avgIter);
        System.out.println("Sequential implicitly ordered time in ms: " + sod / avgIter);
    

以上代码由 open-jdk 8 编译并在 i3 第 6 代(4 个逻辑核心)上的 openjdk 的 jre 8(无 jvm 特定参数)上运行,我得到了这些结果

似乎在某个没有之后。在元素中,有序并行速度更快,讽刺的是,无序并行速度最慢。这背后的原因(感谢@Hulk)是因为它的实现方式(使用HashSet)。所以一般规则是,如果你有几个元素和大量重复,你可能会从@中受益987654325@。

1)

Parallel ordered time in ms: 52
Parallel unordered time in ms: 81
Sequential implicitly ordered time in ms: 35

2)

Parallel ordered time in ms: 48
Parallel unordered time in ms: 83
Sequential implicitly ordered time in ms: 34

3)

Parallel ordered time in ms: 36
Parallel unordered time in ms: 70
Sequential implicitly ordered time in ms: 32

无序并行比两者慢两倍。

然后我将wordCount 提高到5_000_000,这就是结果

1)

Parallel ordered time in ms: 93
Parallel unordered time in ms: 363
Sequential implicitly ordered time in ms: 123

2)

Parallel ordered time in ms: 100
Parallel unordered time in ms: 363
Sequential implicitly ordered time in ms: 124

3)

Parallel ordered time in ms: 89
Parallel unordered time in ms: 365
Sequential implicitly ordered time in ms: 118

然后到10_000_000

1)

Parallel ordered time in ms: 148
Parallel unordered time in ms: 725
Sequential implicitly ordered time in ms: 218

2)

Parallel ordered time in ms: 150
Parallel unordered time in ms: 749
Sequential implicitly ordered time in ms: 224

3)

Parallel ordered time in ms: 143
Parallel unordered time in ms: 743
Sequential implicitly ordered time in ms: 222

【讨论】:

说明:您的设置 - 很少有不同字符串的大量重复有利于“并行排序”实现,因为(至少在我正在查看的版本中)这是通过并行缩减实现的LinkedHashSets - 由于这些集合很快收敛到您的一小部分输入字符串,因此组合步骤非常快。如果重复很少(只需要消除几个重复),结果可能会大不相同。 @Hulk 我用 40k 个不同的元素(8 个!)进行了尝试,是的,你是对的。我会把它放在我的答案中。【参考方案4】:

来自 javadocs,parallelStream()

返回一个可能以该集合作为源的并行流。 此方法允许返回顺序流。

性能:

    让我们考虑一下我们有一个多流(幸运),它被分配给不同的 CPU 内核。 ArrayList&lt;T&gt; 具有基于数组的内部数据表示。或者 LinkedList&lt;T&gt; 需要更多计算才能并行处理拆分。 ArrayList&lt;T&gt; 在这种情况下更好! stream.unordered().parallel().distinct() 的性能优于 stream.parallel().distinct()

在并行管道中保持 distinct() 的稳定性是 相对昂贵(要求操作作为一个完整的 屏障,具有大量缓冲开销)。

所以,在您的情况下,它应该不是问题(除非您的 List&lt;T&gt; 不关心订单)。阅读下面的解释,

假设您在 ArrayList 中有 4 个元素, "a","b","a","b"

现在如果你在调用distinct()之前使用parallelStream(),则只保留位置0和1的字符串。(保留顺序,顺序流)

否则,(如果你使用parallelStream().distinct())那么1和2处的元素可以保留为不同的(这是不稳定的,但结果是相同的“a,”b“或它甚至可以是 "b","a")。

不稳定的 distinct 操作会随机消除重复项。

最后,

在并行计算下,一些包含有状态的管道 中间操作可能需要对数据进行多次传递,或者可能 需要缓冲重要数据

【讨论】:

以上是关于并行流在不同的操作下可以正常工作吗?的主要内容,如果未能解决你的问题,请参考以下文章

并行缩减无法正常工作

引用另外表格,提示 #REF!路径也设置好了。被引用的表格不打开就错误,打开就正常。不用vba能处理吗?

combineReducers 无法正常工作。任何人都可以看到问题吗?

并发编程

缓存的网络图像在调试模式下工作正常,但在发布模式下显示占位符抖动,请问有啥解决方案吗?

运行并行 KMeans 时,“索引 N 超出轴 0 的范围,大小为 N”,而顺序 KMeans 工作正常