webFlux 学习

Posted bj-xiaodao

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了webFlux 学习相关的知识,希望对你有一定的参考价值。

在学习webflux 之前我们先要了解一个概念

什么是背压(back press)

对我们以前使用的发布订阅模式来说.我们的以前的模式是 消费只能通过提供者提供的数据来持续消费 如果一直发送消息,那么就只能一直消费

我们对背压做一个详细的比喻

比如我们每家每户,都有一个水龙头.自来水公司相当于发布者者,我们家庭就是个订阅者,水就是数据,在之前的模式我们的订阅者是一个被动接受的概念

背压就是相当于我们家里安装了一个水龙头,我们需要的时候就把他打开.不需要的时候就把他关闭

 

reactive stream

响应式流.这是jdk9 引入的一套标准,他能够很好的实现背压,但是我去官网的时候,发现jdk9已经结束.我们看看jdk11吧

jdk11有一个flow接口 里面有4个方法 

1.publisher 就是发布者

  subscribe:就是和订阅者产生一个关系

2.subscribe 就是订阅者

  onSubscribe:签署一个订阅关系传入subscription

  onNext(): 接受到一条数据

  onError(): 就是出错

  onComplete(): 完成

3.Subscription接口中就是其中实现背压的关键 其中request方法就是告诉发布者我需要多少资源,发布者那里 就会发布多少资源

4.Processor  既可以做发布者,也可以做订阅者,具体是用来中间环节的数据处理工作

技术图片

 简单的例子我们来运行下

每次处理完之后告诉发布者我还可以处理的数据是多少

  public static void main(String[] args) throws InterruptedException 
      //1.定义发布者

        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();

        //2. 定义订阅者

        Flow.Subscriber<Integer> subscriber = new Flow.Subscriber<Integer>() 

            private Flow.Subscription subscription;
            int total = 0;
            @Override
            public void onSubscribe(Flow.Subscription subscription) 
                //保存订阅关系
                this.subscription = subscription;
                //请求一个数据
                subscription.request(1);
            

            @Override
            public void onNext(Integer item) 
                System.out.println("接受到: "+ item);
                total++;
                System.out.println("接受的条数为  : "+ total);
                this.subscription.request(1);
                //或者到达一定数量告诉发布者不接受数据了
                if(total ==10)
                    this.subscription.cancel();
                    System.out.println("接受数据已经足够");

                





            

            @Override
            public void onError(Throwable throwable) 
            throwable.printStackTrace();
            //抛出异常就返回
            this.subscription.cancel();
            

            @Override
            public void onComplete() 
                System.out.println("数据处理完了.");
            
        ;

        //3发布和订阅 建立订阅关系

        publisher.subscribe(subscriber);


        //4.生产数据
        for (int i = 0; i < 100; i++) 
            publisher.submit(i);
        

        //5.关闭发布者
        publisher.close();

        Thread.currentThread().join(5000);

    

processor

  public static void main(String[] args) throws InterruptedException 
      //1.定义发布者

        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
        //2.定义一个处理器 对数据进行过滤,并转化为string的类型
        MyProcessor myProcessor = new MyProcessor();
        //3.发布者与处理器建立关系
        publisher.subscribe(myProcessor);

        //4. 定义最终订阅者

        Flow.Subscriber<String> subscriber = new Flow.Subscriber<String>() 

            private Flow.Subscription subscription;
            @Override
            public void onSubscribe(Flow.Subscription subscription) 
                //保存订阅关系
                this.subscription = subscription;
                //请求一个数据
                subscription.request(1);
            
            LinkedList<String > list = new LinkedList<>();
            @Override
            public void onNext(String item) 
                list.add(item);
                this.subscription.request(1);
                //或者到达一定数量告诉发布者不接受数据了
                System.out.println(item);
                if(list.size() == 10)
                    this.subscription.cancel();
                    System.out.println("接受数据已经足够");
                    this.onComplete();

                





            

            @Override
            public void onError(Throwable throwable) 
            throwable.printStackTrace();
            //抛出异常就返回
            this.subscription.cancel();
            

            @Override
            public void onComplete() 
                System.out.println("数据处理完了."+list.toString());
            
        ;

        //5 处理器和最终的订阅者建立关系

       myProcessor.subscribe(subscriber);


        //4.生产数据
        for (int i = 0; i < 100; i++) 
            publisher.submit(i);
        

        //5.关闭发布者
        publisher.close();

        Thread.currentThread().join(5000);



    
    static   class  MyProcessor extends SubmissionPublisher<String> implements Flow.Processor<Integer,String> 
        private Flow.Subscription subscription;
        @Override
        public void onSubscribe(Flow.Subscription subscription) 
            this.subscription = subscription;

            subscription.request(1);

        

        @Override
        public void onNext(Integer item) 
//            System.out.println("processor-> 处理器接收到的数据.."+item);
            if(item % 2 ==0)
                this.submit("转->" +item);

            
            this.subscription.request(1);
        

        @Override
        public void onError(Throwable throwable) 
        throwable.printStackTrace();
        this.subscription.cancel();
        

        @Override
        public void onComplete() 
            System.out.println("processor 处理器已经处理完成!");
        
    

里面的运行机制

publiser.submit():是一个阻塞方法

订阅者有一个缓冲池.当缓冲池满了之后 submit()方法就会被阻塞.这样就不会再去生产数据了

subscription 缓冲的capacity默认是256个.

以上是关于webFlux 学习的主要内容,如果未能解决你的问题,请参考以下文章

Spring WebFlux学习记录

Spring WebFlux学习记录

webFlux 学习

springboot2 webflux 响应式编程学习路径

Java响应式编程Springboot WebFlux基础与实战

[转]springboot2 webflux 响应式编程学习路径