EventBus原理以及源代码分析
Posted tony-yang-flutter
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了EventBus原理以及源代码分析相关的知识,希望对你有一定的参考价值。
一、概述
EventBus是一个基于观察者模式的发布/订阅事件总线框架。将事件的发送者和接收者分开,其可以简化组件之间的通讯,相对于BroadcastReceiver其更轻量级也更易用。
二、用法介绍
EventBus的用法非常的简单,大致上就四步:
1.注册事件
@Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_main); /** * 注册观察者对象 */ EventBus.getDefault().register(this); }
2.消费事件
/** * 用于接收post发送过来的事件 * @param event 具体的事件主体 */ @Subscribe public void onMessageEvent(MessageEvent event) { switch (event.getArg1()) { case 0: break; case 1: break; } }
3.取消事件
@Override protected void onDestroy() { super.onDestroy(); /** * 取消订阅 */ EventBus.getDefault().unregister(this); }
4.发送事件
/** * 发送消息 */ public void toPost() { MessageEvent event = new MessageEvent(); event.setArg1(0); event.setObj(new Object()); event.setMsg("我发送消息了"); EventBus.getDefault().post(event); }
三、具体的源代码分析
1.我们从使用开始入手,先看下EventBus.getDefault()都干了写什么事情。
public static EventBus getDefault() { EventBus instance = defaultInstance; if (instance == null) { synchronized (EventBus.class) { instance = EventBus.defaultInstance; if (instance == null) { instance = EventBus.defaultInstance = new EventBus(); } } } return instance; }
从上面的代码我们可以看出EventBus是一个单例类而且是懒汉式的单例,为了保证整个应用中只有一个EventBus实例。
在其构造方法中会进行一些初始化的工作,如:
public EventBus() { this(DEFAULT_BUILDER); } EventBus(EventBusBuilder builder) { logger = builder.getLogger(); //通过订阅事件类型查找订阅对象的map集合 subscriptionsByEventType = new HashMap<>(); typesBySubscriber = new HashMap<>(); //粘性事件 stickyEvents = new ConcurrentHashMap<>(); //主线程支持 mainThreadSupport = builder.getMainThreadSupport(); //handler对象 mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null; //runable对象 backgroundPoster = new BackgroundPoster(this); //线程池 asyncPoster = new AsyncPoster(this); indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0; //订阅者方法集合 subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes, builder.strictMethodVerification, builder.ignoreGeneratedIndex); logSubscriberExceptions = builder.logSubscriberExceptions; logNoSubscriberMessages = builder.logNoSubscriberMessages; sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent; sendNoSubscriberEvent = builder.sendNoSubscriberEvent; throwSubscriberException = builder.throwSubscriberException; eventInheritance = builder.eventInheritance; //默认线程,CachedThreadPool executorService = builder.executorService; }
1.subscriptionsByEventType通过event.getClass类型查找HashMap中的subscription集合。而subscription中存放的是subscriberMethod对象,subscriberMethod对象中又存放的是订阅者中用subscriber注解标注的Method、时间类型event.getClass、是否是粘性事件、线程模型、时间执行的优先级等信息。
2.mainThreadPoster,其实它是HandlerPoster继承了handler并实现了poster,可以把其理解成主线程handler。调用其enqueue方法传入subscription和subscriber。并在其handleMessage方法中通过eventbus.invokeSubscriber(pendingPost) 放订阅方法在主线程中执行
3.BackgroundPoster实现了runnable和poster对象,通过enqueue方法把subscription和subscriber传入,在run方法中执行eventbus.invokeSubscriber(pendingPost)
4.AsyncPoster实现了Runnable和Poster接口,也是调用enqueue将Subscription和subscriber传入,并将AsyncPoster对象运行在线程池中,并执行run方法的中的eventBus.invokeSubscriber(pendingPost)
5.executorService线程池,其具体实现是CachedThreadPool
以上的几步在后面的分析中会回使用到。
2.EventBus.getDefault().register(this)方法
/** * 注册订阅者对象 * @param subscriber 订阅者 */ public void register(Object subscriber) { //获取订阅者的class对象 Class<?> subscriberClass = subscriber.getClass(); //根据订阅者class对象或者订阅者subscribe标注的方法集合SubscriberMethod List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass); synchronized (this) {//加锁,一次只能进一个线程 for (SubscriberMethod subscriberMethod : subscriberMethods) { //开始订阅,循环向subscriptionsByEventType中添加订阅对象。具体的做法是,根据事件类型取出订阅集合,然后向订阅集合中不断的加入 //订阅,并最终存入subscriptionsByEventType中 subscribe(subscriber, subscriberMethod); } } }
在register方法内部,会获取到订阅者subscriber(activity)的class对象。并通过这个class对象获取到该订阅者中用subscriber标记的方法集合。通过SubscriberMethodFinder.findSubscriberMethods(subscriberClass)获取。
在findSubscriberMethods内部首先会从缓存中获取,如果缓存中又就直接返回,如果缓存中没有就解析subscriberClass对象,并拿到subscriberClass对象中用subscriber注解标注过的方法、方法参数类型、线程模型、事件执行优先级、是否是粘性事件,并将以上信息封装到SubscriberMethod对象中。然后把SubscriberMethod对象加入集合返回。
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) { //先从缓存中获取,如果获取到了就直接返回。如果缓存中没有就查找带有subscribe标记的方法,并放到集合中 //因为一个class对象中用subscribe标注的方法可能不止一个,所以要用集合把所有的都存入进去 List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass); if (subscriberMethods != null) { return subscriberMethods; } //是否强制使用反射(即使有生成的索引) if (ignoreGeneratedIndex) { subscriberMethods = findUsingReflection(subscriberClass); } else { subscriberMethods = findUsingInfo(subscriberClass); } if (subscriberMethods.isEmpty()) { throw new EventBusException("Subscriber " + subscriberClass + " and its super classes have no public methods with the @Subscribe annotation"); } else { METHOD_CACHE.put(subscriberClass, subscriberMethods); return subscriberMethods; } }
private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) { FindState findState = prepareFindState(); findState.initForSubscriber(subscriberClass); while (findState.clazz != null) {//如果订阅者对象不为空,执行下面这个while findUsingReflectionInSingleClass(findState); //去找父类 findState.moveToSuperclass(); } return getMethodsAndRelease(findState); } private void findUsingReflectionInSingleClass(FindState findState) { Method[] methods; try { //利用反射获取subscriber的方法集合 methods = findState.clazz.getDeclaredMethods(); } catch (Throwable th) { methods = findState.clazz.getMethods(); findState.skipSuperClasses = true; } for (Method method : methods) { int modifiers = method.getModifiers(); //过滤public标注的方法 if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) { Class<?>[] parameterTypes = method.getParameterTypes();//获取参数类型 //找出只有一个参数的方法 if (parameterTypes.length == 1) { //找出有subscribe注解的方法 Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class); if (subscribeAnnotation != null) { //获取事件类型,由于只有一个参数所以parameterTypes取第一个就行了 Class<?> eventType = parameterTypes[0]; if (findState.checkAdd(method, eventType)) { //获取线程模型,这个模型用于标注,用户自定义的事件是在主线程中执行还是在子线程中执行 ThreadMode threadMode = subscribeAnnotation.threadMode(); //把订阅者的方法存入subscriberMethod,并把SubscriberMethod存入方法集合中 findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode, subscribeAnnotation.priority(), subscribeAnnotation.sticky())); } } ....省略了一些代码 } }
然后回过头来看EventBus的findSubscriberMethods方法中的subscriber方法,这个方法是执行订阅操作的。
synchronized (this) {//加锁,一次只能进一个线程 for (SubscriberMethod subscriberMethod : subscriberMethods) { //开始订阅,循环向subscriptionsByEventType中添加订阅对象。具体的做法是,根据事件类型取出订阅集合,然后向订阅集合中不断的加入 //订阅,并最终存入subscriptionsByEventType中 subscribe(subscriber, subscriberMethod); } }
循环遍历subscriberMethods集合,并把subscriber和SubscriberMethod封装成Subscription。并把Subscription添加到subscriptions集合。并以eventType为key,subscriptions为value将其缓存到名字为subscriptionsByEventType的HashMap中。
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) { //从subscriberMethod获取事件类型 Class<?> eventType = subscriberMethod.eventType; //将订阅者对象和订阅方法存入Subscription中 Subscription newSubscription = new Subscription(subscriber, subscriberMethod); //先从缓存中取出订阅集合(根据事件类型),如果没有就创建一个 CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType); if (subscriptions == null) { subscriptions = new CopyOnWriteArrayList<>(); subscriptionsByEventType.put(eventType, subscriptions); } else { if (subscriptions.contains(newSubscription)) { throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event " + eventType); } } int size = subscriptions.size(); for (int i = 0; i <= size; i++) { if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) { //向订阅集合中加入订阅 subscriptions.add(i, newSubscription); break; } } List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber); if (subscribedEvents == null) { subscribedEvents = new ArrayList<>(); typesBySubscriber.put(subscriber, subscribedEvents); } subscribedEvents.add(eventType); ...省略了一些代码 }
到此订阅结束。
总结:
1.在EventBus中对各个属性进行初始化、如:HashMap、线程池等
2.获取subscriber的class对象,并通过SubscriberMethodFinder.findSubscriberMethods(subscriberClass)获取到subscriberClass中用subscriber注解标注的方法属性集合(方法、参数类型、线程模型、是否是粘性事件、事件优先级等)。这里面有一个判断条件,先检查缓存中是否有这些方法集合,如果有就直接返回,如果没有就编辑subscriberClass的方法集合选出用subscriber直接标注的方法,并取出方法、参数类型、线程模型、是否是粘性事件、事件优先级等信息,并将其封装为SubscriberMethod对象,然后将SubscriberMethod对象放入List集合返回。
3.对subscriberMethods结合进行遍历,通过subscribe方法进行订阅。在subscribe方法内部会创建一个Subscription对象,并将subscriber对象和SubscriberMethod方法存入。并将Subscription对象加入到CopyOnWriteArrayList集合subscriptions。并以SubscriberMethod.eventType为key,subscriptions为value将数据存入subscriptionsByEventType的HashMap中。到此订阅就结束了。
2.EventBus.getDefault().post(event)使用post方法发送订阅通知
public void post(Object event) { //从ThreadLocal中取出当前线程中的状态数据 PostingThreadState postingState = currentPostingThreadState.get(); //从当前线程中取出事件队列(其实不是队列,就是一个list集合) List<Object> eventQueue = postingState.eventQueue; //向当前线程的事件队列中加入事件 eventQueue.add(event); if (!postingState.isPosting) { //判断当前线程是不是主线程 postingState.isMainThread = isMainThread(); postingState.isPosting = true; if (postingState.canceled) { throw new EventBusException("Internal error. Abort state was not reset"); } try { while (!eventQueue.isEmpty()) {//如果事件不为空,则对事件进行分发 postSingleEvent(eventQueue.remove(0), postingState); } } finally { postingState.isPosting = false; postingState.isMainThread = false; } } }
在post内部会创建一个PostingThreadState对象和一个消息队列eventQueue。其中PostingThreadState是一个ThreadLocal对象(其可以在不同的线程之间存储数据,且线程间的数据相互独立)
并对postingState进行初始化,紧接着会调用postSingleEvent方法,并将事件和postingState传递进去。在postSingleEvent方法内部又会调用postSingleEventForEventType方法。
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) { CopyOnWriteArrayList<Subscription> subscriptions;//订阅集合 synchronized (this) { //根据事件类型从订阅map中取出订阅集合 subscriptions = subscriptionsByEventType.get(eventClass); } if (subscriptions != null && !subscriptions.isEmpty()) { //循环遍历订阅集合 for (Subscription subscription : subscriptions) { postingState.event = event; postingState.subscription = subscription; boolean aborted = false; try { //根据线程模型对事件进行分发 postToSubscription(subscription, event, postingState.isMainThread); aborted = postingState.canceled; } finally { postingState.event = null; postingState.subscription = null; postingState.canceled = false; } if (aborted) { break; } } return true; } return false; }
根据eventClass在subscriptionsByEventType的HashMap中取出Subscriptions集合,循环遍历subscriptions集合并通过postToSubscription方法对事件进行分发
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) { switch (subscription.subscriberMethod.threadMode) {//从订阅对象中取出线程模型 case POSTING://默认模式,在哪个线程中订阅就在哪个线程中执行 //具体的分发 invokeSubscriber(subscription, event); break; case MAIN://如在主线程(UI线程)发送事件,则直接在主线程处理事件;如果在子线程发送事件,则先将事件入队列,然后通过 Handler 切换到主线程,依次处理事件。 if (isMainThread) { invokeSubscriber(subscription, event); } else { mainThreadPoster.enqueue(subscription, event); } break; case MAIN_ORDERED://无论在那个线程发送事件,都先将事件入队列,然后通过 Handler 切换到主线程,依次处理事件 if (mainThreadPoster != null) { mainThreadPoster.enqueue(subscription, event); } else { // temporary: technically not correct as poster not decoupled from subscriber invokeSubscriber(subscription, event); } break; case BACKGROUND://如果在主线程发送事件,则先将事件入队列,然后通过线程池依次处理事件;如果在子线程发送事件,则直接在发送事件的线程处理事件 if (isMainThread) { backgroundPoster.enqueue(subscription, event); } else { invokeSubscriber(subscription, event); } break; case ASYNC://无论在那个线程发送事件,都将事件入队列,然后通过线程池处理 asyncPoster.enqueue(subscription, event); break; default: throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode); } }
从上面的代码可以看出根据不同的线程模型会执行不同的方法内容。先看下invokeSubscriber(subscription,event)方法都干了啥
/** * 具体执行订阅方法的类 * @param subscription 订阅对象 * @param event 事件 */ void invokeSubscriber(Subscription subscription, Object event) { try { //从订阅对象中取出SubscriberMehtod,然后调用其Method方法并执行方法的invoke来完成事件的执行 subscription.subscriberMethod.method.invoke(subscription.subscriber, event); } catch (InvocationTargetException e) { handleSubscriberException(subscription, event, e.getCause()); } catch (IllegalAccessException e) { throw new IllegalStateException("Unexpected exception", e); } }
上面方法的执行非常的简单,其实就是从subscription中取出subscriberMethod对象,并取出subscriberMethod对象的method方法并执行method方法的invoke函数。执行完这一段之后在subscriber对象中用subscriber注解标注的方法就会执行了。也就是说到此处未知,从订阅到发送到接收就已经执行完了。
上面说的是在那个线程中发送就在哪个线程中接收事件的线程模型。线面说下另外两个,无论如何在主线程中执行和无论如何在子线程中执行。主要看两个类HandlerPoster和BackgroundPoster这个两个为代表讲解一下。
先看HandlerPoster
public class HandlerPoster extends Handler implements Poster { .....省略了一些代码 public void enqueue(Subscription subscription, Object event) { PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event); synchronized (this) { queue.enqueue(pendingPost); if (!handlerActive) { handlerActive = true; if (!sendMessage(obtainMessage())) { throw new EventBusException("Could not send handler message"); } } } } @Override public void handleMessage(Message msg) { boolean rescheduled = false; try { long started = SystemClock.uptimeMillis(); while (true) { ...省略了一些代码 eventBus.invokeSubscriber(pendingPost); ....省略了一些代码 }
如果再子线程发送事件,然后想让在在主线程中执行。那么首先就要调用HandlerPoster.enqueue方法,enqueue方法内部会调用sendMessage向主线程Handler发送消息,在Handler的handleMessage内部会接收消息通知,并执行eventBus.invokeSubscriber(pendingPost)。看到这一步其实就明了了,就是通过反射执行方法呗,不同的是执行过程已经切换到主线程了。
再来看一下BackgroundPoster这个类。
final class BackgroundPoster implements Runnable, Poster { ...省略了一些diamante public void enqueue(Subscription subscription, Object event) { PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event); synchronized (this) { queue.enqueue(pendingPost); if (!executorRunning) { executorRunning = true; eventBus.getExecutorService().execute(this); } } } @Override public void run() { try { try { while (true) { ...省略了一些代码 eventBus.invokeSubscriber(pendingPost); } } catch (InterruptedException e) { eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e); } } finally { executorRunning = false; } } }
从以上的源代码我们可以看出BackgroundPoster实现了Runnable和poster接口,并实现run方法和enqueue方法。在enqueue的内部会调用eventbus的线程池的execute(this)方法运行BackgroundPoster对象的run方法,在run方法内部会调用eventBus.invokeSubscriber(pendingPost)方法,这个方法目的非常明确,通过反射执行subscriber对象中用subscriber注解标注的方法。
总结:
1.在post方法内部对postingThreadState进行初始化并赋值
2.调用postSingleEvent方法,在postSingleEvent方法内部调用postSingleEventForEventType方法,在postSingleEventForEventType方法内部根据eventType从subscriptionsByEventType中取出Subscriptions集合。循环遍历Subscriptions集合,并执行postToSubscription方法。在postToSubscription方法内部会根据不同的现场模型执行对应的方法。如:1.直接执行invokeSubscriber(subscription,eventObj) ,2.通过HandlerPoster.enqueue将执行事件切换到主线程中执行,最终也是会执行invokeSubscriber(pendingPost)方法,3.通过BackgroundPoster.enqueue方法将时间切换到子线程或者线程池中执行,最终会在run方法中执行eventBus.invokeSubscriber(pendingPost)方法。以上三步最终都会调用subscription.subscriberMethod.method.invoke(subscriptions.subscriber,eventObj),完成最终的执行,执行时subscriber注解标注的方法会被执行。
3.我们接下来看下事件的接收,即用subscriber注解标注的方法。其实不用看这个方法,因为这个方法可以自定义只要符合规则就行。主要看一下Subscriber注解的定义和ThreadMode的定义加深对注解和线程模型的理解。
@Documented @Retention(RetentionPolicy.RUNTIME) @Target({ElementType.METHOD}) public @interface Subscribe { //指定订阅事件在哪个线程中运行,默认是POSTING,即在哪个线程中发送就在哪个线程中接收 ThreadMode threadMode() default ThreadMode.POSTING; //粘性事件,默认为false boolean sticky() default false; //订阅事件执行的优先级 int priority() default 0; }
/** * EventBus线程模型 */ public enum ThreadMode { /** * 在哪个线程中发送事件就在哪个线程中接收事件 */ POSTING, /** * 如果实在主线程中发送事件就在主线程中处理,如果再子线程中发送事件就把事件先如队列,然后通过handler切换到主线程中处理 */ MAIN, /** * 无论在主线程还是子线程中发送事件,都将事件先入队列,然后通过Handler切换到主线程,依次处理事件。 */ MAIN_ORDERED, /** * 如果再主线程中发送事件,则将事件入队列,在线程池中依次执行。如果在子线程中发送事件,则直接在发送事件的线程中处理事件。 */ BACKGROUND, /** * 无论是在主线程还是在子线程中发送事件,都把时间入队列,通过线程池依次执行 */ ASYNC }
不多说,注释非常的清晰了。
4.接下来看一下事件是如何取消注册的,即EventBus.getDefault().unRegister(this)
public synchronized void unregister(Object subscriber) { List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber); if (subscribedTypes != null) { for (Class<?> eventType : subscribedTypes) { unsubscribeByEventType(subscriber, eventType); } typesBySubscriber.remove(subscriber); } else { logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass()); } }
private void unsubscribeByEventType(Object subscriber, Class<?> eventType) { List<Subscription> subscriptions = subscriptionsByEventType.get(eventType); if (subscriptions != null) { int size = subscriptions.size(); for (int i = 0; i < size; i++) { Subscription subscription = subscriptions.get(i); if (subscription.subscriber == subscriber) { subscription.active = false; subscriptions.remove(i); i--; size--; } } } }
注册这块非常的简单就分两步:1.移除typesBySubscriber集合中的对象。2.通过eventType从subscriptionsByEventType中取出Subscriptions集合,然后找到和subscriber相等的Subscription.subscriber,然后将其从集合中移除掉就行了。
以上是关于EventBus原理以及源代码分析的主要内容,如果未能解决你的问题,请参考以下文章