EventBus -- 源码解析
Posted Y_ZhiWen
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了EventBus -- 源码解析相关的知识,希望对你有一定的参考价值。
好久没写博客,最近也不是特别忙,但是懒,似乎忘记了初衷。这不行啊,不行,要恢复状态。
假设你已经熟悉EventBus的使用,现在就从EventBus的入口开始。
EventBus的构建
EventBus的属性有点多,先看下构造函数:
public EventBus()
this(DEFAULT_BUILDER);
可以看到,EventBus的配置是根据EventBusBuilder进行配置,以及其他属性
函数太长,最后再梳理一下。
EventBus的注册
源码中可以看到EventBus的注册最终调用这个函数
private synchronized void register(Object subscriber, boolean sticky, int priority)
// ❶
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriber.getClass());
for (SubscriberMethod subscriberMethod : subscriberMethods)
// ❷
subscribe(subscriber, subscriberMethod, sticky, priority);
可以看到三个参数分别是:订阅者、是否注册黏性事件、优先级
- ❶先通过SubscriberMethodFinder查找该订阅者(subscriber)订阅的事件集合
- ❷再通过subscribe方法对订阅者(subscriber)的事件类型进行订阅
❶,so?SubscriberMethodFinder是什么?
其主要的方法是List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass)
。主要作用是根据订阅者的Class对象,查找该订阅者订阅哪些事件,也就是写了几个onEvent..()方法。
主要实现思路是:
- 内部维护
Map<Class<?>, List<SubscriberMethod>> methodCache
集合对订阅者订阅事件进行缓存。先从缓存中取出该订阅者所订阅的事件,若无缓存,则通过反射扫描所有方法进行判断,只获取onEvent
开头并且参数只有一个的方法。扫描完毕则进行缓存并返回。
SubscriberMethod又是什么呢?简单看一下;
final class SubscriberMethod
final Method method; // 订阅方法
final ThreadMode threadMode; // 方法线程模式
final Class<?> eventType; // 订阅参数
.....
}
同时简单看一下ThreadMode,可以看到有四种线程模式:
public enum ThreadMode
PostThread,
MainThread,
BackgroundThread,
Async
❷ subscribe方法是如何对订阅者(subscriber)的事件类型进行订阅的呢?
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod, boolean sticky, int priority)
// ❶
Class<?> eventType = subscriberMethod.eventType;
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
Subscription newSubscription = new Subscription(subscriber, subscriberMethod, priority);
if (subscriptions == null)
subscriptions = new CopyOnWriteArrayList<Subscription>();
subscriptionsByEventType.put(eventType, subscriptions);
else
if (subscriptions.contains(newSubscription))
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);
// Starting with EventBus 2.2 we enforced methods to be public (might change with annotations again)
// subscriberMethod.method.setAccessible(true);
int size = subscriptions.size();
for (int i = 0; i <= size; i++)
if (i == size || newSubscription.priority > subscriptions.get(i).priority)
subscriptions.add(i, newSubscription);
break;
// ❷
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null)
subscribedEvents = new ArrayList<Class<?>>();
typesBySubscriber.put(subscriber, subscribedEvents);
subscribedEvents.add(eventType);
// ❸
if (sticky)
.....
该方法的简单解析:
Subscription类:
final class Subscription
final Object subscriber;
final SubscriberMethod subscriberMethod;
final int priority;
/**
* Becomes false as soon as @link EventBus#unregister(Object) is called, which is checked by queued event delivery
* @link EventBus#invokeSubscriber(PendingPost) to prevent race conditions.
*/
volatile boolean active;
.....
Map<Class<?>,CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;
K:事件类型,V:Subscription集合
Map<Object, List<Class<?>>> typesBySubscriber;
:K:订阅者,V:订阅事件类型集合
该方法的步骤解析:
- ❶通过事件类型获取订阅事件集合(CopyOnWriteArrayList<Subscription> subscriptions
),并根据优先级添加新的订阅事件。
- ❷根据订阅者获取该订阅者的订阅事件类型集合,并添加该订阅事件类型
- ❸黏性事件的处理
上面可以看到EventBus的register过程,接下来是unregister:
如果你能理解上面的Map<Class<?>,CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;
和Map<Object, List<Class<?>>> typesBySubscriber;
你应该能够猜出unregister方法是怎么处理的:就是根据订阅者获取订阅的事件类型,删除相关订阅。
EventBus的post
post方法逻辑比较清晰
/** Posts the given event to the event bus. */
public void post(Object event)
PostingThreadState postingState = currentPostingThreadState.get();
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);
if (!postingState.isPosting)
postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();
postingState.isPosting = true;
if (postingState.canceled)
throw new EventBusException("Internal error. Abort state was not reset");
try
while (!eventQueue.isEmpty())
postSingleEvent(eventQueue.remove(0), postingState);
finally
postingState.isPosting = false;
postingState.isMainThread = false;
currentPostingThreadState
和PostingThreadState
是什么?
private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>()
@Override
protected PostingThreadState initialValue()
return new PostingThreadState();
;
/** For ThreadLocal, much faster to set (and get multiple values). */
final static class PostingThreadState
final List<Object> eventQueue = new ArrayList<Object>();
boolean isPosting;
boolean isMainThread;
Subscription subscription;
Object event; // 表示当前正在执行的事件event
boolean canceled;
可以看到每个线程都有各自的PostingThreadState变量,用来表示当前post的状态信息。
可以看到每次post一个event,都会添加到PostingThreadState
中的eventQueue中。并且根据是否正在Posting来调用postSingleEvent
。在postSingleEvent
中判断是否触发父类事件来进行调用postSingleEventForEventType
,其中会根据事件的类型来获取subscriptionsByEventType.get(eventClass)
所有相关订阅,并通过反射来调用最终方法(onEvent)。
EventBus的线程转换和执行机制
定位到EventBus
的postToSubscription
方法
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread)
switch (subscription.subscriberMethod.threadMode)
case PostThread:
invokeSubscriber(subscription, event);
break;
case MainThread:
if (isMainThread)
invokeSubscriber(subscription, event);
else
mainThreadPoster.enqueue(subscription, event);
break;
case BackgroundThread:
if (isMainThread)
backgroundPoster.enqueue(subscription, event);
else
invokeSubscriber(subscription, event);
break;
case Async:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
可以看到通过判断该订阅的线程模式来进行最终方法的线程转换。
如果是在当前线程就直接调用invokeSubscriber
方法,否则则通过mainThreadPoster backgroundPoster asyncPoster
来进行线程切换。
那么是怎么实现的呢?主要看下HandlerPoster,其他的Poster类似
// HandlerPoster
HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage)
super(looper);
this.eventBus = eventBus;
this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
queue = new PendingPostQueue();
void enqueue(Subscription subscription, Object event)
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this)
queue.enqueue(pendingPost);
if (!handlerActive)
handlerActive = true;
if (!sendMessage(obtainMessage()))
throw new EventBusException("Could not send handler message");
可以看到,HandlerPoster内部维持一个PendingQueue
还有eventbus对象和处理事件最大事件maxMillisInsideHandleMessage
。看到enqueue将订阅封装成PendingPost
并添加到队列,再判断是否在执行当中,如果没执行则发送一条消息去执行。
看一下执行方法:
@Override
public void handleMessage(Message msg)
boolean rescheduled = false;
try
long started = SystemClock.uptimeMillis();
while (true)
PendingPost pendingPost = queue.poll();
if (pendingPost == null)
synchronized (this)
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null)
handlerActive = false;
return;
eventBus.invokeSubscriber(pendingPost);
long timeInMethod = SystemClock.uptimeMillis() - started;
if (timeInMethod >= maxMillisInsideHandleMessage)
if (!sendMessage(obtainMessage()))
throw new EventBusException("Could not send handler message");
rescheduled = true;
return;
finally
handlerActive = rescheduled;
可以看到,如果从队列获取元素为空(这里如果第一次获取为null,再获取一次),则返回;否则则调用eventBus.invokeSubscriber(pendingPost);
执行方法。
最后如果执行在while循环中执行时间超过maxMillisInsideHandleMessage
则再发送一个消息,并退出循环。这样处理主要是避免ANR的出现。
那么maxMillisInsideHandleMessage
是多少呢?在EventBus的构造函数中可以看到mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10);
BackgroundPoster
类是,需要注意的是在获取订阅的时候是会延迟1000s
获取订阅来调用事件方法。
说道这里,还有一个需要注意的是:PendingPost PendingPostQueue
在PendingPost
内部维持着一个静态的List<PendingPost>
用来回收和分发PendingPost
对象(数量限制在10000个)。
PendingPostQueue
是一个队列,保存着头指针和尾指针,并提供入队和出队的方法。
EventBus的取消机制
先看下官方原话:
Cancelling event delivery
You may cancel the event delivery process by calling cancelEventDelivery(Object event) from a subscriber’s event handling method. Any further event delivery will be cancelled: subsequent subscribers won’t receive the event.
// Called in the same thread (default)
@Subscribe
public void onEvent(MessageEvent event)
// Process the event
…
EventBus.getDefault().cancelEventDelivery(event) ;
Events are usually cancelled by higher priority subscribers. Cancelling is restricted to event handling methods running in posting thread ThreadMode.PostThread.
从该文档我们知道,作者建议我们在优先级高的onEvent
方法调用cancelEventDelivery
。
如果是在onEventMainThread、onEventBackgroundThread、onEventAsync
则会报错。
why??
通过自己写的Demo测试了一下:
在低优先级方法(posting threadmode)cancel,再次post,高低都会收到
在高优先级方法(posting threadmode)cancel,低优先级方法受不懂啊,再次post,高优先级方法收到,低优先级方法收不到
在三个订阅方法(方法线程都是postthread)中优先级第二的方法cancel 事件,再次post 优先级第一和第二的事件还是会接收到消息
在三个订阅方法(方法线程都是postthread)中优先级最高的方法cancel,在子线程post后不会报错,同样优先级低的两个方法都收不到方法
——————————————————
why postThread cancel?
- 要求在优先级最高的postthread拦截,会拦截得比较快,因为优先级最高,当执行优先级最高的方法时调用cancel,避免线程切换消耗的时间而导致的cancel不是最快拦截,所以要求在postthread cancel
how cancel?
当取消的时候会进行一些条件判断,主要有:
!postingState.isPosting – 判断cancel方法是否在posting线程下进行拦截
event == null – 判断拦截事件是否为空。
postingState.event != event 如果当前处理事件不是要拦截事件
postingState.subscription.subscriberMethod.threadMode != ThreadMode.POSTING – 如果当前EventBus执行事件跟取消事件一致,判断当前事件是否在posting线程下执行,如果不是,则可能说明其可能当前事件正在拦截,当时下一事件已经在延迟了,所以通过抛出异常进行提示
如果上述条件都满足,则设置 postingState.canceled = true;,当执行接收事件的方法被调用后会退出循环,停止后面事件的传递
最后
- EventBus作为项目的事件总线,有好有坏,可以很方便的解耦,很方便的实现线程转换、接口回调和观察者模式。缺点是事件参数只能一个,事件类过多而造成杂乱,通过反射,性能会有点消耗
如果有什么错误,还请指出。
以上是关于EventBus -- 源码解析的主要内容,如果未能解决你的问题,请参考以下文章
Android EventBus源码解析, 带你深入理解EventBus
EventBusEventBus 源码解析 ( EventBus 构建 | EventBus 单例获取 | EventBus 构造函数 | EventBus 构建者 )