组合器在 java parallelStream reduce 中的实际作用是啥

Posted

技术标签:

【中文标题】组合器在 java parallelStream reduce 中的实际作用是啥【英文标题】:What is actual role of combiner in java parallelStream reduce组合器在 java parallelStream reduce 中的实际作用是什么 【发布时间】:2021-02-13 10:57:06 【问题描述】:

我对java流中的reduce方法有基本的了解。但是,组合器在parallelStream 中的作用我并不清楚。在下面的代码 sn-p 中,在第一个块中我使用了一个组合器,而它在第二个块中不存在。但是,两种情况下的结果是相同的。

List<Integer> intarr = Arrays.asList(10,20,30);
        Integer totsum = intarr.stream().reduce(20, (a,b) -> a+b, (a,b) -> a+b);
        System.out.println("total sum: "+totsum);

List<Integer> intarr = Arrays.asList(10,20,30);
        Integer totsum = intarr.stream().reduce(20, (a,b) -> a+b);
        System.out.println("total sum: "+totsum);

据了解,在2参数reduce方法的情况下,累加器是BinaryOperator,在3参数reduce方法的情况下是BiFunction。众所周知,这将有助于类型转换。例如,如果我需要将 int 转换为 double,我可以将我的标识指定为 20.0,combiner 会将其输出为 double 类型。

但是,除了类型转换之外,使用组合器实际上可以获得什么优势?

【问题讨论】:

【参考方案1】:

combiner 的重点是处理结果和输入是不同类型的情况,这里不是这种情况。例如,如果你想要一个字符串连接,你可以用reduce 来实现,写

stream.reduce("", (String str, int i) -> str + i, (String a, String b) -> a + b)

...在这种情况下,当流parallel缩减时,不同的chunk会分别累加,然后与combiner合并。

【讨论】:

【参考方案2】:

并不是说使用组合器比不使用组合器更“有利”。 reduce 的两个重载用于不同的目的。正如您所确定的,类型转换。

reduce(T, BinaryOperator&lt;T&gt;),没有组合器,返回一个T - 与流管道中的相同类型的东西。另一方面,reduce(U, BiFunction&lt;U, ? super T, U&gt;, BinaryOperator&lt;U&gt;) 使用组合器返回 U - 与流中的类型不同。

BigInteger sum = bigIntegerStream.reduce(BigInteger.ZERO, BigInteger::add);

我可以使用无合并器的版本来添加BigIntegers 的流。流将是Stream&lt;BigInteger&gt;,我想要的结果也是BigInteger 类型。我可以使用组合器版本将Stream&lt;Long&gt;BigInteger 相加。请注意流的类型与我想要的结果类型有何不同。

BigInteger sum = longStream.reduce(BigInteger.ZERO, (bi, l) -> bi.add(BigInteger.valueOf(l)), BigInteger::add);

可以说,您也可以在使用无合并器 reduce 之前完成 map

当结果类型与流的类型不同时需要组合器的原因是因为reduce 操作可以并行运行。例如,可以将整个流分成几个部分,每个部分并行缩减。

如果结果类型与流类型T 相同,则使用提供的二元运算符将每个部分缩减为T,并留下一堆Ts。我们可以使用相同的二元运算符进一步将这组Ts 减少为单个T

如果结果类型U 与流类型U 不同,那么BiFucntion 会将每个部分缩减为U,剩下的就是一堆Us不知道该怎么办,因为BiFunction 只需要UT 并返回U。我们需要一个额外的 BinaryOperator&lt;U&gt; 来帮助我们合并 Us。

所以如果你想要不同的结果类型,组合器是必要的。


另外,20 不是有效的标识值。一个有效的身份必须满足

accumulator.apply(identity, u) == u

对于所有u,对于无合并器版本,以及

combiner.apply(u, accumulator.apply(identity, t)) == accumulator.apply(u, t)

适用于所有 ut,适用于合并器版本。

【讨论】:

【参考方案3】:

3-arg version 的签名是:

<U> U reduce(U identity,
             BiFunction<U,? super T,U> accumulator,
             BinaryOperator<U> combiner)

它打算在以下一项(或两项)为真时使用:

结果类型 (U) 与流元素类型 (T) 不同。 combineraccumulator 不同。

如果这些都不正确,那么2-arg version 更好(更简单、更容易):

T reduce(T identity,
         BinaryOperator<T> accumulator)

对于问题中显示的示例,使用 3-arg 版本没有任何优势。


具有不同类型的示例很多,例如查看其他答案。

类型相同但combineraccumulator 的示例适用于- 减号运算符:

List<Integer> intarr = Arrays.asList(10,20,30);

// Sequential processing doesn't use combiner: totsum = -60
Integer totsum = intarr.stream().reduce(0, (a,b) -> a - b, (a,b) -> a - b);

// Parallel processing with same combiner does work: totsum = -20
Integer totsum = intarr.parallelStream().reduce(0, (a,b) -> a - b);

Integer totsum = intarr.parallelStream().reduce(0, (a,b) -> a - b, (a,b) -> a - b);

// Parallel processing requires a different combiner: totsum = -60
Integer totsum = intarr.parallelStream().reduce(0, (a,b) -> a - b, (a,b) -> a + b);

那是因为通过并行处理,我们为该输入流获得了 3 个线程,所以代码变为:

thread1Result = accumulator.apply(0/*identity*/, 10);  // = 0 - 10 = -10

thread2Result = accumulator.apply(0/*identity*/, 20);  // = 0 - 20 = -20

thread3Result = accumulator.apply(0/*identity*/, 30);  // = 0 - 30 = -30

// Bad combiner: (a,b) -> a - b
result = combiner.apply(thread1Result, thread2Result); // = -10 - -20 = +10
result = combiner.apply(result, thread3Result);        // = +10 - -30 = -20

// Good combiner: (a,b) -> a + b
result = combiner.apply(thread1Result, thread2Result); // = -10 + -20 = -30
result = combiner.apply(result, thread3Result);        // = -30 + -30 = -60

【讨论】:

"// 并行处理需要不同的组合器:totsum = -20" 应该是-60? @samabcde 将所有复制/粘贴错误都搞砸了。谢谢!!

以上是关于组合器在 java parallelStream reduce 中的实际作用是啥的主要内容,如果未能解决你的问题,请参考以下文章

java 8 parallelStream() 和 sorted()

Java 8 的 parallelStream 中产生了多少线程?

Java 8里 Stream和parallelStream的区别

使用 parallelStream 时抛出 InterruptedException - Java [重复]

深入浅出parallelStream

为啥parallelStream 使用ForkJoinPool,而不是普通的线程池?