Java并行流-调用parallel()方法的顺序

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java并行流-调用parallel()方法的顺序相关的知识,希望对你有一定的参考价值。

AtomicInteger recordNumber = new AtomicInteger();
Files.lines(inputFile.toPath(), StandardCharsets.UTF_8)
     .map(record -> new Record(recordNumber.incrementAndGet(), record)) 
     .parallel()           
     .filter(record -> doSomeOperation())
     .findFirst()

[我写这篇文章时,我假设线程将仅在map调用中产生,因为在map之后放置了parallel。但是文件中的某些行每次执行都会获得不同的记录号。

我阅读了官方的Java stream documentation和一些网站,以了解流在幕后的工作方式。

几个问题:

  • Java并行流基于SplitIterator工作,它由ArrayList,LinkedList等每个集合实现。当我们从这些集合构造并行流时,将使用相应的拆分迭代器来拆分和迭代该集合。这解释了为什么并行性发生在原始输入源(文件行)级别而不是映射结果(即Record pojo)。我的理解正确吗?
  • 就我而言,输入是文件IO流。将使用哪个吐痰迭代器?
  • 我们在管道中放置parallel()的位置都没关系。原始输入源将始终被分割,其余中间操作将被应用。

    在这种情况下,Java不应允许用户在管道中的任何地方(除了原始来源之外)进行并行操作。因为,它为那些不知道java流内部工作方式的人提供了错误的理解。我知道parallel()操作将已经为Stream对象类型定义,因此,它是以这种方式工作的。但是,最好提供一些替代解决方案。

  • 在上面的代码片段中,我试图为输入文件中的每个记录添加一个行号,因此应该对其进行排序。但是,我想并行应用doSomeOperation(),因为它很重。一种实现方式是编写自己的自定义拆分迭代器。还有其他办法吗?
答案

这说明了为什么并行性发生在原始输入源(文件行)级别而不是映射结果(即Record pojo)。

整个流是并行的或顺序的。我们没有选择要顺序或并行运行的操作子集。

当启动终端操作时,根据调用流的方向,顺序或并行执行流管道。当终端操作启动时,流管线根据被调用的流的模式顺序或并行执行。 source

正如您提到的,并行流使用拆分迭代器。显然,这是在操作开始运行之前对数据进行分区。


就我而言,输入是文件IO流。将使用哪个吐痰迭代器?

查看源代码,我看到它使用了java.nio.file.FileChannelLinesSpliterator


我们在管道中的哪里放置parallel()都没有关系。原始输入源将始终被分割,其余中间操作将被应用。

对。您甚至可以多次调用parallel()sequential()。最后调用的那个将获胜。调用parallel()时,将其设置为返回的流。并且如上所述,所有操作依次或并行运行。


在这种情况下,Java不应允许用户在管道中的任何位置(除了原始源之外都进行并行操作...

这成为意见问题。我认为Zabuza有充分的理由支持JDK设计师的选择。


一种实现方法是编写自己的自定义拆分迭代器。还有其他办法吗?

这取决于您的操作

  • 如果findFirst()是您真正的终端操作,那么您甚至不必担心并行执行,因为无论如何,doSomething()的调用不会太多(findFirst()处于短路状态)。实际上,.parallel()可能会导致处理多个元素,而顺序流上的findFirst()可能会阻止该元素。
  • 如果终端操作不会创建太多数据,则可以使用顺序流创建Record对象,然后并行处理结果:

    List<Record> smallData = Files.lines(inputFile.toPath(), StandardCharsets.UTF_8)
      .map(record -> new Record(recordNumber.incrementAndGet(), record)) 
      .collect(Collectors.toList())
      .parallelStream()     
      .filter(record -> doSomeOperation())
      .collect(Collectors.toList());
    
  • 如果您的管道会在内存中加载大量数据(这可能是您使用Files.lines()的原因),那么可能需要自定义拆分迭代器。不过,在我去那里之前,我会研究其他选项(例如,以id列开头的行保存-这只是我的看法)。我也将尝试以较小的批次处理记录,例如:

    AtomicInteger recordNumber = new AtomicInteger();
    final int batchSize = 10;
    
    try(BufferedReader reader = 
            Files.newBufferedReader(inputFile.toPath(), StandardCharsets.UTF_8)).toPath(), 
                        StandardCharsets.UTF_8);)
        Supplier<List<Record>> batchSupplier = () -> 
            List<Record> batch = new ArrayList<>();
            for (int i = 0; i < batchSize; i++) 
                String nextLine;
                try 
                    nextLine = reader.readLine();
                 catch (IOException e) 
                    //hanlde exception
                    throw new RuntimeException(e);
                
    
                if(null == nextLine) 
                    return batch;
                batch.add(new Record(recordNumber.getAndIncrement(), nextLine));
            
            System.out.println("next batch");
    
            return batch;
        ;
    
        Stream.generate(batchSupplier)
            .takeWhile(list -> list.size() >= batchSize)
            .map(list -> list.parallelStream()
                             .filter(record -> doSomeOperation())
                             .collect(Collectors.toList()))
            .flatMap(List::stream)
            .forEach(System.out::println);
    
    

    这将并行执行doSomeOperation(),而不将所有数据加载到内存中。但是请注意,需要考虑batchSize

以上是关于Java并行流-调用parallel()方法的顺序的主要内容,如果未能解决你的问题,请参考以下文章

《Java8实战》 - 读书笔记 - Parallel Stream并行流知识

java8新特性——并行流与顺序流

Stream多线程并行数据处理

Java8-04-05-笔记

Java8-04-05-笔记

java8新特性:Stream多线程并行数据处理