Reactive stream 响应式流——Webflux响应式编程利器

Posted algerfan

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Reactive stream 响应式流——Webflux响应式编程利器相关的知识,希望对你有一定的参考价值。

Reactive stream 响应式流

  • Reactive stream是jdk9新特性,提供了一套API,就是一种订阅发布者模式
  • 被压,背压是指在异步场景中,发布者发送事件速度远快于订阅者的处理速度的情况下,一种告诉上游的发布者降低发送速度的策略,简而言之,背压就是一种流速控制的策略。
    举个例子:假设以前是没有水龙头的,只能自来水厂主动的往用户输送水,但是不知道用户需要多少水,有了Reactive stream,就相当于有了水龙头,用户可以主动的请求用水,而自来水厂也知道了用户的需求
    示例代码(需要jdk9以上版本的支持)
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;

public class FlowDemo {
    public static void main(String[] args) throws Exception {
        // 1. 定义发布者, 发布的数据类型是 Integer
        // 直接使用jdk自带的SubmissionPublisher, 它实现了 Publisher 接口
        SubmissionPublisher<Integer> publiser = new SubmissionPublisher<Integer>();
        
        // 2. 定义订阅者
        Subscriber<Integer> subscriber = new Subscriber<Integer>() {
            private Subscription subscription;

            @Override
            public void onSubscribe(Subscription subscription) {
                // 保存订阅关系, 需要用它来给发布者响应
                this.subscription = subscription;
                // 请求一个数据
                this.subscription.request(1);
            }

            @Override
            public void onNext(Integer item) {
                // 接受到一个数据, 处理
                System.out.println("接受到数据: " + item);
                try {
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                // 处理完调用request再请求一个数据
                this.subscription.request(1);
                // 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
                // this.subscription.cancel();
            }

            @Override
            public void onError(Throwable throwable) {
                // 出现了异常(例如处理数据的时候产生了异常)
                throwable.printStackTrace();
                // 我们可以告诉发布者, 后面不接受数据了
                this.subscription.cancel();
            }

            @Override
            public void onComplete() {
                // 全部数据处理完了(发布者关闭了)
                System.out.println("处理完了!");
            }
        };

        // 3. 发布者和订阅者 建立订阅关系
        publiser.subscribe(subscriber);

        // 4. 生产数据, 并发布
        // 这里忽略数据生产过程
        for (int i = 0; i < 1000; i++) {
            System.out.println("生成数据:" + i);
            // submit是个block方法
            publiser.submit(i);
        }

        publiser.submit(111);
        publiser.submit(222);
        publiser.submit(333);

        // 5. 结束后 关闭发布者
        // 正式环境 应该放 finally 或者使用 try-resouce 确保关闭
        publiser.close();

        // 主线程延迟停止, 否则数据没有消费就退出
        Thread.currentThread().join(1000);
    }
}

以上是关于Reactive stream 响应式流——Webflux响应式编程利器的主要内容,如果未能解决你的问题,请参考以下文章

JDK9新特性 Reactive Stream 响应式流

Reactive stream 响应式流——Webflux响应式编程利器

Reactive 响应式流与制奶厂业务

gateway&reactive(响应式流)函数编程的webflux

Java9响应式编程Reactive Stream

Spring Boot Reactive Streams