在使用Java8并行流时的问题分析

Posted 灰太郎^_^

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了在使用Java8并行流时的问题分析相关的知识,希望对你有一定的参考价值。

最近在使用Java8的并行流时遇到了坑,线上排查问题时花了较多时间,分享出来与大家一起学习与自查

// 此处为坑
List<Java8Demo> copy = Lists.newArrayList();
numbers.parallelStream().forEach(item -> {
    copy.add(new Java8Demo(item));
});

上图用到了parallelStrem并行流,在循环内部往共享变量copy内写值,由于ArrayList本身不具备线程安全性,导致得到的copy内容有缺失。

总结经验如下:

  1. 在并行流内部不能对外部共享变量做写操作
  2. 如有需要,使用收集器实现上述并行流,收集器在内部即使使用ArrayList,也不会造成问题!

提供两种解决方案:

  • 串行
    // stream串行
    List<Java8Demo> copy = Lists.newArrayList();
    numbers.stream().forEach(item -> {
        copy.add(new Java8Demo(item));
    });
  • 收集器
    // 并行使用收集器
    List<Java8Demo> copy = numbers.parallelStream().map(Java8Demo::new).collect(Collectors.toList());

可运行Demo.java

package acc.biz.impl;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import com.google.common.collect.Lists;

public class Demo {

    private Integer value;

    public Demo(Integer value) {
        this.value = value;
    }

    public static List<Integer> numbers = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);

    public static void main(String[] args) {
        /** parallelStream并行 */
        int count1 = 1;
        while (count1 < 100) {
            // 此处为坑
            List<Demo> copy = Lists.newArrayList();
            numbers.parallelStream().forEach(item -> {
                copy.add(new Demo(item));
            });

            // 打印错误
            if (copy.size() != numbers.size()) {
                System.out.println(
                        new StringBuilder().append("parallelStream循环第").append(count1).append("次报错,numbers.size: [")
                                .append(numbers.size()).append("],copy.size: [").append(copy.size()).append("]"));
                break;
            }

            count1++;
        }

        /** stream串行 */
        int count2 = 1;
        while (count2 < 100) {
            // stream串行
            List<Demo> copy = Lists.newArrayList();
            numbers.stream().forEach(item -> {
                copy.add(new Demo(item));
            });

            // 打印错误
            if (copy.size() != numbers.size()) {
                System.out.println(new StringBuilder().append("stream循环第").append(count2).append("次报错,numbers.size: [")
                        .append(numbers.size()).append("],copy.size: [").append(copy.size()).append("]"));
                break;
            }

            count2++;
        }

        /** Collectors并行 */
        int count3 = 1;
        while (count3 < 100) {
            // 并行使用收集器
            List<Demo> copy = numbers.parallelStream().map(Demo::new).collect(Collectors.toList());

            // 打印错误
            if (copy.size() != numbers.size()) {
                System.out.println(
                        new StringBuilder().append("Collectors循环第").append(count3).append("次报错,numbers.size: [")
                                .append(numbers.size()).append("],copy.size: [").append(copy.size()).append("]"));
                break;
            }

            count3++;
        }
    }

    public Integer getValue() {
        return value;
    }

    public void setValue(Integer value) {
        this.value = value;
    }
}

 

以上是关于在使用Java8并行流时的问题分析的主要内容,如果未能解决你的问题,请参考以下文章

使用 CUDA 流时的段错误

如何在 python 中并行化以下代码片段?

Java8实战使用并行流

并行使用 scala Spark 重命名 HDFS 文件时的序列化问题

java 8流和并行流之间的区别

C++ OpenMP 和 gcc 4.8.1 - 并行化循环时的性能问题