RxBus-实现EventBus之Sticky
Posted wzgiceman
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RxBus-实现EventBus之Sticky相关的知识,希望对你有一定的参考价值。
RxBus-实现EventBus之Sticky
背景
前期由于工作中需要将EventBus
替换成RxBus
,所以按照EventBus
的用法封装了一套自己的RxBus
,基本满足了使用,项目发出后有不少兄弟告诉我没有EventBus
的Sticky
功能,所以得闲完善这个功能,同样是按照EventBus3.0
注解的方式去实现和调用
效果
sticky是什么
在android开发中,Sticky事件只指事件消费者在事件发布之后才注册的也能接收到该事件的特殊类型。Android中就有这样的实例,也就是Sticky Broadcast,即粘性广播。正常情况下如果发送者发送了某个广播,而接收者在这个广播发送后才注册自己的Receiver,这时接收者便无法接收到刚才的广播,为此Android引入了StickyBroadcast,在广播发送结束后会保存刚刚发送的广播(Intent),这样当接收者注册完Receiver后就可以接收到刚才已经发布的广播。这就使得我们可以预先处理一些事件,让有消费者时再把这些事件投递给消费者。
使用
完全按照EventBus3.0版本的注解的方式去使用
- 发送消息
RxBus.getDefault().post(new EventStickText("我是sticky消息"));
- 接收消息
@Subscribe(threadMode = ThreadMode.MAIN, sticky = true)
public void event(EventStickText eventStickText)
Observable.timer(1, TimeUnit.SECONDS).observeOn(AndroidSchedulers.mainThread()).subscribe(aLong ->
textView.setText(eventStickText.getMsg());
);
@Override
protected void onStart()
super.onStart();
RxBus.getDefault().register(this);
@Override
protected void onDestroy()
super.onDestroy();
RxBus.getDefault().unRegister(this);
实现
本篇的实现原理是基于之前的RxBus
封装的基础上的完善,所以需要大致了解RxBus
之前基本功能的封装原理方能更加全面的理解一下的内容
1.添加sticky
注解
不懂注解的同学可以先看下之前我写的两瓶关于注解的博客
这里添加boolean sticky()
的方法,并且默认指定参数false
@Documented
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Subscribe
int code() default -1;
ThreadMode threadMode() default ThreadMode.CURRENT_THREAD;
boolean sticky() default false;
2.将sticky
加入数据封装类中
将数据新加入的注解sticky
加入到数据封装类中,在最后分发事件时用于区分sticky
事件
public class SubscriberMethod
public Method method;
public ThreadMode threadMode;
public Class<?> eventType;
public Object subscriber;
public int code;
public boolean sticky;
public SubscriberMethod(Object subscriber, Method method, Class<?> eventType, int code,ThreadMode threadMode,boolean sticky )
this.method = method;
this.threadMode = threadMode;
this.eventType = eventType;
this.subscriber = subscriber;
this.code = code;
this.sticky=sticky;
******
3.记录post
事件分析
因为sticky
的特殊性,而自带RxJava
提供给我们四种方式处理Subject分发
ReplaySubject
在订阅者订阅时,会发送所有的数据给订阅者,无论它们是何时订阅的。PublishSubject
只会给在订阅者订阅的时间点之后的数据发送给观察者。AsyncSubject
只在原Observable
事件序列完成后,发送最后一个数据,后续如果还有订阅者继续订阅该Subject
, 则可以直接接收到最后一个值。BehaviorSubject
在订阅者订阅时,会发送其最近发送的数据(如果此时还没有收到任何数据,它会发送一个默认值)。
所以只有ReplaySubject
和BehaviorSubject
具备Sticky
的特性。
但是:这两种方式都不适合
BehaviorSubject
因为只是保留最近一次的事件,这样会导致事件的覆盖问题ReplaySubject能解决BehaviorSubject的事件丢失的问题,能保存所有的事件,但是分发起来确实一个难点,暂时没有找到合适的处理方法
还有我们之前的封装采用的
PublishSubject
的实现方式去分发RxBus
的事件,如果换成任何其他的分发机制都会导致sticky事件和正常事件数据需要独立来做,成本太高
public RxBus()
bus = new SerializedSubject<>(PublishSubject.create());
解决办法:通过Map<事件类型,事件>手动记录消息事件,和PublishSubject
数据统一起来处理,避免速度的独立,这里选择线程安全的ConcurrentHashMap
4.ConcurrentHashMap
记录事件
初始化
/*stick数据*/
private final Map<Class<?>, Object> stickyEvent =new ConcurrentHashMap<>();
在post
方法中添加事件
/**
* 提供了一个新的事件,单一类型
*
* @param o 事件数据
*/
public void post(Object o)
synchronized (stickyEvent)
stickyEvent.put(o.getClass(), o);
bus.onNext(o);
5.通过sticky
注解得到Observable
对象
在register(Object subscriber)
方法中通过反射得到自定义注解的数据,然后放入到自定义的数据类型SubscriberMethod
中
/**
* 注册
*
* @param subscriber 订阅者
*/
public void register(Object subscriber)
/*避免重复创建*/
if(eventTypesBySubscriber.containsKey(subscriber))
return;
Class<?> subClass = subscriber.getClass();
Method[] methods = subClass.getDeclaredMethods();
for (Method method : methods)
if (method.isAnnotationPresent(Subscribe.class))
//获得参数类型
Class[] parameterType = method.getParameterTypes();
//参数不为空 且参数个数为1
if (parameterType != null && parameterType.length == 1)
Class eventType = parameterType[0];
addEventTypeToMap(subscriber, eventType);
Subscribe sub = method.getAnnotation(Subscribe.class);
int code = sub.code();
ThreadMode threadMode = sub.threadMode();
boolean sticky = sub.sticky();
SubscriberMethod subscriberMethod = new SubscriberMethod(subscriber, method, eventType, code, threadMode,
sticky);
addSubscriberToMap(eventType, subscriberMethod);
addSubscriber(subscriberMethod);
当事件触发以后,通过SubscriberMethod
记录的数据生成不同的Observable对象,现在对sticky消息增加了响应的对象处理
/**
* 根据传递的 eventType 类型返回特定类型(eventType)的 被观察者
*/
public <T> Observable<T> toObservableSticky(final Class<T> eventType)
synchronized (stickyEvent)
Observable<T> observable = bus.ofType(eventType);
final Object event = stickyEvent.get(eventType);
if (event != null)
return observable.mergeWith(Observable.create(new Observable.OnSubscribe<T>()
@Override
public void call(Subscriber<? super T> subscriber)
subscriber.onNext(eventType.cast(event));
));
else
return observable;
这里使用merge
操作符:可以将多个Observables
合并生成一个Observable。
6.Observable
对象分发事件
得到sticky的压缩Observable
对象后,还需要按照注解中被指定的线程去触发事件任务
/**
* 用于处理订阅事件在那个线程中执行
*
* @param observable
* @param subscriberMethod
* @return
*/
private Observable postToObservable(Observable observable, SubscriberMethod subscriberMethod)
switch (subscriberMethod.threadMode)
case MAIN:
observable.observeOn(AndroidSchedulers.mainThread());
break;
case NEW_THREAD:
observable.observeOn(Schedulers.newThread());
break;
case CURRENT_THREAD:
observable.observeOn(Schedulers.immediate());
break;
case IO:
observable.observeOn(Schedulers.io());
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscriberMethod.threadMode);
return observable;
这里很简单,直接运用RxJava和RxAndroid的线程去管理即可
7.sticky消息的销毁
因为这里的sticky消息采用的是map队列记录的方式去实现,所以当sticky消息不在被需求记录的时候,或者程序退出,需要手动清空map队列的数据,避免内存溢出和浪费
/**
* 移除指定eventType的Sticky事件
*/
public <T> T removeStickyEvent(Class<T> eventType)
synchronized (stickyEvent)
return eventType.cast(stickyEvent.remove(eventType));
/**
* 移除所有的Sticky事件
*/
public void removeAllStickyEvents()
synchronized (stickyEvent)
stickyEvent.clear();
结果
通过sticky消息的完善,RxBus已经完全实现了EventBus3.0的全部功能,并且全部安装EventBus3.0的使用方法来封装,方便项目的迁移和使用。
注解方式定义
post方式分发事件
sticky消息功能
注册销毁简单化
源码
建议
以上是关于RxBus-实现EventBus之Sticky的主要内容,如果未能解决你的问题,请参考以下文章