Java8 Stream流

Posted Lam

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java8 Stream流相关的知识,希望对你有一定的参考价值。

1. 概述

在 Java8 之前,我们通常是通过 for 循环或者 Iterator 迭代来重新排序合并数据 ,又或者通过 重新定义 Collections.sorts 的 Comparator 方法 来实现,这两种方式对于大数据量系统来说,效率并不是很理想。

Java8 中添加了一个新的接口类 Stream,他和我们之前接触的字节流概念不太一样,Java8 集合中的 Stream 相当于高级版的 Iterator他通过 Lambda 表达式对集合进行各种非常便利、高效的聚合操作(Aggregate Operation),或者大批量数据操作 (Bulk Data Operation) 

1-1. Stream理解

在 Java 中我们称 Stream 为 流 ,我们经常会用流去对集合进行一些流水线的操作。Stream 就像工厂一样,只需要把集合、命令还有一些参数灌输到流水线中去,就可以加工成得出想要的结果。这样的流水线能大大简洁代码,减少操作。

1-2. Stream流程

原集合 —> 流  —> 各种操作(过滤、分组、统计) —> 终端操作

Stream 流的操作流程一般都是这样的,先将集合转为流,然后经过各种操作,比如过滤、筛选、分组、计算。最后的终端操作,就是转化成我们想要的数据,这个数据的形式一般还是集合,有时也会按照需求输出 count 计数。

1-3. Stream流简单演示

public class Student {

    private String name; // 姓名
    private String sex; // 性别
    private int hight; // 身高

    public Student(String name, String age, int hight) {
        this.name = name;
        this.sex = age;
        this.hight = hight;
    }

    // Get And Set
    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getSex() {
        return sex;
    }

    public void setSex(String sex) {
        this.sex = sex;
    }

    public int getHight() {
        return hight;
    }

    public void setHight(int hight) {
        this.hight = hight;
    }

    @Override
    public String toString() {
        return "Student{" +
                "name=\'" + name + \'\\\'\' +
                ", sex=\'" + sex + \'\\\'\' +
                ", hight=" + hight +
                \'}\';
    }
}
Student.java

1) 使用传统的迭代方式来实现

public class Test {
    public static void main(String[] args) {

        List<Student> list = new ArrayList();
        list.add(new Student("张三", "男", 168));
        list.add(new Student("刘翏柳", "女", 168));
        list.add(new Student("张丰", "男", 181));
        list.add(new Student("李三炮", "男", 178));
        list.add(new Student("李狗蛋", "男", 159));
        list.add(new Student("张倩倩", "女", 162));
     
        List<Student> newList = new ArrayList<>();
        for (Student stu : list) {
            // 过滤: 姓名长度 >=3 并且 身高 >160
            if (stu.getName().length() >= 3 && stu.getHight() > 160) {
                // 排序 - 矮的排前面
                if (newList.size() > 0 && stu.getHight() < newList.get(0).getHight())
                    newList.add(0, stu);  // 队头
                else
                    newList.add(stu); // 队尾
            }
        }
        // 遍历新的集合 输出
        for (Student student : newList) {
            System.out.println(student);
        }
    }
}

2)Java8 中的 Stream API 进行实现

public class Test {
    public static void main(String[] args) {

        List<Student> list = new ArrayList();
        list.add(new Student("张三", "男", 168));
        list.add(new Student("刘翏柳", "女", 168));
        list.add(new Student("张丰", "男", 181));
        list.add(new Student("李三炮", "男", 178));
        list.add(new Student("李狗蛋", "男", 159));
        list.add(new Student("张倩倩", "女", 162));

        // lambda表达式、链式编程、函数式接口、Stream流式计算
        list.stream()
                .filter(stu -> stu.getName().length() >= 3)  // 姓名长度 >=3
                .filter(stu -> stu.getHight() > 160) // 身高 >160
                .sorted((stu_1, stu_2) -> Integer.compare(stu_1.getHight(), stu_2.getHight())) // // 排序 - 矮的排前面
                .forEach(System.out::println); // 遍历输出
    }
}

- 输出结果:

Student{name=\'张倩倩\', sex=\'女\', hight=162}

Student{name=\'刘翏柳\', sex=\'女\', hight=168}

Student{name=\'李三炮\', sex=\'男\', hight=178}

2. Stream流创建

想使用 Stream 流,首先咱得先创建一个 Stream 流对象。创建 Steam 需要数据源.这些数据源可以是集合、可以是数组、可以使文件、甚至是你可以去自定义等等。

2-1. 集合作为Stream数据源

集合 Collection 作为 Stream 的数据源,应该也是我们用的最多的一种数据源了。Collection 里面也提供了一些方法帮助我们把集合 Collection 转换成 Stream 。

① stream()

调用 Collection.stream() 函数创建一个 Stream 对象。相当于把集合 Collection 里面的数据都导入到了 Stream 里面去了。

List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
// 使用List创建一个流对象 Stream<Integer> stream = list.stream(); // TODO: 对流对象做处理

② parallelStream()

调用 Collection.parallelStream() 创建 Stream 对象。

List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
// 使用List创建一个流对象 Stream<Integer> stream = list.parallelStream(); // TODO: 对流对象做处理

● parallelStream() 与 stream() 的区别

parallelStream() 使用多线程并发处理最后子啊汇总结果,而 stream() 是单线程。所以相对来说 parallelStream() 效率要稍微高点。

2-2. 数组作为Stream数据源

数组也可以作为 Stream 的数据源。我们可以通过 Arrays.stream() 方法把一个数组转化成流对象。Arrays.stream() 方法很丰富,有很多个。大家可以根据实际情况使用。

int[] intArray = new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
// 使用数组创建一个流对象
IntStream stream = Arrays.stream(intArray);
// TODO: 对流对象做处理 

2-3. BufferedReader作为Stream数据源

我们也可以把 BufferedReader 里面 lines 方法把 BufferedReader 里面每一行的数据作为数据源生成一个 Stream 对象。

File file = new File("/home/src/main/resources/application.yml");
try {
    // 把文件里面的内容一行一行的读出来
    BufferedReader in = new BufferedReader(new FileReader(file));
    // 生成一个Stream对象
    Stream<String> stream = in.lines();
    // TODO: 对流对象做处理
} catch (IOException e) {
    e.printStackTrace();
}

2-4. File作为Stream数据源

Files 里面多个生成 Stream 对象的方法,都是对 Path(文件) 的操作。

有的是指定 Path 目录下所有的子文件(所有的子文件相当于是一个列表了)作为 Stream 数据源,有的把指定 Path 文件里面的每一行数据作为 Stream 的数据源。

① Files.list()

列出指定 Path 下面的所有文件。把这些文件作为 Stream 数据源。

Path path = Paths.get("D:\\\\home\\\\src\\\\main\\\\resources");
try {
    // 找到指定path下的所有的文件
    Stream<Path> stream = Files.list(path);
    // TODO: 对流对象做处理
} catch (IOException e) {
    e.printStackTrace();
}

② Files.walk()

Files.walk() 方法用于遍历子文件(包括文件夹)。参数 maxDepth 用于指定遍历的深度。把子文件(子文件夹)作为 Stream 数据源。

Path path = Paths.get("D:\\\\home\\\\src\\\\main\\\\resources");
try {
    // 第二个参数用于指定遍历几层
    Stream<Path> stream = Files.walk(path, 2);
    // TODO: 对流对象做处理
} catch (IOException e) {
    e.printStackTrace();
}

③ Files.find()

Files.find 方法用于遍历查找(过滤)子文件。参数里面会指定查询(过滤)条件。把过滤出来的子文件作为 Stream 的数据源。

Path path = Paths.get("D:\\\\home\\\\src\\\\main\\\\resources");
try {
    // 找到指定path下的所有不是目录的文件
    Stream<Path> stream = Files.find(path, 2, (path1, basicFileAttributes) -> {
        // 过滤掉目录文件
        return !basicFileAttributes.isDirectory();
    });
    // TODO: 对流对象做处理
} catch (IOException e) {
    e.printStackTrace();
}

④ Files.lines()

Files.lines 方法是把指定 Path 文件里面的每一行内容作为 Stream 的数据源。

Path path = Paths.get("D:\\\\home\\\\src\\\\main\\\\resources");
try {
    // 找到指定path下的所有不是目录的文件
    Stream<Path> stream = Files.find(path, 2, (path1, basicFileAttributes) -> {
        // 过滤掉目录文件
        return !basicFileAttributes.isDirectory();
    });
    // TODO: 对流对象做处理
} catch (IOException e) {
    e.printStackTrace();
}

2-5. 自己构建Stream

我们也可以自己去创建 Stream 自己提供数据源。Stream 类里面提供 of()、iterate()、generate()、builder() 等一些方法来创建 Stream,Stream 的数据源我们自己提供。

① Stream.of()

Stream.of() 函数参数就是数据源。

Stream<Integer> ofSteam = Stream.of(1,2,3,4,5,6);

② Stream.iterate()

Stream.iterate() 可以用来生成无限流,函数需要两个参数:第一个参数是初始值、第二个参数用于确定怎么根据前一个元素的值生成下一个元素。

// Stream.iterate() 流式迭代器
Stream<BigInteger> integers = Stream.iterate(
        BigInteger.ONE, // 初始值
        bigInteger -> bigInteger.add(BigInteger.ONE) // 用于确定怎么根据前一个元素的值生成下一个元素
);
// 简单输出
integers.limit(10).forEach(System.out::println);

③ Stream.generate()

Stream.generate() 也是用于生成一个无限流。参数用于获取每个元素。

// Stream.generate() 生成无限流
Stream<Double> generateA = Stream.generate(() -> {
    return java.lang.Math.random() * 100;
});
// 简单输出前10个值
generateA.limit(10).forEach(randomInt -> System.out.println(randomInt));

④ Stream.build()

Stream.build() 通过建造者模式生成一个 Stream 建造器。然后把需要加入 Stream 里面的数据源一个一个通过建造器添加进去。

// Stream.builder() 构造一个 Stream 对象
Stream.Builder<Integer> build = Stream.<Integer>builder()
        .add(1)
        .add(2)
        .add(3);
build.accept(4);
build.accept(5);
// 遍历输出
build.build().forEach(integer -> System.out.println(integer));
// TODO: 对流对象做处理

2-6. 其他Stream创建方式

Stream 其他创建方式我们就不一一举例了。有如下方式。

  • Random.ints()
  • BitSet.stream()
  • Pattern.splitAsStream(java.lang.CharSequence)
  • JarFile.stream()
  • ...

3. Stream流操作 与 终端操作

3-1. Stream流操作(操作符)

Stream 流操作就是对 Stream 流的各种处理。Stream 里面已经给提供了很多中间操作(我们一般称之为操作符)。

Stream 提供的流操作符。

Stream流操作符解释
filter 对流里面的数据做过滤操作
map 对流里面每个元素做转换
mapToInt 把流里面的每个元素转换成int
mapToLong 流里面每个元素转换成long
mapToDouble 流里面每个元素转换成double
flatMap 流里面每个元素转换成Steam对象,最后平铺成一个Stream对象
flatMapToInt 流里面每个元素转换成IntStream对象,最后平铺成一个IntStream对象
flatMapToLong 流里面每个元素转换成LongStream对象,最后平铺成一个LongStream对象
flatMapToDouble 流里面每个元素转换成DoubleStream对象,最后平铺成一个DoubleStream对象
distinct 去重
sorted 对流里面的元素排序
peek 查看流里面的每个元素
limit 返回前n个数
skip 跳过前n个元素

Stream 提供了这么多的操作符,而且这些操作符是可以组合起来使用。

3-2. Stream流终端操作

Stream 流终端操作是流式处理的最后一步,之前已经对 Stream 做了一系列的处理之后。该拿出结果了。我们可以在终端操作中实现对流的遍历、查找、归约、收集等等一系列的操作。

Stream 流终端操作提供的函数有。

终端操作符解释
forEach 遍历
forEachOrdered 如果流里面的元素是有顺序的则按顺序遍历
toArray 转换成数组
reduce 归约 - 根据一定的规则将Stream中的元素进行计算后返回一个唯一的值
collect 收集 - 对处理结果的封装
min 最小值
max 最大值
count 元素的个数
anyMatch 任何一个匹配到了就返回true
allMatch 所有都匹配上了就返回true
noneMatch 没有一个匹配上就返回true
findFirst 返回满足条件的第一个元素
findAny 返回某个元素

关于 Stream 终端操作部分,我们就着重讲下 collect() 函数的使用。因为其他的终端操作符都很好理解。collect() 稍稍复杂一点。

3-2-1. collect()

collect() 的使用主要在于对参数的理解,所有我们这里要专门讲下 collect() 函数的参数 Collector 这个类,以及怎么去构建 Collector 对象。只有在了解了这些之后,咱们才可以熟练的把他们用在各种场景中。

3-2-1-1. Collector

Collector 类目前没别的用处,就是专门用来作为 Stream 的 collect() 方法的参数的。把 Stream 里面的数据转换成我们最终想要的结果上。

Collector 各个方法,以及每个泛型的介绍:

/**
 * Collector是专门用来作为Stream的collect方法的参数的
 *
 * 泛型含义
 * T:是流中要收集的对象的泛型
 * A:是累加器的类型,累加器是在收集过程中用于累积部分结果的对象。
 * R:是收集操作得到的对象(通常但不一定是集合)的类型。
 */
public interface Collector<T, A, R> {
    /**
     * 生成结果容器,容器类型为A
     * (多线程的情况下可能会调用多次,开多个线程同时去处理一个流,每个线程调用一次)
     */
    Supplier<A> supplier();

    /**
     * A对应supplier()函数创建的结果容器
     * T对应Stream流里面一个一个的元素
     * 用于消费元素,也就是归纳元素,一般在这个里面把流里面的元素T(也可以转换下)放到supplier()创建的结果T里面去
     */
    BiConsumer<A, T> accumulator();

    /**
     * 用于两个两个合并并行执行的线程的执行结果,将其合并为一个最终结果A
     * 多线程的情况下,多个线程并行执行。每个线程产生一个结果
     */
    BinaryOperator<A> combiner();

    /**
     * 用于将之前整合完的结果A转换成为R
     *
     * combiner()完成之后了A, 这里还可以在转一道。生成你自己想要的结果
     */
    Function<A, R> finisher();

    /**
     * characteristics表示当前Collector的特征值,
     * 这是个不可变Set
     * 它定义了收集器的行为--尤其是关于流是否可以多线程并行执行,以及可以使用哪些优化的提示
     */
    Set<Characteristics> characteristics();

    /**
     * 它定义了收集器的行为--尤其是关于流是否可以并行归约,以及可以使用哪些优化的提示
     */
    enum Characteristics {
        /**
         * accumulator函数可以从多个线程同时调用,且该收集器可以并行归约流。如果收集器没有标为UNORDERED,
         * 那它仅在用于无序数据源时才可以并行归约
         * 多线程并行
         */
        CONCURRENT,

        /**
         * 归约结果不受流中项目的遍历和累积顺序的影响(无序)
         */
        UNORDERED,

        /**
         * 无需转换结果
         */
        IDENTITY_FINISH
    }


    /**
     * 四参方法,用于生成一个Collector,T代表流中的一个一个元素,R代表最终的结果
     */
    public static<T, R> Collector<T, R, R> of(Supplier<R> supplier,
                                              BiConsumer<R, T> accumulator,
                                              BinaryOperator<R> combiner,
                                              Characteristics... characteristics);

    /**
     * 五参方法,用于生成一个Collector,T代表流中的一个一个元素,A代表中间结果,R代表最终结果,finisher用于将A转换为R
     */
    public static<T, A, R> Collector<T, A, R> of(Supplier<A> supplier,
                                                 BiConsumer<A, T> accumulator,
                                                 BinaryOperator<A> combiner,
                                                 Function<A, R> finisher,
                                                 Characteristics... characteristics);



}

有了上面的介绍,接下来我们自己来 new 一个 Collector 对象,把我们 Steam 流里面的数据转换成 List 。(当然了Collectors类里面有提供这个方法,这里我们自己写一个也是为了方便大家的理解)

// 自己来组装Collector,返回一个List
@Test
public void collectNew() {
    Stream<Integer> stream = Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    List<Integer> intList = stream.collect(
            new Collector<Integer, List<Integer>, List<Integer>>() {
                // 生成结果容器,容器类型为,我们这里为List<Integer>
                @Override
                public Supplier<List<Integer>> supplier() {
                    return new Supplier<List<Integer>>() {

                        @Override
                        public List<Integer> get() {
                            return new ArrayList<>();
                        }
                    };
                }

                // 把流里面的结果都放到结果容器里面去
                @Override
                public BiConsumer<List<Integer>, Integer> accumulator() {
                    return new BiConsumer<List<Integer>, Integer>() {
                        @Override
                        public void accept(List<Integer> integers, Integer integer) {
                            integers.add(integer);
                        }
                    };
                }

                // 两个两个合并并行执行的线程的执行结果,将其合并为一个最终结果A
                @Override
                public BinaryOperator<List<Integer>> combiner() {
                    return new BinaryOperator<List<Integer>>() {
                        @Override
                        public List<Integer> apply(List<Integer> left, List<Integer> right) {
                            left.addAll(right);
                            return left;
                        }
                    };
                }

                // 可以对最终的结果做一个转换操作
                @Override
                public Function<List<Integer>, List<Integer>> finisher() {
                    return new Function<List<Integer>, List<Integer>>() {
                        @Override
                        public List<Integer> apply(List<Integer> integers) {
                            return integers;
                        }
                    };
                }

                // 特征值
                @Override
                public Set<Characteristics> characteristics() {
                    return EnumSet.of(Collector.Characteristics.UNORDERED, Collector.Characteristics.IDENTITY_FINISH);
                }
            });

    for (Integer item : intList) {
        System.out.println(item);
    }
}

4. 合理使用 Stream

我们将对常规的迭代、Stream 串行迭代以及 Stream 并行迭代进行性能测试对比,迭代循环中,我们将对数据进行过滤、分组等操作。分别进行以下几组测试:

测试 结论(迭代使用时间)
多核 CPU 服务器配置环境下,对比长度 100 的 int 数组的性能; 常规的迭代 < Stream 并行迭代 < Stream 串行迭代
多核 CPU 服务器配置环境下,对比长度 1.00E+8 的 int 数组的性能; Stream 并行迭代 < 常规的迭代 < Stream 串行迭代
多核 CPU 服务器配置环境下,对比长度 1.00E+8 对象数组过滤分组的性能;  Stream 并行迭代 < 常规的迭代 < Stream 串行迭代
单核 CPU 服务器配置环境下,对比长度 1.00E+8 对象数组过滤分组的性能; 常规的迭代 < Stream 串行迭代 < Stream 并行迭代

结论

在循环迭代次数较少的情况下,常规的迭代方式性能反而更好;在单核 CPU 服务器配置环境中,也是常规迭代方式更有优势;而在大数据循环迭代中,如果服务器是多核 CPU 的情况下,Stream 的并行迭代优势明显。所以在平时处理大数据的集合时,应该尽量考虑将应用部署在多核 CPU 环境下,并且使用 Stream 的并行迭代方式进行处理。

5. 总结

在串行处理操作中,Stream 在执行每一步中间操作时,并不会做实际的数据操作处理,而是将这些中间操作串联起来,最终由终结操作触发,生成一个数据处理链表,通过 Java8 中的 Spliterator 迭代器进行数据处理;此时,每执行一次迭代,就对所有的无状态的中间操作进行数据处理,而对有状态的中间操作,就需要迭代处理完所有的数据,再进行处理操作;最后就是进行终结操作的数据处理。

在并行处理操作中,Stream 对中间操作基本跟串行处理方式是一样的,但在终结操作中,Stream 将结合 ForkJoin 框架对集合进行切片处理,ForkJoin 框架将每个切片的处理结果 Join 合并起来。最后就是要注意 Stream 的使用场景。

以上是关于Java8 Stream流的主要内容,如果未能解决你的问题,请参考以下文章

Java8 Stream流如何操作集合,一文带你了解!

Java8利用stream流快速排序(包括中文)

玩转 Java8 Stream,让你代码更高效紧凑简洁

SpringBoot实践:Java8流处理stream与completableFuture异步编程

Java8 Stream流方法

Java开发工程师进阶篇-Java8的Stream流使用技巧