guava eventbus代码分析

Posted xuqiyu

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了guava eventbus代码分析相关的知识,希望对你有一定的参考价值。

---恢复内容开始---

我们分析下EventBus的核心方法 post方法,直接贴代码

1 public void post(Object event) {
2     Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
3     if (eventSubscribers.hasNext()) {
4       dispatcher.dispatch(event, eventSubscribers);
5     } else if (!(event instanceof DeadEvent)) {
6       // the event had no subscribers and was not itself a DeadEvent
7       post(new DeadEvent(this, event));
8     }
9   }

第2行,从subscribers中得到一个事件的所有监听者subscriber列表,第4行,调用dispatcher的dispatch方法通知所有的监听者,

第7行,如果当前事件没有任何监听者,则发送一个DeadEvent用于记录日志。

我们跟进第4行的dispatch方法:

dispatcher方法有3个实现类,主要用到的是后面两个,LegacyAsyncDispatcher,这是一个多线程分发器,是配合 AsyncEventBus 使用的;PerThreadQueuedDispather,是一个单线程分发器。

其中,这个单线程分发器实际上和当前eventBus 的post方法用的是同一个线程(并没有启用新的线程),而且单线程分发器能保证事件的执行顺序和post的顺序完全一致(即广度优先遍历)

那么按照顺序来看一下这两个分发器的dispatch 方法,

LegacyAsyncDispatcher的dispatch方法:

 1 @Override
 2     void dispatch(Object event, Iterator<Subscriber> subscribers) {
 3       checkNotNull(event);
 4       while (subscribers.hasNext()) {
 5         queue.add(new EventWithSubscriber(event, subscribers.next()));
 6       }
 7 
 8       EventWithSubscriber e;
 9       while ((e = queue.poll()) != null) {
10         e.subscriber.dispatchEvent(e.event);
11       }
12     }

第3行,checkNotNull方法 import static 方式引入的,这种校验参数的写法被很多开发者使用

接下来是两个串行的while循环

第4-6行,一个ConcurrentLinkedQueue线程安全队列,把 EventWithSubscriber 往这个队列里面扔

第9-11行,从这个队列里面取数据,然后调用subscriber的dispatchEvent方法

这个地方为什么搞1个队列,而不是直接dispatchEvent呢?

 我看了一下这里,这个队列的作用能够使不同线程 “比较均衡的 从队列里面取数据进行分发”,我举个例子,有一个线程A post一个时间对象,

这个对象只有两个监听者 subcriber,另一线程B post另外一个对象,有100个subscriber, 这两个线程如果并发进行,总计往队列里面放了102个 EventWithSubscriber

虽然A线程里面的subcriber少,但是由于俩线程同时从队列里面取数据,着俩线程会在相近的时刻执行完整个dispatch方法,所以施一种均衡策略,

可能有深层次的原因用这个队列,我暂时看不出来。

然后我们跟进第10行, 看下subscriber的 dispatchEvent方法:

 1 /**
 2    * Dispatches {@code event} to this subscriber using the proper executor.
 3    */
 4   final void dispatchEvent(final Object event) {
 5     executor.execute(new Runnable() {
 6       @Override
 7       public void run() {
 8         try {
 9           invokeSubscriberMethod(event);
10         } catch (InvocationTargetException e) {
11           bus.handleSubscriberException(e.getCause(), context(event));
12         }
13       }
14     });
15   }

就是使用线程池多线程来执行订阅者的方法,AsyncEventBus 使用的线程池是用户自定义的线程池,一般使用 jdk Executors工具类里面创建的ExecutorService作为线程池

再看一下上面代码第9行,执行订阅者的方法,一直跟进到 Subscriber里面的代码:

 1 @VisibleForTesting
 2   void invokeSubscriberMethod(Object event) throws InvocationTargetException {
 3     try {
 4       method.invoke(target, checkNotNull(event));
 5     } catch (IllegalArgumentException e) {
 6       throw new Error("Method rejected target/argument: " + event, e);
 7     } catch (IllegalAccessException e) {
 8       throw new Error("Method became inaccessible: " + event, e);
 9     } catch (InvocationTargetException e) {
10       if (e.getCause() instanceof Error) {
11         throw (Error) e.getCause();
12       }
13       throw e;
14     }
15   }

第4行,就是反射调用,method 和 target都是 在执行EventBus的post的方法的时候,new Subscriber的时候作为构造参数传入的

结束,下一篇我们再来看单线程的PerThreadQueuedDispather

以上是关于guava eventbus代码分析的主要内容,如果未能解决你的问题,请参考以下文章

Guava中EventBus分析

guava中EventBus(事件总线)源码分析与使用

优雅代码14-guava精选方法及eventBus观察者模式源码解析

Guava java EventBus实例化错误

Guava学习笔记:EventBus(转)

guava中eventbus注解使用