JDK10 之前仍有缺陷的Java Stream API

Posted SpringForAll社区

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JDK10 之前仍有缺陷的Java Stream API相关的知识,希望对你有一定的参考价值。

Stream API 的bug会影响使用JDK8或JDK9的每个人,点击了解更多

当然,并非所有,但历史表明Stream API具有一些有趣的错误/缺陷,可能会影响仍然驻留在使用JDK 8和JDK 9上的每一个人。

1. Stream#flatMap

不幸的是,事实证明Stream#flatMap API并不像标榜的那样Lazy,以至允许存在几种抓狂的情况。举个例子,来看看这个:

 
   
   
 
  1. Stream.of(1)

  2. .flatMap(i -> Stream.generate(() -> 42))

  3. .findAny()

  4. .ifPresent(System.out::println);

在JDK 8和JDK 9中,上面的代码片段永远等待着内部无限流的计算。一个从无限序列中取出单个元素的简单操作,期望时间复杂度为O(1)——只要我们不处理内部的无限流,这就是它的工作方式。Stream#flatMap:

 
   
   
 
  1. Stream.generate(() -> 42)

  2. .findAny()

  3. .ifPresent(System.out::println);

  4. // completes "immediately" and prints 42

更重要的是,如果我们在短路(无法正常工作)的Stream#flatMap调用之后插入一些额外的处理,情况会变得更糟。

 
   
   
 
  1. Stream.of(1)

  2. .flatMap(i -> Stream.generate(() -> 42))

  3. .map(i -> process(i))

  4. .findAny()

  5. .ifPresent(System.out::println);

  6. private static <T> T process(T input) {

  7. System.out.println("Processing...");

  8. return input;

  9. }

现在,我们不仅陷入无限运算循环,而且还正在处理所有通过的项目:

 
   
   
 
  1. Processing...

  2. Processing...

  3. Processing...

  4. Processing...

  5. Processing...

  6. Processing...

  7. Processing...

  8. Processing...

  9. Processing...

  10. Processing...

  11. Processing...

  12. Processing...

  13. ...

想象一下,如果process()方法包含一些阻塞操作和非必要的副作用(如电子邮件发送或日志记录),结果会怎样。

2. 说明

Stream#flatMap的内部实现是罪魁祸首,特别是下面的代码:

 
   
   
 
  1. @Override

  2. public void accept(P_OUT u) {

  3. try (Stream<? extends R> result = mapper.apply(u)) {

  4. // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it

  5. if (result != null)

  6. result.sequential().forEach(downstream);

  7. }

  8. }

如你所见,内部流被Stream#forEach实时地计算(甚至没有提到条件语句周围缺少花括号.) 这个问题在JDK 9中仍然没有得到解决,但幸运的是,JDK 10附带了解决方案:

 
   
   
 
  1. @Override

  2. public void accept(P_OUT u) {

  3. try (Stream<? extends R> result = mapper.apply(u)) {

  4. if (result != null) {

  5. if (!cancellationRequestedCalled) {

  6. result.sequential().forEach(downstream);

  7. }

  8. else {

  9. var s = result.sequential().spliterator();

  10. do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstream));

  11. }

  12. }

  13. }

3. Stream#takeWhile/dropWhile

本小节与上面的有着直接关联,并且Stream#flatMap不再实时计算。假设我们有个lists中的list。

 
   
   
 
  1. List<List<String>> list = List.of(

  2. List.of("1", "2"),

  3. List.of("3", "4", "5", "6", "7"));

我们想把它们压成一个整体:

 
   
   
 
  1. list.stream()

  2. .flatMap(Collection::stream)

  3. .forEach(System.out::println);

  4. // 1

  5. // 2

  6. // 3

  7. // 4

  8. // 5

  9. // 6

  10. // 7

它如我们所预期的那样工作。现在,让我们取压缩流,并且继续取元素,直到遇到“4”:

 
   
   
 
  1. Stream.of("1", "2", "3", "4", "5", "6", "7")

  2. .takeWhile(i -> !i.equals("4"))

  3. .forEach(System.out::println);

  4. // 1

  5. // 2

  6. // 3

它再次如我们所预期的那样工作。现在试着把二者结合起来,会出现什么问题吗?

 
   
   
 
  1. List<List<String>> list = List.of(

  2. List.of("1", "2"),

  3. List.of("3", "4", "5", "6", "7"));

  4. list.stream()

  5. .flatMap(Collection::stream)

  6. .takeWhile(i -> !i.equals("4"))

  7. .forEach(System.out::println);

  8. // 1

  9. // 2

  10. // 3

  11. // 5

  12. // 6

  13. // 7

这是一个意想不到的转折,可以完全归因于Stream#flatMap的原始问题。前段时间,我在Twitter上进行了一次简短的民意调查,大多数人都对结果感到非常惊讶:

4. 自定义ForkJoinPool实例上的并行流

有一个众所周知的黑客攻击点(你不应该使用,因为它依赖于流API的内部实现细节)可以劫持并行流任务。并通过在你自己的FJP实例中运行定制的fork-join池中执行:

 
   
   
 
  1. ForkJoinPool customPool = new ForkJoinPool(42);

  2. customPool.submit(() -> list.parallelStream() /*...*/);

如果你认为自己已经成功地欺骗了所有人,那么你在一定程度上是对的。事实证明,即使任务在自定义池实例上运行,它们仍然耦合到共享池—计算的大小仍然与公共池(而不是自定义池成比例。——JDK-8190974

因此,即使你在不应该使用它们的时候使用了它们,JDK 10中也提供了解决方案。此外,如果确实需要使用Stream API来运行并行计算,可以使用并行收集器(parallel-collectors)代替。

原文链接:https://dzone.com/articles/java-stream-api-was-broken-before-jdk-10

作者: Grzegorz Piwowarek

译者:Yunooa


推荐: 

上一篇:




 关注公众号

点击原文阅读更多


以上是关于JDK10 之前仍有缺陷的Java Stream API的主要内容,如果未能解决你的问题,请参考以下文章

jdk1.8 java.util.stream

jdk1.8新特性Stream流式处理

java之stream(jdk8)

jdk1.8 java.util.stream.Stream类 详解

JDK 8 Stream 数据流效率怎么样?

试水jdk8 stream