SpringBoot实践:Java8流处理stream与completableFuture异步编程

Posted A叶子叶

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringBoot实践:Java8流处理stream与completableFuture异步编程相关的知识,希望对你有一定的参考价值。

目录

Stream

          map

filter

flatmap

collect

CompletableFuture


流处理stream结合函数式编程规范,极大提升编程效率,减少代码量,而异步编程completableFuture的使用,则能够更加清晰地表述出业务逻辑,方法的异步调用关系路径很好滴描述了业务脉络;

Stream

这里先构造2个对象,student和teacher,teacher拥有多个student对象,经常有类似使用teacher对象后对student操作的场景,使用流处理比for、foreach、Iterator更优雅;

package org.example;

import org.example.model.Student;
import org.example.model.Teacher;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.BinaryOperator;
import java.util.stream.Collectors;

/**
 * Hello world!
 *
 */
public class App {
    public static void main(String[] args) {
        System.out.println("begin.");
        Student a = new Student(1, "001", "zhangsan", "1");
        Student b = new Student(2, "001", "lisi", "2");
        Student c = new Student(3, "001", "wangwu", "3");
        Student d = new Student(4, "001", "mazi", "4");
        Student e = new Student(5, "001", "zhaowu", "5");
        Student f = new Student(6, "001", "xuliu", "6");
        Student g = new Student(7, "001", "dongqi", "7");
        Student h = new Student(8, "001", "chenba", "8");

        List<Student> studentList = new ArrayList<Student>();
        studentList.add(a);
        studentList.add(b);
        studentList.add(c);
        studentList.add(d);
        studentList.add(e);
        studentList.add(f);
        studentList.add(g);
        studentList.add(h);


        Teacher t1 = new Teacher(1, "12", "boy", "laoshi", studentList);

        
    }

}

map

map通过映射直接拿到属性值,返回的是属性而并非原有对象,有多重写法:

        List<Integer> list = t1.getStudentList()
                .stream()
                .map(Student::getId)
                .collect(Collectors.toList());
        List<Integer> list1 = t1.getStudentList()
                .stream()
                .map(student -> student.getId())
                .collect(Collectors.toList());
        System.out.println(list);
        System.out.println(list1);

也可以在取值时候做转换运算:

// map做运算
        List<Integer> list2 = t1.getStudentList()
                .stream()
                .map(student -> {
            return student.getId() + 1;
        }).collect(Collectors.toList());
        System.out.println("list2:" + list2);

filter

filter过滤,返回的是原有对象:

        // filter过滤
        List<Student> list3 = t1.getStudentList()
                .stream()
                .filter(student -> {
                    return student.getId() > 3;
                }).filter(student -> {
                    return student.getName().contains("s");
                }).collect(Collectors.toList());
        System.out.println("list3:" + list3);
        List<Student> list4 = t1.getStudentList()
                .stream()
                .filter(student -> student.getId() > 2)
                .collect(Collectors.toList());

flatmap

flatmap拍扁,跟flink中的流式处理定义相似,输出一对多

        // flatmap拍扁,输出值
        BinaryOperator<Student> getBiggestID = (x1, x2) -> {
            int id1 = x1.getId();
            int id2 = x2.getId();
            return x1.getId() > x2.getId() ? x1 : x2;

        };
        Integer maxID = Optional.ofNullable(t1.getStudentList())
                .flatMap(students -> students.stream().reduce(getBiggestID))
                .map(Student::getId)
                .orElse(0);
        System.out.println(maxID);

collect

可以流处理同样可以聚合,groubpingby函数为例:

        // groupby转化
        Map<String,List<Student>> list6 = t1.getStudentList().stream()
                .filter(student -> !student.getName().isEmpty())
                .collect(Collectors.groupingBy(x->x.getName().contains("s")?"ss":"nn"));

        System.out.println(list6);

CompletableFuture

当需要对Teacher和Student对象进行查询逻辑处理时候,就可以用异步编程相关方法;在方法的定义加上CompletableFuture<>关键词,把返回值放在泛型中,比如CompletableFuture<List<Student>>getStudents()方法,如果不使用异步编程直接返回List,使用异步编程返回CompletableFuture对象,在使用时候;

package org.example.service;

import org.example.model.Student;
import org.example.model.Teacher;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import static java.util.concurrent.CompletableFuture.completedFuture;

public class TeacherServiceImpl {
    Student a = new Student(1, "sat", "zhangsan", "1");
    Student b = new Student(2, "hhh", "lisi", "2");
    Student c = new Student(3, "kg", "wangwu", "3");
    Student d = new Student(4, "ss", "mazi", "4");
    Student e = new Student(5, "tt", "zhaowu", "5");
    Student f = new Student(6, "dfa", "xuliu", "6");
    Student g = new Student(7, "safg", "dongqi", "7");
    Student h = new Student(8, "tsa", "chenba", "8");


    List<Student> students = new ArrayList<Student>();

    // 假设需要进行数据库查询操作
    public CompletableFuture<List<Student>>  getStudents(){
        Student a = new Student(1, "sat", "zhangsan", "1");
        Student b = new Student(2, "hhh", "lisi", "2");
        Student c = new Student(3, "kg", "wangwu", "3");
        Student d = new Student(4, "ss", "mazi", "4");
        Student e = new Student(5, "tt", "zhaowu", "5");
        Student f = new Student(6, "dfa", "xuliu", "6");
        Student g = new Student(7, "safg", "dongqi", "7");
        Student h = new Student(8, "tsa", "chenba", "8");
        List<Student> studentList = new ArrayList<Student>();
        studentList.add(a);
        studentList.add(b);
        studentList.add(c);
        studentList.add(d);
        studentList.add(e);
        studentList.add(f);
        studentList.add(g);
        studentList.add(h);
        studentList.add(h);
        return completedFuture(studentList);
    }

    // 对数据库返回值再加工
    public CompletableFuture<List<Teacher>> getTeachers(List<Student> students){
        List<Teacher> teachers = new ArrayList<Teacher>();
        Teacher t2 = new Teacher(1, "12", "boy", "lixx", students);
        Teacher t3 = new Teacher(3, "21", "girl", "wangxx", students);
        teachers.add(t2);
        teachers.add(t3);
        return completedFuture(teachers);
    }

}

对Student是数据库操作,对teacher是数据库返回的在处理,那么作为impl,就可以组合进行业务逻辑链路:

    // 异步编程链路
    public static CompletableFuture<String> test(){
        return teacherService.getStudents()
                .thenApplyAsync(students -> students.stream()
                .filter(i->i.getName().contains("s"))
                .collect(Collectors.toList()))
                .thenApply(students -> teacherService.getTeachers(students).toString())
                .toCompletableFuture();

    }

关于thencompose和thenapply方法需要搞明白;

supplyAsync(Func):参数为计算任务(函数),该方法会异步执行传入的Func函数,并且使用get或join方法获取计算结果;

runAsync(Func):该方法也可以创建CompletableFuture实例,只是适合创建无返回值的Func;

thenApply():then开头的函数都是函数执行后的操作,thenApply前后两个任务在同一个线程中执行,thenApplyAsyn前后两个任务不是同一个线程,而是两个不同线程,相同点是都要等前一个方法结束后才会执行后面跟着的方法,thenApply函数传入的函数需要有入参和出参;

thenCombine():与thenApply相比,更强调前后连接任务的关系是并行的,也就是Func可以和后面连接的函数并行处理,最后两个任务均完成将结果同时传递给下游任务。

thenCompose():当前后两个任务存在关联关系,当第一个任务完成时才会执行第二个操作就可以使用thenCompose方法,两个异步任务全部完成时才会执行某些操作用thenCombine,

whenComplete():任务完成后的回调;

以上是关于SpringBoot实践:Java8流处理stream与completableFuture异步编程的主要内容,如果未能解决你的问题,请参考以下文章

打通实时流处理log4j-flume-kafka-structured-streaming

Springboot分布式限流实践

java8新特性——并行流与顺序流

Java8:18个日期处理的实践

Java8实战使用并行流

java8中的流操作