RxBus-实现EventBus之Sticky

Posted wzgiceman

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RxBus-实现EventBus之Sticky相关的知识,希望对你有一定的参考价值。

RxBus-实现EventBus之Sticky

背景

前期由于工作中需要将EventBus替换成RxBus,所以按照EventBus的用法封装了一套自己的RxBus,基本满足了使用,项目发出后有不少兄弟告诉我没有EventBusSticky功能,所以得闲完善这个功能,同样是按照EventBus3.0注解的方式去实现和调用

原RxBus基本功能实现原理:EventBus完全一样的RxBus


效果

sticky是什么

android开发中,Sticky事件只指事件消费者在事件发布之后才注册的也能接收到该事件的特殊类型。Android中就有这样的实例,也就是Sticky Broadcast,即粘性广播。正常情况下如果发送者发送了某个广播,而接收者在这个广播发送后才注册自己的Receiver,这时接收者便无法接收到刚才的广播,为此Android引入了StickyBroadcast,在广播发送结束后会保存刚刚发送的广播(Intent),这样当接收者注册完Receiver后就可以接收到刚才已经发布的广播。这就使得我们可以预先处理一些事件,让有消费者时再把这些事件投递给消费者。

使用

完全按照EventBus3.0版本的注解的方式去使用

  • 发送消息
RxBus.getDefault().post(new EventStickText("我是sticky消息"));
  • 接收消息

    @Subscribe(threadMode = ThreadMode.MAIN, sticky = true)
    public void event(EventStickText eventStickText) 
        Observable.timer(1, TimeUnit.SECONDS).observeOn(AndroidSchedulers.mainThread()).subscribe(aLong -> 
            textView.setText(eventStickText.getMsg());
        );
    


    @Override
    protected void onStart() 
        super.onStart();
        RxBus.getDefault().register(this);
    

    @Override
    protected void onDestroy() 
        super.onDestroy();
        RxBus.getDefault().unRegister(this);
    

实现

本篇的实现原理是基于之前的RxBus封装的基础上的完善,所以需要大致了解RxBus之前基本功能的封装原理方能更加全面的理解一下的内容

原RxBus基本功能实现原理:EventBus完全一样的RxBus

1.添加sticky注解

不懂注解的同学可以先看下之前我写的两瓶关于注解的博客

Java-注解详解

Android-注解详解

这里添加boolean sticky()的方法,并且默认指定参数false

@Documented
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Subscribe 
    int code() default -1;
    ThreadMode threadMode() default ThreadMode.CURRENT_THREAD;
    boolean sticky() default false;

2.将sticky加入数据封装类中

将数据新加入的注解sticky加入到数据封装类中,在最后分发事件时用于区分sticky事件

public class SubscriberMethod 
    public Method method;
    public ThreadMode threadMode;
    public Class<?> eventType;
    public Object subscriber;
    public int code;
    public boolean sticky;

    public SubscriberMethod(Object subscriber, Method method, Class<?> eventType, int code,ThreadMode threadMode,boolean sticky ) 
        this.method = method;
        this.threadMode = threadMode;
        this.eventType = eventType;
        this.subscriber = subscriber;
        this.code = code;
        this.sticky=sticky;
    
 ******
 

3.记录post事件分析

因为sticky的特殊性,而自带RxJava提供给我们四种方式处理Subject分发

  • ReplaySubject在订阅者订阅时,会发送所有的数据给订阅者,无论它们是何时订阅的。

  • PublishSubject只会给在订阅者订阅的时间点之后的数据发送给观察者。

  • AsyncSubject只在原Observable事件序列完成后,发送最后一个数据,后续如果还有订阅者继续订阅该Subject, 则可以直接接收到最后一个值。

  • BehaviorSubject在订阅者订阅时,会发送其最近发送的数据(如果此时还没有收到任何数据,它会发送一个默认值)。

所以只有ReplaySubjectBehaviorSubject具备Sticky的特性。

但是:这两种方式都不适合

  • BehaviorSubject因为只是保留最近一次的事件,这样会导致事件的覆盖问题

  • ReplaySubject能解决BehaviorSubject的事件丢失的问题,能保存所有的事件,但是分发起来确实一个难点,暂时没有找到合适的处理方法

  • 还有我们之前的封装采用的PublishSubject的实现方式去分发RxBus的事件,如果换成任何其他的分发机制都会导致sticky事件和正常事件数据需要独立来做,成本太高

 public RxBus() 
        bus = new SerializedSubject<>(PublishSubject.create());
    

解决办法:通过Map<事件类型,事件>手动记录消息事件,和PublishSubject数据统一起来处理,避免速度的独立,这里选择线程安全的ConcurrentHashMap

4.ConcurrentHashMap记录事件

初始化

/*stick数据*/
    private final Map<Class<?>, Object> stickyEvent =new ConcurrentHashMap<>();

post方法中添加事件

 /**
     * 提供了一个新的事件,单一类型
     *
     * @param o 事件数据
     */
    public void post(Object o) 
        synchronized (stickyEvent) 
            stickyEvent.put(o.getClass(), o);
        
        bus.onNext(o);
    

5.通过sticky注解得到Observable对象

register(Object subscriber)方法中通过反射得到自定义注解的数据,然后放入到自定义的数据类型SubscriberMethod

  /**
     * 注册
     *
     * @param subscriber 订阅者
     */
    public void register(Object subscriber) 
          /*避免重复创建*/
        if(eventTypesBySubscriber.containsKey(subscriber))
            return;
        
        Class<?> subClass = subscriber.getClass();
        Method[] methods = subClass.getDeclaredMethods();
        for (Method method : methods) 
            if (method.isAnnotationPresent(Subscribe.class)) 
                //获得参数类型
                Class[] parameterType = method.getParameterTypes();
                //参数不为空 且参数个数为1
                if (parameterType != null && parameterType.length == 1) 

                    Class eventType = parameterType[0];

                    addEventTypeToMap(subscriber, eventType);
                    Subscribe sub = method.getAnnotation(Subscribe.class);
                    int code = sub.code();
                    ThreadMode threadMode = sub.threadMode();
                    boolean sticky = sub.sticky();

                    SubscriberMethod subscriberMethod = new SubscriberMethod(subscriber, method, eventType, code, threadMode,
                            sticky);
                    addSubscriberToMap(eventType, subscriberMethod);

                    addSubscriber(subscriberMethod);
                
            
        
    

当事件触发以后,通过SubscriberMethod记录的数据生成不同的Observable对象,现在对sticky消息增加了响应的对象处理

 /**
     * 根据传递的 eventType 类型返回特定类型(eventType)的 被观察者
     */
    public <T> Observable<T> toObservableSticky(final Class<T> eventType) 
        synchronized (stickyEvent) 
            Observable<T> observable = bus.ofType(eventType);
            final Object event = stickyEvent.get(eventType);

            if (event != null) 
                return observable.mergeWith(Observable.create(new Observable.OnSubscribe<T>() 
                    @Override
                    public void call(Subscriber<? super T> subscriber) 
                        subscriber.onNext(eventType.cast(event));
                    
                ));
             else 
                return observable;
            
        
    

这里使用merge操作符:可以将多个Observables合并生成一个Observable。

6.Observable对象分发事件

得到sticky的压缩Observable对象后,还需要按照注解中被指定的线程去触发事件任务

  /**
     * 用于处理订阅事件在那个线程中执行
     *
     * @param observable
     * @param subscriberMethod
     * @return
     */
    private Observable postToObservable(Observable observable, SubscriberMethod subscriberMethod) 

        switch (subscriberMethod.threadMode) 
            case MAIN:
                observable.observeOn(AndroidSchedulers.mainThread());
                break;
            case NEW_THREAD:
                observable.observeOn(Schedulers.newThread());
                break;
            case CURRENT_THREAD:
                observable.observeOn(Schedulers.immediate());
                break;
            case IO:
                observable.observeOn(Schedulers.io());
                break;
            default:
                throw new IllegalStateException("Unknown thread mode: " + subscriberMethod.threadMode);
        
        return observable;
    

这里很简单,直接运用RxJava和RxAndroid的线程去管理即可

7.sticky消息的销毁

因为这里的sticky消息采用的是map队列记录的方式去实现,所以当sticky消息不在被需求记录的时候,或者程序退出,需要手动清空map队列的数据,避免内存溢出和浪费

 /**
     * 移除指定eventType的Sticky事件
     */
    public <T> T removeStickyEvent(Class<T> eventType) 
        synchronized (stickyEvent) 
            return eventType.cast(stickyEvent.remove(eventType));
        
    

    /**
     * 移除所有的Sticky事件
     */
    public void removeAllStickyEvents() 
        synchronized (stickyEvent) 
            stickyEvent.clear();
        
    

结果

通过sticky消息的完善,RxBus已经完全实现了EventBus3.0的全部功能,并且全部安装EventBus3.0的使用方法来封装,方便项目的迁移和使用。

  • 注解方式定义

  • post方式分发事件

  • sticky消息功能

  • 注册销毁简单化


源码

传送门-GitHub项目地址-戳我


建议

如果你对这套封装有任何的问题和建议欢迎加入QQ群告诉我!

以上是关于RxBus-实现EventBus之Sticky的主要内容,如果未能解决你的问题,请参考以下文章

java RxBus替换eventbus

RxBus与RxBus2理解

MVP实战心得—封装Retrofit2.0+RxAndroid+RxBus

EventBus

Android Rxbus事件总线

Android Rxbus事件总线