组合器在 java parallelStream reduce 中的实际作用是啥
Posted
技术标签:
【中文标题】组合器在 java parallelStream reduce 中的实际作用是啥【英文标题】:What is actual role of combiner in java parallelStream reduce组合器在 java parallelStream reduce 中的实际作用是什么 【发布时间】:2021-02-13 10:57:06 【问题描述】:我对java流中的reduce
方法有基本的了解。但是,组合器在parallelStream
中的作用我并不清楚。在下面的代码 sn-p 中,在第一个块中我使用了一个组合器,而它在第二个块中不存在。但是,两种情况下的结果是相同的。
List<Integer> intarr = Arrays.asList(10,20,30);
Integer totsum = intarr.stream().reduce(20, (a,b) -> a+b, (a,b) -> a+b);
System.out.println("total sum: "+totsum);
List<Integer> intarr = Arrays.asList(10,20,30);
Integer totsum = intarr.stream().reduce(20, (a,b) -> a+b);
System.out.println("total sum: "+totsum);
据了解,在2参数reduce
方法的情况下,累加器是BinaryOperator
,在3参数reduce
方法的情况下是BiFunction
。众所周知,这将有助于类型转换。例如,如果我需要将 int 转换为 double,我可以将我的标识指定为 20.0,combiner 会将其输出为 double 类型。
但是,除了类型转换之外,使用组合器实际上可以获得什么优势?
【问题讨论】:
【参考方案1】:combiner 的重点是处理结果和输入是不同类型的情况,这里不是这种情况。例如,如果你想要一个字符串连接,你可以用reduce
来实现,写
stream.reduce("", (String str, int i) -> str + i, (String a, String b) -> a + b)
...在这种情况下,当流parallel缩减时,不同的chunk会分别累加,然后与combiner合并。
【讨论】:
【参考方案2】:并不是说使用组合器比不使用组合器更“有利”。 reduce
的两个重载用于不同的目的。正如您所确定的,类型转换。
reduce(T, BinaryOperator<T>)
,没有组合器,返回一个T
- 与流管道中的相同类型的东西。另一方面,reduce(U, BiFunction<U, ? super T, U>, BinaryOperator<U>)
使用组合器返回 U
- 与流中的类型不同。
BigInteger sum = bigIntegerStream.reduce(BigInteger.ZERO, BigInteger::add);
我可以使用无合并器的版本来添加BigInteger
s 的流。流将是Stream<BigInteger>
,我想要的结果也是BigInteger
类型。我可以使用组合器版本将Stream<Long>
与BigInteger
相加。请注意流的类型与我想要的结果类型有何不同。
BigInteger sum = longStream.reduce(BigInteger.ZERO, (bi, l) -> bi.add(BigInteger.valueOf(l)), BigInteger::add);
可以说,您也可以在使用无合并器 reduce
之前完成 map
。
当结果类型与流的类型不同时需要组合器的原因是因为reduce
操作可以并行运行。例如,可以将整个流分成几个部分,每个部分并行缩减。
如果结果类型与流类型T
相同,则使用提供的二元运算符将每个部分缩减为T
,并留下一堆T
s。我们可以使用相同的二元运算符进一步将这组T
s 减少为单个T
。
如果结果类型U
与流类型U
不同,那么BiFucntion
会将每个部分缩减为U
,剩下的就是一堆U
s不知道该怎么办,因为BiFunction
只需要U
和T
并返回U
。我们需要一个额外的 BinaryOperator<U>
来帮助我们合并 U
s。
所以如果你想要不同的结果类型,组合器是必要的。
另外,20
不是有效的标识值。一个有效的身份必须满足
accumulator.apply(identity, u) == u
对于所有u
,对于无合并器版本,以及
combiner.apply(u, accumulator.apply(identity, t)) == accumulator.apply(u, t)
适用于所有 u
和 t
,适用于合并器版本。
【讨论】:
【参考方案3】:3-arg version 的签名是:
<U> U reduce(U identity,
BiFunction<U,? super T,U> accumulator,
BinaryOperator<U> combiner)
它打算在以下一项(或两项)为真时使用:
结果类型 (U
) 与流元素类型 (T
) 不同。
combiner
与 accumulator
不同。
如果这些都不正确,那么2-arg version 更好(更简单、更容易):
T reduce(T identity,
BinaryOperator<T> accumulator)
对于问题中显示的示例,使用 3-arg 版本没有任何优势。
具有不同类型的示例很多,例如查看其他答案。
类型相同但combiner
为accumulator
的示例适用于-
减号运算符:
List<Integer> intarr = Arrays.asList(10,20,30);
// Sequential processing doesn't use combiner: totsum = -60
Integer totsum = intarr.stream().reduce(0, (a,b) -> a - b, (a,b) -> a - b);
// Parallel processing with same combiner does work: totsum = -20
Integer totsum = intarr.parallelStream().reduce(0, (a,b) -> a - b);
Integer totsum = intarr.parallelStream().reduce(0, (a,b) -> a - b, (a,b) -> a - b);
// Parallel processing requires a different combiner: totsum = -60
Integer totsum = intarr.parallelStream().reduce(0, (a,b) -> a - b, (a,b) -> a + b);
那是因为通过并行处理,我们为该输入流获得了 3 个线程,所以代码变为:
thread1Result = accumulator.apply(0/*identity*/, 10); // = 0 - 10 = -10
thread2Result = accumulator.apply(0/*identity*/, 20); // = 0 - 20 = -20
thread3Result = accumulator.apply(0/*identity*/, 30); // = 0 - 30 = -30
// Bad combiner: (a,b) -> a - b
result = combiner.apply(thread1Result, thread2Result); // = -10 - -20 = +10
result = combiner.apply(result, thread3Result); // = +10 - -30 = -20
// Good combiner: (a,b) -> a + b
result = combiner.apply(thread1Result, thread2Result); // = -10 + -20 = -30
result = combiner.apply(result, thread3Result); // = -30 + -30 = -60
【讨论】:
"// 并行处理需要不同的组合器:totsum = -20" 应该是-60? @samabcde 将所有复制/粘贴错误都搞砸了。谢谢!!以上是关于组合器在 java parallelStream reduce 中的实际作用是啥的主要内容,如果未能解决你的问题,请参考以下文章
java 8 parallelStream() 和 sorted()
Java 8 的 parallelStream 中产生了多少线程?
Java 8里 Stream和parallelStream的区别