java 8 parallelStream() 和 sorted()

Posted

技术标签:

【中文标题】java 8 parallelStream() 和 sorted()【英文标题】:java 8 parallelStream() with sorted() 【发布时间】:2013-11-01 00:55:16 【问题描述】:

JDK 8 EA 现已推出,我只是想适应 lambda 和新的 Stream API。我尝试使用并行流对列表进行排序,但结果总是错误的:

import java.util.ArrayList;
import java.util.List;

public class Test

    public static void main(String[] args)
    
        List<String> list = new ArrayList<>();
        list.add("C");
        list.add("H");
        list.add("A");
        list.add("A");
        list.add("B");
        list.add("F");
        list.add("");

        list.parallelStream() // in parallel, not just concurrently!
            .filter(s -> !s.isEmpty()) // remove empty strings
            .distinct() // remove duplicates
            .sorted() // sort them
            .forEach(s -> System.out.println(s)); // print each item
    

输出:

C
F
B
H
A

请注意,每次输出都不同。我的问题是,这是一个错误吗?还是不能并行排序列表?如果是这样,那么为什么 JavaDoc 没有说明这一点?最后一个问题,是否还有另一个操作,其输出会因流类型而异?

【问题讨论】:

排序后删除重复可能会更好。 【参考方案1】:

您需要使用forEachOrdered,而不是forEach

根据forEach 文档:

对于并行流管道,此操作不能保证尊重流的遇到顺序,因为这样做会牺牲并行性的好处。对于任何给定的元素,可以在库选择的任何时间和任何线程中执行操作。如果操作访问共享状态,它负责提供所需的同步。

【讨论】:

我的猜测是它在内部创建了一个“排序”列表,每个线程都添加到该列表中,然后继续流程中的下一步(forEach),因此它无序执行,FWIW。 【参考方案2】:

此外,您可以通过here 中的一个非常好的示例阅读更多关于并行性和 forEachOrdered 的信息。总之,在并行流中使用 forEachOrdered 可能会导致失去并行性的好处。

这里是来自同一资源的示例:

Integer[] intArray = 1, 2, 3, 4, 5, 6, 7, 8 ;
List<Integer> listOfIntegers =
    new ArrayList<>(Arrays.asList(intArray));

System.out.println("listOfIntegers:");
listOfIntegers
    .stream()
    .forEach(e -> System.out.print(e + " "));
System.out.println("");

System.out.println("listOfIntegers sorted in reverse order:");
Comparator<Integer> normal = Integer::compare;
Comparator<Integer> reversed = normal.reversed(); 
Collections.sort(listOfIntegers, reversed);  
listOfIntegers
    .stream()
    .forEach(e -> System.out.print(e + " "));
System.out.println("");

System.out.println("Parallel stream");
listOfIntegers
    .parallelStream()
    .forEach(e -> System.out.print(e + " "));
System.out.println("");

System.out.println("Another parallel stream:");
listOfIntegers
    .parallelStream()
    .forEach(e -> System.out.print(e + " "));
System.out.println("");

System.out.println("With forEachOrdered:");
listOfIntegers
    .parallelStream()
    .forEachOrdered(e -> System.out.print(e + " "));
System.out.println("");

输出是

listOfIntegers:
1 2 3 4 5 6 7 8
listOfIntegers sorted in reverse order:
8 7 6 5 4 3 2 1
Parallel stream:
3 4 1 6 2 5 7 8
Another parallel stream:
6 3 1 5 7 8 4 2
With forEachOrdered:
8 7 6 5 4 3 2 1

第五个管道使用forEachOrdered方法,它处理 流的元素按其源指定的顺序, 无论您是串行还是并行执行流。 请注意,如果您使用,您可能会失去并行性的好处 像 forEachOrdered 这样的并行流操作

.

【讨论】:

这有点薄。请通过编辑扩展您的答案。

以上是关于java 8 parallelStream() 和 sorted()的主要内容,如果未能解决你的问题,请参考以下文章

Java 8 的 parallelStream 中产生了多少线程?

带有 spring 注释方法的 Java .parallelStream()

使用 parallelStream 时抛出 InterruptedException - Java [重复]

java并行之parallerlStream

深入浅出parallelStream

Java 8新特性之 并行和并行数组(八恶人-8)