Reactor的Publisher与Subscriber
Posted 爱叨叨的程序狗
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Reactor的Publisher与Subscriber相关的知识,希望对你有一定的参考价值。
Project Reactor介绍
在计算机中,响应式变成或者反应式编程(Reactive Programming)是一种面向数据流和变化传播的编程范式。这意味着可以在编程语言中很方便地变大静态或动态的数据流,而相关的计算模型会自动将变化的值通过数据流进行传播。
作用
Reactor希望用少量、有限个数的线程来满足高负载的需要。 IO阻塞浪费系统性能,只有纯异步处理才能发挥系统的全部性能。JDK的异步API较为难用,成为异步编程的瓶颈。
响应式编程特性
- Responsive(响应式)
- Resilient(弹性)
- Message Driven(消息驱动)
- asynchronous request(异步请求)
- non-blocking(非阻塞)
- Backpressure(背压)
数据处理流程
测试代码
Subscriber增强类:
public class LoggingSubscriber<T> implements Subscriber<T>
private static final Logger log = LoggerFactory.getLogger(LoggingSubscriber.class);
private Subscription subscription;
private long requested;
private long received;
private CountDownLatch finished = new CountDownLatch(1);
@Override
public void onComplete()
log.info("onComplete: sub=", subscription.hashCode());
finished.countDown();
@Override
public void onError(Throwable t)
log.error("Error: sub=, message=", subscription.hashCode(), t.getMessage(),t);
finished.countDown();
@Override
public void onNext(T value)
log.info("onNext: sub=, value=", subscription.hashCode(), value);
this.received++;
this.requested++;
subscription.request(1);
@Override
public void onSubscribe(Subscription sub)
log.info("onSubscribe: sub=", sub.hashCode());
this.subscription = sub;
this.received = 0;
this.requested = 1;
sub.request(1);
public long getRequested()
return requested;
public long getReceived()
return received;
/**
* 阻塞调用者,直到发布者发出所有对象或产生错误
*/
public void block()
try
finished.await(10, TimeUnit.SECONDS);
catch(InterruptedException iex)
throw new RuntimeException(iex);
使用Streams处理数据
public class SteamTest
private static Logger log = LoggerFactory.getLogger(SteamTest.class);
public static void main(String[] args)
Publisher<String> pub = Streams.publish(Arrays.asList("hello", "hello again"));
LoggingSubscriber<String> sub = new LoggingSubscriber<String>();
pub.subscribe(sub);
sub.block();
限制请求数量
public class YieldTest
private static Logger log = LoggerFactory.getLogger(SteamTest.class);
public static void main(String[] args)
/**
* 限制对象创建数量
* 接收yieldRequest对象并返回下一个要发出的对象的Function参数
*/
Publisher<String> pub = Streams.yield((t) ->
System.out.println(t.getRequestNum());
return t.getRequestNum() < 5 ? "hello" : null;
);
LoggingSubscriber<String> sub = new LoggingSubscriber<String>();
pub.subscribe(sub);
sub.block();
assertEquals(5, sub.getReceived());
周期性请求
public class PeriodicTest
private static Logger log = LoggerFactory.getLogger(SteamTest.class);
public static void main(String[] args)
/**
* 周期性做请求
*/
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
Publisher<String> pub = Streams.periodically(executor, Duration.ofSeconds(1), (t) ->
return t < 5 ? String.format("hello %d", t) : null;
);
LoggingSubscriber<String> sub = new LoggingSubscriber<String>();
pub.subscribe(sub);
sub.block();
assertEquals(5, sub.getReceived());
一些核心概念
Operators-Publisher/Subscriber
-
Flux<T>
是一个标准的Reactive Streams规范中的Publisher<T>
,它代表一个包含了[0…N]个元素的异步序列流。在Reactive Streams规范中,针对流中每个元素,订阅者将会监听这三个事件:onNext
、onComplete
、onError
。 -
Mono<T>
是一个特殊的Flux<T>
,它代表一个仅包含1个元素的异步序列流。因为只有一个元素,所以订阅者只需要监听onComplete
、onError
。
Backpressure
- Subscription
- onRequest()、onCancel()、onDispose()
线程调度Schedulers
- immediate()/single()/newSingle()
- Elastic()/parallel()/newParallel()
错误处理
- onError/onErrorReturn/onErrorResume
- doOnError/doFinally
以上是关于Reactor的Publisher与Subscriber的主要内容,如果未能解决你的问题,请参考以下文章
将ros中suscriber和publisher写入class中