stream的使用

Posted jry-blog

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了stream的使用相关的知识,希望对你有一定的参考价值。

stream的使用
  • 概述
    • Stream 是对集合(Collection)对象功能的增强,它专注于对集合对象进行各种非常便利、高效的聚合操作(aggregate operation)
    • 集合专注的是数据,流专注的是算法和计算(Stream不是集合元素、不是数据结构、不保存数据)
    • Stream API 借助于同样新出现的 Lambda 表达式,极大的提高编程效率和程序可读性。
    • 提供串行和并行两种模式进行汇聚操作,并发模式能够充分利用多核处理器的优势,使用 fork/join并行方式来拆分任务和加速处理过程。
    • Java 的并行 API 演变历程基本如下:
      • 1.0-1.4 中的 java.lang.Thread
      • 5.0 中的 java.util.concurrent
      • 6.0 中的 Phasers 等
      • 7.0 中的 Fork/Join 框架
      • 8.0 中的 Lambda
    • Stream 的另外一大特点是,数据源本身可以是无限的。
  • 特性
    • 无存储:Stream是基于数据源的对象,它本身不存储数据元素,而是通过管道将数据源的元素传递给操作。
    • 函数式编程:对Stream的任何修改都不会修改背后的数据源,比如对Sream的filter操作并不会删除被过滤掉的元素,而是生成一个不含被过滤元素的新的Stream。
    • 延迟执行:Stream由一个或多个中间操作(intermediate operation)和一个结束操作(terminal operation)两部分组成。只有执行了结束操作,Stream定义的中间操作才会依次执行,这就是Stream的延迟特性。
    • 可消费性:Stream只能被“消费”一次,一旦遍历过就会失效。就像容器的迭代器那样,想要再次遍历必须重新生成一个新的Stream。
  • Stream类
    • 继承关系:Stream ——》 BaseStream<T, S extendsBaseStream<T, S>> ——》AutoCloseable
      • BaseStream还派生出基于三种基本数据类型的流:IntStream, LongStream, DoubleStream
      • 由于继承了AutoCloseable,便集成了流的自动关闭操作。
    • BaseStream详解
      • Iterator iterator();    获得流的迭代器,并返回对该迭代器的引用(终端操作)
      • Spliterator spliterator();   获取流的spliterator,并返回其引用(终端操作)
      • boolean isParallel();   如果调用流是一个并行流,则返回true;如果调用流是一个顺序流,则返回false。
      • S sequential();  基于调用流,返回一个顺序流。如果调用流已经是顺序流了,就返回该流。(中间操作)
      • S parallel();  基于调用流,返回一个并行流。如果调用流已经是并行流了,就返回该流。(中间操作)
      • S unordered();  基于调用流,返回一个无序流。如果调用流已经是无序流了,就返回该流。(中间操作)
      • S onClose(Runnable closeHandler);   返回一个新流,closeHandler指定了该流的关闭处理程序,当关闭该流时,将调用这个处理程序。(中间操作)
      • void close();  从AutoCloseable继承来的,调用注册关闭处理程序,关闭调用流(很少会被使用到)
  • 流的操作
    • 中间操作(Intermediate operations)
      • 一个流可以后面跟随零个或多个 intermediate 操作。其目的主要是打开流,做出某种程度的数据映射/过滤,然后返回一个新的流,交给下一个操作使用。这类操作都是惰性化的(lazy),只有在指定的终端操作上才会执行中间操作。
      • 无状态操作
        • 处理流中的元素时,会对当前的元素进行单独处理。比如:谓词过滤操作,因为每个元素都是被单独进行处理的,所有它和流中的其它元素无关,因此被称为无状态操作。
      • 有状态操作
        • 某个元素的处理可能依赖于其他元素。比如查找最小值,最大值,和排序,因为他们都依赖于其他的元素。因此为称为有状态操作。
    • 终端操作(Terminal operations)
      • 流只能有一个 terminal操作,当这个操作执行后,流就被使用“光”了,无法再被操作。所以这必定是流的最后一个操作。Terminal 操作的执行,才会真正开始流的遍历,并且会生成一个结果,或者一个 side effect。
      • 短路操作(short-circuiting)
        • 对于一个 intermediate 操作,如果它接受的是一个无限大(infinite/unbounded)的Stream,但返回一个有限的新 Stream。
        • 对于一个 terminal 操作,如果它接受的是一个无限大的 Stream,但能在有限的时间计算出结果。
    • 技术分享图片
  • Stream的使用
    • 1.创建Stream:从数据源(如集合或数组)中获取一个流,或自行生成流
      • 通过数组或集合
      •         // 通过集合生成
                Collection<String> collection = new ArrayList<>(16);
                Stream<String> stream = collection.stream();
                    // 并行流
                Stream<String> stringStream = collection.parallelStream();
                // 通过数组生成
                int[] nums = new int[6];
                IntStream stream1 = Arrays.stream(nums);    
      • Stream的静态工厂 或 builder模式
                // 通过流的静态工厂
                Stream<Integer> integerStream = Stream.of(1, 2, 3);
                    //生成空流
                Stream<Integer> empty = Stream.empty();
                    // 拼接流
                Stream<Integer> concat = Stream.concat(integerStream, empty);
                    // 生成无限流, 并不会一开始生成无限个数,加入内存
                    // 而是通过终端操作, 内部迭代生成, 可以通过limit()截断
                Stream<Double> generate = Stream.generate(Math::random);
                Stream<Integer> iterate = Stream.iterate(1, (x) -> x + 2);
                iterate.limit(5).forEach(System.out::println);
                    // IntStream 或 LongStream 的静态工厂
                    // 生成包含 1 到 4 的流, 不含结束值
                IntStream.range(1, 5);
                    // 生成包含 1 到 5 的流,包含结束值
                IntStream.rangeClosed(1, 5);
                // 流的builder
                Stream.Builder<Object> builder = Stream.builder();
                builder.accept(1);
                    // add方法返回当前builder对象,以实现链式编程
                builder.add(2).add(3);
                builder.build().forEach(System.out::println);
      • 其他
        • java.io.BufferedReader.lines()
        • java.nio.file.Files.walk()
        • Random.ints()
        • BitSet.stream()
        • Pattern.splitAsStream(java.lang.CharSequence)
        • JarFile.stream()
        • 自行构建:java.util.Spliterator
    • 2.数据转换
      • 每次中间操作过程,不会修改原有Stream对象,而生成一个新的Stream返回,从而实现链式编程。
      • 映射
        • 将流中的元素映射为新的数据类型到新流中
        • map
                  // map()
                      // 将流中的元素进行处理,返回一个新的数据类型,映射到新流中
                  List<Integer> list = new ArrayList<>(Arrays.asList(1,2,3));
                  list.stream().map(i -> String.valueOf(i) + "test").forEach(System.out::println);
                      // mapToInt,mapToLong,mapToDouble 与map类似,只是映射为基本类型到对应的Stream中
        • flatMap
                  //flatMap
                      //flatMap()操作能把原始流中的元素进行一对多的转换,并且将新生成的元素全都合并到它返回的流里面。
                  List<String> stringList = new ArrayList<>(Arrays.asList("1,2,3", "4,5,6", "7,8,9"));
                  stringList.stream()
                          .flatMap(s -> Arrays.stream(s.split(",")))
                          .forEach(System.out::print); //输出 : 123456789    
      • 遍历-peek()
        • peek的作用类似forEach(),但唯一不同的是peek不是终端操作,不会消费原有的流
    • 3.执行终端操作,返回结果
      • 缩减操作:把一个流缩减为一个值
        • 特性
          • 无状态:每个元素都被单独地处理,他和流中的其它元素是没有任何依赖关系的
          • 不干预:操作数不会改变数据源
          • 关联性:标准的数学含义,即,给定一个关联运算符,在一系列操作中使用该运算符,先处理哪一对操作数是无关紧要的
        • 特例缩减:max(),min(),count()
        • reduce()
                  // 求总和  reduce
                  // Optional<T> reduce(BinaryOperator<T> accumulator); 如果流为空,则返回空,因此返回Optional
                  Optional<Integer> sum1 = Stream.of(1, 2, 3).reduce(Integer::sum);
                  if (sum1.isPresent()) {
                      System.out.println("sum : " + sum1.get()); // sum : 6
                  }
                  // T reduce(T identity, BinaryOperator<T> accumulator);
                  // 第一个参数为 初始值,若流为空,则返回初始值,因此返回值不可能为空
                  Integer sum2 = Stream.of(1, 2, 3).reduce(1, Integer::sum);
                  System.out.println("sum : " + sum2); //  sum : 7
                  // <U> U reduce(U identity,
                  //                BiFunction<U, ? super T, U> accumulator,
                  //                BinaryOperator<U> combiner);
                  //类似上面, 但是第三个参数的combiner用于合并并发流中每个线程的result
                  // 单线程下是不执行的
                  Integer sum3 = Stream.of(1, 2, 3).reduce(1, Integer::sum, (a,b) -> {
                      System.out.println(111);
                      return a + b;
                  });
                  System.out.println("sum : " + sum3); // sum : 7    
      • 收集
                // 收集,将流中的元素重新打包成集合
                List<String> collectTest = new ArrayList<>(Arrays.asList("1,2,3", "4,5,6", "7,8,9"));
                // 主要有两种方法
                // 第一种,是自行实现收集操作,与 reduce()很相似
                // <R> R collect(Supplier<R> supplier,
                //                  BiConsumer<R, ? super T> accumulator,
                //                  BiConsumer<R, R> combiner);
        
                // 一般主要使用第二种, 直接使用Collectors类中已经实现的静态收集器
                //集合
                List<String> newList = collectTest.stream().collect(Collectors.toList());//返回list
                Set<String> newSet = collectTest.stream().collect(Collectors.toSet());//返回set
                TreeSet<String> newTreeSet = collectTest.stream().collect(Collectors.toCollection(TreeSet::new));//自行定义返回的集合类型
        // 返回map,键值都为元素本身
                // 或者为元素的某个属性  比如: User::getId
                Map<String, String> newMap = collectTest.stream().collect(Collectors.toMap(Function.identity(), Function.identity()));
                // 拼接
                String join = collectTest.stream().collect(Collectors.joining());
                String joinOnDelimiter = collectTest.stream().collect(Collectors.joining(","));
                String joinAllCondition = collectTest.stream().collect(Collectors.joining(",", "begin", "end"));
                // 聚合
                String max = collectTest.stream().collect(Collectors.collectingAndThen(Collectors.maxBy(Comparator.comparingInt(String::length)), Optional::get));
                Optional<String> min = collectTest.stream().collect(Collectors.minBy(Comparator.comparingInt(String::length)));
                Double average = collectTest.stream().collect(Collectors.averagingInt(s -> s.length()));
                Integer sum = collectTest.stream().collect(Collectors.summingInt(s -> s.length()));
                // 映射,与map()近似,先处理映射,再使用收集器
                collectTest.stream().collect(Collectors.mapping(s -> s.toUpperCase(), Collectors.joining(",")));
                // 分组
                collectTest.stream().collect(Collectors.groupingBy(s -> s.length() % 2 == 0));
                collectTest.stream().collect(Collectors.partitioningBy(s -> s.length() % 2 == 0));
                // 缩减 reducing, 类似reduce()
                collectTest.stream().collect(Collectors.reducing((s1, s2) -> String.valueOf(Integer.sum(s1.length(), s2.length()))));

         

  • parallelStream中的线程安全
    • 并行模式下不能保证线程的安全问题
    • 但,终端操作若使用collect()和reduce(),就能满足线程安全
  • 引用

以上是关于stream的使用的主要内容,如果未能解决你的问题,请参考以下文章

从Node.js Stream写入多个文件

java8 .stream().sorted().filter().map().collect()用法

Java8 Stream针对List先分组再求和最大值最小值平均值等

协议切换成功与否

学习 Spark Streaming

TS Stream 详解