Java Stream 减少无法解释的行为

Posted

技术标签:

【中文标题】Java Stream 减少无法解释的行为【英文标题】:Java Stream reduce unexplained behaviour 【发布时间】:2020-01-20 05:13:41 【问题描述】:

谁能指出我正确的方向,因为我无法理解这个问题。

我正在执行以下方法。

private static void reduce_parallelStream() 
    List<String> vals = Arrays.asList("a", "b");

    List<String> join = vals.parallelStream().reduce(new ArrayList<String>(),
            (List<String> l, String v) -> 

                l.add(v);

                return l;
            , (a, b) ->                    
                a.addAll(b);
                return a;
            

    );

   System.out.println(join);


打印出来

[null, a, null, a]

我不明白为什么它会在结果列表中放置两个 null。我希望答案是

[a, b]

因为是并行流所以第一个参数要reduce

新的 ArrayList()

可能会为每个输入值 a 和 b 调用两次。

那么累加器函数可能会被调用两次,因为它是一个并行流,并在每次调用中传递每个输入“a 和 b”以及种子值提供的列表。因此 a 被添加到列表 1 中,b 被添加到列表 2 中(反之亦然)。之后组合器将合并两个列表,但它不会发生。

有趣的是,如果我在累加器中放置一个打印语句来打印输入的值,那么输出会发生变化。所以关注

private static void reduce_parallelStream() 
    List<String> vals = Arrays.asList("a", "b");

    List<String> join = vals.parallelStream().reduce(new ArrayList<String>(),
            (List<String> l, String v) -> 
                System.out.printf("l is %s", l);
                l.add(v);
                System.out.printf("l is %s", l);
                return l;
            , (a, b) -> 
                a.addAll(b);
                return a;
            

    );

   System.out.println(join);


这个输出的结果

l 是 []l 是 [b]l 是 [b, a]l 是 [b, a][b, a, b, a]

谁能解释一下。

【问题讨论】:

ArrayList 不是线程安全的集合,因此任何并发调用都可能导致意外行为。另一方面,在使用 System.out.println 时,有一个隐藏的同步机制。当您使用synchronized list 时会发生什么? coderanch上有一个有趣的解释 就像Why does Java stream map reduce count my result twice? 一样,可能还有其他一些错误使用reduce 的例子。阅读the package documentation(两个部分,“归约操作”和“可变归约”)也可能有所帮助。 【参考方案1】:

在使用parallelStream() 时,您应该使用Collections.synchronizedList()。因为ArrayList 不是线程安全的,并且在并发访问它时会出现意外行为,就像使用parallelStream() 一样。

我已经修改了你的代码,现在它可以正常工作了:

private static void reduce_parallelStream() 
    List<String> vals = Arrays.asList("a", "b");

    // Use Synchronized List when with parallelStream()
    List<String> join = vals.parallelStream().reduce(Collections.synchronizedList(new ArrayList<>()),
            (l, v) -> 
                l.add(v);
                return l;
            , (a, b) -> a // don't use addAll() here to multiplicate the output like [a, b, a, b]
    );
    System.out.println(join);

输出:

有时你会得到这个输出:

[a, b]

有时还有这个:

[b, a]

原因是它是parallelStream(),所以您无法确定执行顺序。

【讨论】:

【参考方案2】:

因为是并行流所以第一个参数要减少new ArrayList() 对于每个输入值 a 和 b,可能会被调用两次。

那就是你错了。第一个参数是单个ArrayList 实例,不是 lambda 表达式可以产生多个ArrayList 实例。

因此,整个归约操作在单个 ArrayList 实例上进行。当多个线程并行修改 ArrayList 时,每次执行的结果可能会发生变化。

您的combiner 实际上将List 的所有元素添加到同一个List

如果accumulatorcombiner 函数都将产生一个新的ArrayList 而不是改变它们的输入ArrayList,则您可以获得预期的[a,b] 输出:

List<String> join = vals.parallelStream().reduce(
     new ArrayList<String>(),
        (List<String> l, String v) -> 
            List<String> cl = new ArrayList<>(l);
            cl.add(v);
            return cl;
        , (a, b) -> 
            List<String> ca = new ArrayList<>(a);
            ca.addAll(b);
            return ca;
        
);

也就是说,您根本不应该使用reducecollect 是执行可变归约的正确方法:

List<String> join = vals.parallelStream()
                        .collect(ArrayList::new,ArrayList::add,ArrayList::addAll);

如您所见,这里与reduce 不同,您传递的第一个参数是Supplier&lt;ArrayList&lt;String&gt;&gt;,可用于根据需要生成尽可能多的中间ArrayList 实例。

【讨论】:

...和.collect(ArrayList::new,ArrayList::add,ArrayList::addAll).collect(Collectors.toCollection(ArrayList::new)) 的重新实现,分别。 .collect(Collectors.toList()) 如果我们不关心返回列表的实际类型。【参考方案3】:

这很简单,第一个参数是 identity 或者我会说 zero 开始。对于parallelStream usage,此值重用。这意味着并发问题(添加中的 null)和重复。

这可以通过以下方式修补:

    final ArrayList<String> zero = new ArrayList<>();
    List<String> join = vals.parallelStream().reduce(zero,
            (List<String> l, String v) -> 
                if (l == zero) 
                    l = new ArrayList<>();
                
                l.add(v);
                return l;
            , (a, b) -> 
                // See comment of Holger:
                if (a == zero) return b;
                if (b == zero) return a;

                a.addAll(b);
                return a;
            
    );

安全。

您可能想知道为什么reduce 对提供身份的函数没有重载。 原因是这里应该使用collect

【讨论】:

无法保证组合器不会获得标识值。所以你也必须在那里执行if (a == zero) a = new ArrayList&lt;&gt;();。最好尽早发现错误,因此最好使用final List&lt;String&gt; zero = Collections.emptyList();。也许,还有更多目前看不到的问题。一般来说,不要违背其意图使用 API…… @Holger 我怀疑所有的组合器调用可以不止一次地获得身份,而且我之前没有滥用身份。我的最后一句话完全同意你的看法;我没有像 Eran 那样给出一个收集解决方案。你是对的零:至少应该是不可修改的。 试试IntStream.range(0, 0x10001).filter(i -&gt; Integer.highestOneBit(i)==i) .parallel().mapToObj(String::valueOf)

以上是关于Java Stream 减少无法解释的行为的主要内容,如果未能解决你的问题,请参考以下文章

Java8 Stream

Java8之Stream/Map

java流stream的一些简单用法

Java8中聚合操作collectreduce方法详解

cmd运行java文件不显示——Stream代码简洁之道的详细用法

Java8新特性——Stream API的创建方式及中间操作