为啥具有短路操作的并行 Java Stream 会评估 Stream 的所有元素,而顺序 Stream 不会?

Posted

技术标签:

【中文标题】为啥具有短路操作的并行 Java Stream 会评估 Stream 的所有元素,而顺序 Stream 不会?【英文标题】:Why does a parallel Java Stream with a short-curcuit operation evaluate all elements of the Stream while a sequential Stream does not?为什么具有短路操作的并行 Java Stream 会评估 Stream 的所有元素,而顺序 Stream 不会? 【发布时间】:2018-03-22 06:47:47 【问题描述】:

考虑一下parallel()和sequential()这两种测试方法:

  @Test
  public void parallel() throws Exception
  
    System.out.println( "parallel start." );
    IntStream.of( 0, 1 ).parallel().map( this::work ).findAny();
    System.out.println( "parallel done." );
  

  @Test
  public void sequential() throws Exception
  
    System.out.println( "sequential start." );
    IntStream.of( 0, 1 ).map( this::work ).findAny();
    System.out.println( "sequential done." );
  

  private int work(int i)
  
    System.out.println( "working... " + i );
    Threads.sleepSafe( i * 1000 );
    System.out.println( "worked. " + i );
    return i;
  

Threads.sleepSafe() 是一个简单的 Thread.sleep() 包装器,它吞下异常并且如果传递了 0,则什么也不做。

运行测试方法时,结果如下:

sequential start.
working... 0
worked. 0
sequential done.

parallel start.
working... 1
working... 0
worked. 0
sleeping for 1000 ms ...
slept for 1000 ms.
worked. 1
parallel done.

sequential() 按我的预期运行,但 parallel() 没有: 我希望parallel() 中的findAny()work() 第一次返回时立即返回(即值0,因为它不休眠),但它只在work() 完成后返回值1.

为什么?

有没有办法让findAny()work() 第一次返回时立即返回?

【问题讨论】:

【参考方案1】:

如果你想要一个并行流,那么是的,它会同时调用work方法多次。

请注意,如果您的并行流有 1,000 个元素并使用 5 个线程,则并行流将最多调用 work 5 次,而不是 1,000 次。

如果您只想调用一次work,请使用顺序流。

【讨论】:

我不希望 work() 只被调用一次 - 我希望 findAny() 在第一个 work() 返回后立即返回。 @sys64738 听起来您希望并行流在找到匹配元素后立即中断工作线程的计算。但是流不知道您在做什么。所以你可以有一个全局变量,你可以使用它来检查是否找到了某些东西,并在必要时手动从work 方法返回。但这是你需要写的东西。【参考方案2】:

并行模式下的 Stream API 基于 ForkJoinPool 范例,默认使用最大 X 个线程(其中 X 等于可用处理器的数量) .如果要增加迭代次数,可以检查此规则。

通常,可以通过两种方式自定义并行流的默认线程池计数:

提交并行流执行到自己的ForkJoinPool:yourFJP.submit(() -> stream.parallel().forEach(soSomething)); 使用系统属性更改公共池的大小:System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20") 用于 20 个线程的目标并行度。

有没有办法让 findAny() 在 work() 第一次返回后立即返回?

根据 ForkJoin 算法的思路,基本上答案是。它“等待”,而所有线程都将完成它们的工作。但如前所述,您可以将工作人员的数量限制为单个工作人员。显然它没有任何场景,因为这种方法类似于顺序执行,但会通过冗余操作增加额外开销。

【讨论】:

也许我不明白你的答案,但从输出中可以看出,work() 方法是同时调用的,即有足够的线程 @sys64738 更新了更详细的答案【参考方案3】:

并行流仍然支持短路,但如果所有线程都推迟工作,直到处理先前元素的线程确认操作尚未结束,则使用并行流没有任何优势。

因此,只要最终结果正确组装,即丢弃多余的元素,并行流处理未指定数量的元素是预期的行为。

这只是您的示例,仅包含两个元素,仅处理一个超出必要的元素可以解释为“所有元素都已处理”。

当元素的数量很少和/或实际操作是在流的第一个元素中找到可以预测的东西时,并行处理通常没有什么好处。如果你做类似的事情,事情会变得更有趣

IntStream.range(0, 2000).parallel()
    .map(i ->  LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(50)); return i;)
    .filter(i->i%397==396)
    .findAny();

请注意,终端操作将等待所有工作线程完成后才返回最终结果,因此当一个元素的评估在找到结果时已经开始,该元素的处理将完成。这是设计使然。它确保当您的应用程序代码在流操作之后继续进行时,不会同时访问源集合或您的 lambda 表达式访问的其他数据。

与the package documentation比较:

在几乎所有情况下,终端操作渴望,在返回之前完成它们对数据源的遍历和管道的处理。只有终端操作iterator()spliterator()不是; …

因此,短路并行流不会处理所有元素,但当其他工作线程仍在处理过时元素时,返回已评估结果可能仍需要更长的时间。

如果您希望提前返回,接受可能仍在运行的后台线程,Stream API 不适合您。考虑

private int work(int i) throws InterruptedException 
    System.out.println( "working... " + i );
    Thread.sleep(i * 1000);
    System.out.println( "worked. " + i );
    return i;

public void parallel() throws Exception 
    System.out.println( "parallel start." );
    List<Callable<Integer>> jobs = IntStream.range(0, 100)
      .collect(ArrayList::new, (l,i) -> l.add(() -> work(i)), List::addAll);
    ExecutorService pool = Executors.newFixedThreadPool(10);
    Integer result = pool.invokeAny(jobs);
    pool.shutdown();
    System.out.println( "parallel done, result="+result );

请注意,这不仅在第一个作业完成后立即返回,它还支持通过中断取消所有已经运行的作业。

【讨论】:

我只是在示例中使用了两个线程以使其尽可能清晰。添加更多元素不会改变行为。 您使用的不是两个线程,而是两个元素。您无法控制实际涉及多少线程。但是只要实现使用的线程多于元素,那么并发调用的数量至少会与元素一样多。这不会随着更多元素而改变。我从没有说过。我只是说不是所有的元素都被处理了。当您的元素数量显着增加时,您将能够认识到这一点。 你写的“终端操作将等待所有工作线程完成,然后返回最终结果”。这就是我正在经历的,但我没有在 API 文档中找到它。你能指点我相应的文档吗? 好问题。我总是把它当作给定的。我什至还记得一个(现已修复)关于在发生异常时不等待的错误报告(受影响的早期 Java 8 版本)。经过搜索,我发现this:“几乎所有情况下,终端操作都很急切,在返回之前完成它们对数据源的遍历和管道的处理。只有终端操作iterator()spliterator()不是; …

以上是关于为啥具有短路操作的并行 Java Stream 会评估 Stream 的所有元素,而顺序 Stream 不会?的主要内容,如果未能解决你的问题,请参考以下文章

Java8 新特性 Stream 非短路终端操作

Java8 新特性 Stream 短路终端操作

Stream并行流详解

java8-流式编程Stream

为啥在执行 Java Stream 终端操作时对象没有被垃圾收集?

java Stream流随笔