EventBus -- 源码解析

Posted Y_ZhiWen

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了EventBus -- 源码解析相关的知识,希望对你有一定的参考价值。

好久没写博客,最近也不是特别忙,但是懒,似乎忘记了初衷。这不行啊,不行,要恢复状态。

假设你已经熟悉EventBus的使用,现在就从EventBus的入口开始。

EventBus的构建

EventBus的属性有点多,先看下构造函数:

public EventBus() 
        this(DEFAULT_BUILDER);

可以看到,EventBus的配置是根据EventBusBuilder进行配置,以及其他属性

函数太长,最后再梳理一下。

EventBus的注册

源码中可以看到EventBus的注册最终调用这个函数

private synchronized void register(Object subscriber, boolean sticky, int priority) 
        // ❶
        List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriber.getClass());
        for (SubscriberMethod subscriberMethod : subscriberMethods) 
            // ❷
            subscribe(subscriber, subscriberMethod, sticky, priority);
        
    

可以看到三个参数分别是:订阅者、是否注册黏性事件、优先级

  • ❶先通过SubscriberMethodFinder查找该订阅者(subscriber)订阅的事件集合
  • ❷再通过subscribe方法对订阅者(subscriber)的事件类型进行订阅

❶,so?SubscriberMethodFinder是什么?

其主要的方法是List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass)。主要作用是根据订阅者的Class对象,查找该订阅者订阅哪些事件,也就是写了几个onEvent..()方法。

主要实现思路是:

  • 内部维护Map<Class<?>, List<SubscriberMethod>> methodCache集合对订阅者订阅事件进行缓存。先从缓存中取出该订阅者所订阅的事件,若无缓存,则通过反射扫描所有方法进行判断,只获取onEvent开头并且参数只有一个的方法。扫描完毕则进行缓存并返回。

SubscriberMethod又是什么呢?简单看一下;

final class SubscriberMethod 
    final Method method;  // 订阅方法
    final ThreadMode threadMode; // 方法线程模式
    final Class<?> eventType; // 订阅参数
    .....
}

同时简单看一下ThreadMode,可以看到有四种线程模式:

public enum ThreadMode 
    PostThread,
    MainThread,
    BackgroundThread,
    Async

subscribe方法是如何对订阅者(subscriber)的事件类型进行订阅的呢?

private void subscribe(Object subscriber, SubscriberMethod subscriberMethod, boolean sticky, int priority) 
        // ❶
        Class<?> eventType = subscriberMethod.eventType;
        CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
        Subscription newSubscription = new Subscription(subscriber, subscriberMethod, priority);
        if (subscriptions == null) 
            subscriptions = new CopyOnWriteArrayList<Subscription>();
            subscriptionsByEventType.put(eventType, subscriptions);
         else 
            if (subscriptions.contains(newSubscription)) 
                throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
                        + eventType);
            
        

        // Starting with EventBus 2.2 we enforced methods to be public (might change with annotations again)
        // subscriberMethod.method.setAccessible(true);

        int size = subscriptions.size();
        for (int i = 0; i <= size; i++) 
            if (i == size || newSubscription.priority > subscriptions.get(i).priority) 
                subscriptions.add(i, newSubscription);
                break;
            
        

        // ❷
        List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
        if (subscribedEvents == null) 
            subscribedEvents = new ArrayList<Class<?>>();
            typesBySubscriber.put(subscriber, subscribedEvents);
        
        subscribedEvents.add(eventType);

        // ❸
        if (sticky) 
            .....
        
    

该方法的简单解析:

Subscription类:

final class Subscription 
    final Object subscriber;
    final SubscriberMethod subscriberMethod;
    final int priority;
    /**
     * Becomes false as soon as @link EventBus#unregister(Object) is called, which is checked by queued event delivery
     * @link EventBus#invokeSubscriber(PendingPost) to prevent race conditions.
     */
    volatile boolean active;
    .....

Map<Class<?>,CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;K:事件类型,V:Subscription集合

Map<Object, List<Class<?>>> typesBySubscriber;:K:订阅者,V:订阅事件类型集合

该方法的步骤解析:
- ❶通过事件类型获取订阅事件集合(CopyOnWriteArrayList<Subscription> subscriptions),并根据优先级添加新的订阅事件。
- ❷根据订阅者获取该订阅者的订阅事件类型集合,并添加该订阅事件类型
- ❸黏性事件的处理

上面可以看到EventBus的register过程,接下来是unregister:

如果你能理解上面的Map<Class<?>,CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;Map<Object, List<Class<?>>> typesBySubscriber;

你应该能够猜出unregister方法是怎么处理的:就是根据订阅者获取订阅的事件类型,删除相关订阅。

EventBus的post

post方法逻辑比较清晰

/** Posts the given event to the event bus. */
    public void post(Object event) 
        PostingThreadState postingState = currentPostingThreadState.get();
        List<Object> eventQueue = postingState.eventQueue;
        eventQueue.add(event);

        if (!postingState.isPosting) 
            postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();
            postingState.isPosting = true;
            if (postingState.canceled) 
                throw new EventBusException("Internal error. Abort state was not reset");
            
            try 
                while (!eventQueue.isEmpty()) 
                    postSingleEvent(eventQueue.remove(0), postingState);
                
             finally 
                postingState.isPosting = false;
                postingState.isMainThread = false;
            
        
    

currentPostingThreadStatePostingThreadState是什么?

private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() 
        @Override
        protected PostingThreadState initialValue() 
            return new PostingThreadState();
        
    ;

/** For ThreadLocal, much faster to set (and get multiple values). */
    final static class PostingThreadState 
        final List<Object> eventQueue = new ArrayList<Object>();
        boolean isPosting;
        boolean isMainThread;
        Subscription subscription;
        Object event; // 表示当前正在执行的事件event
        boolean canceled;
    

可以看到每个线程都有各自的PostingThreadState变量,用来表示当前post的状态信息。

可以看到每次post一个event,都会添加到PostingThreadState中的eventQueue中。并且根据是否正在Posting来调用postSingleEvent。在postSingleEvent中判断是否触发父类事件来进行调用postSingleEventForEventType,其中会根据事件的类型来获取subscriptionsByEventType.get(eventClass)所有相关订阅,并通过反射来调用最终方法(onEvent)。

EventBus的线程转换和执行机制

定位到EventBuspostToSubscription方法

private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) 
        switch (subscription.subscriberMethod.threadMode) 
            case PostThread:
                invokeSubscriber(subscription, event);
                break;
            case MainThread:
                if (isMainThread) 
                    invokeSubscriber(subscription, event);
                 else 
                    mainThreadPoster.enqueue(subscription, event);
                
                break;
            case BackgroundThread:
                if (isMainThread) 
                    backgroundPoster.enqueue(subscription, event);
                 else 
                    invokeSubscriber(subscription, event);
                
                break;
            case Async:
                asyncPoster.enqueue(subscription, event);
                break;
            default:
                throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
        
    

可以看到通过判断该订阅的线程模式来进行最终方法的线程转换。

如果是在当前线程就直接调用invokeSubscriber方法,否则则通过mainThreadPoster backgroundPoster asyncPoster来进行线程切换。

那么是怎么实现的呢?主要看下HandlerPoster,其他的Poster类似

// HandlerPoster
HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) 
        super(looper);
        this.eventBus = eventBus;
        this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
        queue = new PendingPostQueue();
    

void enqueue(Subscription subscription, Object event) 
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) 
            queue.enqueue(pendingPost);
            if (!handlerActive) 
                handlerActive = true;
                if (!sendMessage(obtainMessage())) 
                    throw new EventBusException("Could not send handler message");
                
            
        
    

可以看到,HandlerPoster内部维持一个PendingQueue还有eventbus对象和处理事件最大事件maxMillisInsideHandleMessage。看到enqueue将订阅封装成PendingPost并添加到队列,再判断是否在执行当中,如果没执行则发送一条消息去执行。

看一下执行方法:

@Override
    public void handleMessage(Message msg) 
        boolean rescheduled = false;
        try 
            long started = SystemClock.uptimeMillis();
            while (true) 
                PendingPost pendingPost = queue.poll();
                if (pendingPost == null) 
                    synchronized (this) 
                        // Check again, this time in synchronized
                        pendingPost = queue.poll();
                        if (pendingPost == null) 
                            handlerActive = false;
                            return;
                        
                    
                
                eventBus.invokeSubscriber(pendingPost);
                long timeInMethod = SystemClock.uptimeMillis() - started;
                if (timeInMethod >= maxMillisInsideHandleMessage) 
                    if (!sendMessage(obtainMessage())) 
                        throw new EventBusException("Could not send handler message");
                    
                    rescheduled = true;
                    return;
                
            
         finally 
            handlerActive = rescheduled;
        
    

可以看到,如果从队列获取元素为空(这里如果第一次获取为null,再获取一次),则返回;否则则调用eventBus.invokeSubscriber(pendingPost);执行方法。
最后如果执行在while循环中执行时间超过maxMillisInsideHandleMessage则再发送一个消息,并退出循环。这样处理主要是避免ANR的出现。

那么maxMillisInsideHandleMessage是多少呢?在EventBus的构造函数中可以看到mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10);

BackgroundPoster类是,需要注意的是在获取订阅的时候是会延迟1000s获取订阅来调用事件方法

说道这里,还有一个需要注意的是:PendingPost PendingPostQueue

PendingPost内部维持着一个静态的List<PendingPost>用来回收和分发PendingPost对象(数量限制在10000个)。

PendingPostQueue是一个队列,保存着头指针和尾指针,并提供入队和出队的方法。

EventBus的取消机制

先看下官方原话:

Cancelling event delivery

You may cancel the event delivery process by calling cancelEventDelivery(Object event) from a subscriber’s event handling method. Any further event delivery will be cancelled: subsequent subscribers won’t receive the event.

// Called in the same thread (default)
@Subscribe
public void onEvent(MessageEvent event)
// Process the event
…

EventBus.getDefault().cancelEventDelivery(event) ;

Events are usually cancelled by higher priority subscribers. Cancelling is restricted to event handling methods running in posting thread ThreadMode.PostThread.

从该文档我们知道,作者建议我们在优先级高的onEvent方法调用cancelEventDelivery

如果是在onEventMainThread、onEventBackgroundThread、onEventAsync则会报错。

why??

通过自己写的Demo测试了一下:

  • 在低优先级方法(posting threadmode)cancel,再次post,高低都会收到

  • 在高优先级方法(posting threadmode)cancel,低优先级方法受不懂啊,再次post,高优先级方法收到,低优先级方法收不到

  • 在三个订阅方法(方法线程都是postthread)中优先级第二的方法cancel 事件,再次post 优先级第一和第二的事件还是会接收到消息

  • 在三个订阅方法(方法线程都是postthread)中优先级最高的方法cancel,在子线程post后不会报错,同样优先级低的两个方法都收不到方法

——————————————————

why postThread cancel?

  • 要求在优先级最高的postthread拦截,会拦截得比较快,因为优先级最高,当执行优先级最高的方法时调用cancel,避免线程切换消耗的时间而导致的cancel不是最快拦截,所以要求在postthread cancel

how cancel?

当取消的时候会进行一些条件判断,主要有:

  • !postingState.isPosting – 判断cancel方法是否在posting线程下进行拦截

  • event == null – 判断拦截事件是否为空。

  • postingState.event != event 如果当前处理事件不是要拦截事件

  • postingState.subscription.subscriberMethod.threadMode != ThreadMode.POSTING – 如果当前EventBus执行事件跟取消事件一致,判断当前事件是否在posting线程下执行,如果不是,则可能说明其可能当前事件正在拦截,当时下一事件已经在延迟了,所以通过抛出异常进行提示

如果上述条件都满足,则设置 postingState.canceled = true;,当执行接收事件的方法被调用后会退出循环,停止后面事件的传递

最后

  • EventBus作为项目的事件总线,有好有坏,可以很方便的解耦,很方便的实现线程转换、接口回调和观察者模式。缺点是事件参数只能一个,事件类过多而造成杂乱,通过反射,性能会有点消耗

如果有什么错误,还请指出。

以上是关于EventBus -- 源码解析的主要内容,如果未能解决你的问题,请参考以下文章

Android EventBus源码解析, 带你深入理解EventBus

源码解析-EventBus

andorid jar/库源码解析 EventBus

EventBusEventBus 源码解析 ( EventBus 构建 | EventBus 单例获取 | EventBus 构造函数 | EventBus 构建者 )

EventBus3.0源码解析——06. 总结

EventBus源码解析