将数组的并行流减少为单个数组

Posted

技术标签:

【中文标题】将数组的并行流减少为单个数组【英文标题】:reduce a parallel stream of arrays into a single array 【发布时间】:2021-01-01 12:54:42 【问题描述】:

我正在尝试将数组 Stream 的并行流减少为单个数组 ArrayList 所以 我使用带有累加器和组合器的reduce方法如下:-

    public static void main(String [] args) 
        ArrayList<String> l1 = new ArrayList<>();
        l1.add("a1");
        l1.add("a2");
        
        List<String> l2 = new ArrayList<>();
        l2.add("a3");
        l2.add("a4");
                
        List<List<String>> l = new ArrayList<>();
        l.add(l1);
        l.add(l2);
        
        Stream<List<String>> stream = l.stream();
        join(stream).forEach(System.out::println);


private  static  <T> List<T> join(Stream<List<T>> stream) 
        return stream.parallel().reduce(new ArrayList<>(),  (total, element) -> 
            System.out.println("total: " + total);
            System.out.println("element: " + element);
            total.addAll(element);
            return total;
        ,  (total1, total2) -> 
            System.out.println("total1: " + total1);
            System.out.println("total2: " + total2);
            total1.addAll(total2);
            return total1;
        );

我知道组合器用于组合并行流..但它没有按我预期的那样工作, 因为我得到了如下重复的结果:-

total: []
element: [a3, a4]
total: []
element: [a1, a2]
total1: [a3, a4, a1, a2]
total2: [a3, a4, a1, a2]
a3
a4
a1
a2
a3
a4
a1
a2

那么为什么结果是重复的?在累加器中使用数组列表也是线程安全的吗?

【问题讨论】:

【参考方案1】:

你应该只使用flatMap:

返回一个流,该流包含将此流的每个元素替换为通过将提供的映射函数应用于每个元素而生成的映射流的内容的结果。每个映射流在其内容被放入该流后关闭。 (如果映射流为空,则使用空流。)

这是一个中间操作。

l.stream().flatMap(x -> x.stream()).collect(Collectors.toList()); // is [a1, a2, a3, a4]

l.stream().flatMap(List::stream).collect(Collectors.toList());

您的代码的问题在于您将函数式代码与副作用混合在一起。这可不是好兆头。如果去掉副作用,输出如预期:

    private static <T> List<T> join(Stream<List<T>> stream) 
        return stream.parallel().reduce(new ArrayList<>(), (total, element) -> 
            System.out.println("total: " + total);
            System.out.println("element: " + element);
            //total.addAll(element);
            //return total;
            var list = new ArrayList<T>(total);
            list.addAll(element);
            return list;
        , (total1, total2) -> 
            System.out.println("total1: " + total1);
            System.out.println("total2: " + total2);
            //total1.addAll(total2);
            //return total1;
            var list = new ArrayList<T>(total1);
            list.addAll(total2);
            return list;
        );
    

您还应该避免使用parallel(),除非您有明确、客观的理由。并行性是一种开销,只有在有繁重的工作要做时它才会变得更加高效。否则,同步开销将是比任何收益更大的损失。

【讨论】:

为什么在并行处理的情况下累加器和合并器应该返回一个新对象? @Abdelrahman_Attya 它应该没有副作用。 使用reduce时,这些防御性副本是不可避免的。相比之下,使用collect(ArrayList::new, List::addAll, List::addAll) 不仅更短,而且效率更高。 总结一下:reduce 旨在执行不可变归约..当我们想要将 (int, double, ..) 等不可变对象流归约为单个值时使用它..但是如果我们想减少一个可变对象,例如 (list),我们必须通过每一步返回一个新对象来使其成为线程安全的。

以上是关于将数组的并行流减少为单个数组的主要内容,如果未能解决你的问题,请参考以下文章

减少cython并行中的数组

java字符流

将两个列数组合并为配置单元中的 1 个数组列

如何将对象数组转换为在打字稿中具有动态键的单个对象

通过键减少转换单个数组

Java 8新特性之 并行和并行数组(八恶人-8)