JDK10 之前仍有缺陷的Java Stream API
Posted SpringForAll社区
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JDK10 之前仍有缺陷的Java Stream API相关的知识,希望对你有一定的参考价值。
Stream API 的bug会影响使用JDK8或JDK9的每个人,点击了解更多
当然,并非所有,但历史表明Stream API具有一些有趣的错误/缺陷,可能会影响仍然驻留在使用JDK 8和JDK 9上的每一个人。
1. Stream#flatMap
不幸的是,事实证明Stream#flatMap API并不像标榜的那样Lazy,以至允许存在几种抓狂的情况。举个例子,来看看这个:
Stream.of(1)
.flatMap(i -> Stream.generate(() -> 42))
.findAny()
.ifPresent(System.out::println);
在JDK 8和JDK 9中,上面的代码片段永远等待着内部无限流的计算。一个从无限序列中取出单个元素的简单操作,期望时间复杂度为O(1)——只要我们不处理内部的无限流,这就是它的工作方式。Stream#flatMap:
Stream.generate(() -> 42)
.findAny()
.ifPresent(System.out::println);
// completes "immediately" and prints 42
更重要的是,如果我们在短路(无法正常工作)的Stream#flatMap调用之后插入一些额外的处理,情况会变得更糟。
Stream.of(1)
.flatMap(i -> Stream.generate(() -> 42))
.map(i -> process(i))
.findAny()
.ifPresent(System.out::println);
private static <T> T process(T input) {
System.out.println("Processing...");
return input;
}
现在,我们不仅陷入无限运算循环,而且还正在处理所有通过的项目:
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
...
想象一下,如果process()方法包含一些阻塞操作和非必要的副作用(如电子邮件发送或日志记录),结果会怎样。
2. 说明
Stream#flatMap的内部实现是罪魁祸首,特别是下面的代码:
@Override
public void accept(P_OUT u) {
try (Stream<? extends R> result = mapper.apply(u)) {
// We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
if (result != null)
result.sequential().forEach(downstream);
}
}
如你所见,内部流被Stream#forEach实时地计算(甚至没有提到条件语句周围缺少花括号.) 这个问题在JDK 9中仍然没有得到解决,但幸运的是,JDK 10附带了解决方案:
@Override
public void accept(P_OUT u) {
try (Stream<? extends R> result = mapper.apply(u)) {
if (result != null) {
if (!cancellationRequestedCalled) {
result.sequential().forEach(downstream);
}
else {
var s = result.sequential().spliterator();
do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstream));
}
}
}
3. Stream#takeWhile/dropWhile
本小节与上面的有着直接关联,并且Stream#flatMap不再实时计算。假设我们有个lists中的list。
List<List<String>> list = List.of(
List.of("1", "2"),
List.of("3", "4", "5", "6", "7"));
我们想把它们压成一个整体:
list.stream()
.flatMap(Collection::stream)
.forEach(System.out::println);
// 1
// 2
// 3
// 4
// 5
// 6
// 7
它如我们所预期的那样工作。现在,让我们取压缩流,并且继续取元素,直到遇到“4”:
Stream.of("1", "2", "3", "4", "5", "6", "7")
.takeWhile(i -> !i.equals("4"))
.forEach(System.out::println);
// 1
// 2
// 3
它再次如我们所预期的那样工作。现在试着把二者结合起来,会出现什么问题吗?
List<List<String>> list = List.of(
List.of("1", "2"),
List.of("3", "4", "5", "6", "7"));
list.stream()
.flatMap(Collection::stream)
.takeWhile(i -> !i.equals("4"))
.forEach(System.out::println);
// 1
// 2
// 3
// 5
// 6
// 7
这是一个意想不到的转折,可以完全归因于Stream#flatMap的原始问题。前段时间,我在Twitter上进行了一次简短的民意调查,大多数人都对结果感到非常惊讶:
4. 自定义ForkJoinPool实例上的并行流
有一个众所周知的黑客攻击点(你不应该使用,因为它依赖于流API的内部实现细节)可以劫持并行流任务。并通过在你自己的FJP实例中运行定制的fork-join池中执行:
ForkJoinPool customPool = new ForkJoinPool(42);
customPool.submit(() -> list.parallelStream() /*...*/);
如果你认为自己已经成功地欺骗了所有人,那么你在一定程度上是对的。事实证明,即使任务在自定义池实例上运行,它们仍然耦合到共享池—计算的大小仍然与公共池(而不是自定义池成比例。——JDK-8190974
因此,即使你在不应该使用它们的时候使用了它们,JDK 10中也提供了解决方案。此外,如果确实需要使用Stream API来运行并行计算,可以使用并行收集器(parallel-collectors)代替。
原文链接:https://dzone.com/articles/java-stream-api-was-broken-before-jdk-10
作者: Grzegorz Piwowarek
译者:Yunooa
推荐:
上一篇:
关注公众号
点击原文阅读更多
以上是关于JDK10 之前仍有缺陷的Java Stream API的主要内容,如果未能解决你的问题,请参考以下文章