guava eventbus代码分析
Posted xuqiyu
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了guava eventbus代码分析相关的知识,希望对你有一定的参考价值。
---恢复内容开始---
我们分析下EventBus的核心方法 post方法,直接贴代码
1 public void post(Object event) { 2 Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event); 3 if (eventSubscribers.hasNext()) { 4 dispatcher.dispatch(event, eventSubscribers); 5 } else if (!(event instanceof DeadEvent)) { 6 // the event had no subscribers and was not itself a DeadEvent 7 post(new DeadEvent(this, event)); 8 } 9 }
第2行,从subscribers中得到一个事件的所有监听者subscriber列表,第4行,调用dispatcher的dispatch方法通知所有的监听者,
第7行,如果当前事件没有任何监听者,则发送一个DeadEvent用于记录日志。
我们跟进第4行的dispatch方法:
dispatcher方法有3个实现类,主要用到的是后面两个,LegacyAsyncDispatcher,这是一个多线程分发器,是配合 AsyncEventBus 使用的;PerThreadQueuedDispather,是一个单线程分发器。
其中,这个单线程分发器实际上和当前eventBus 的post方法用的是同一个线程(并没有启用新的线程),而且单线程分发器能保证事件的执行顺序和post的顺序完全一致(即广度优先遍历)
那么按照顺序来看一下这两个分发器的dispatch 方法,
LegacyAsyncDispatcher的dispatch方法:
1 @Override 2 void dispatch(Object event, Iterator<Subscriber> subscribers) { 3 checkNotNull(event); 4 while (subscribers.hasNext()) { 5 queue.add(new EventWithSubscriber(event, subscribers.next())); 6 } 7 8 EventWithSubscriber e; 9 while ((e = queue.poll()) != null) { 10 e.subscriber.dispatchEvent(e.event); 11 } 12 }
第3行,checkNotNull方法 import static 方式引入的,这种校验参数的写法被很多开发者使用
接下来是两个串行的while循环
第4-6行,一个ConcurrentLinkedQueue线程安全队列,把 EventWithSubscriber 往这个队列里面扔
第9-11行,从这个队列里面取数据,然后调用subscriber的dispatchEvent方法
这个地方为什么搞1个队列,而不是直接dispatchEvent呢?
我看了一下这里,这个队列的作用能够使不同线程 “比较均衡的 从队列里面取数据进行分发”,我举个例子,有一个线程A post一个时间对象,
这个对象只有两个监听者 subcriber,另一线程B post另外一个对象,有100个subscriber, 这两个线程如果并发进行,总计往队列里面放了102个 EventWithSubscriber
虽然A线程里面的subcriber少,但是由于俩线程同时从队列里面取数据,着俩线程会在相近的时刻执行完整个dispatch方法,所以施一种均衡策略,
可能有深层次的原因用这个队列,我暂时看不出来。
然后我们跟进第10行, 看下subscriber的 dispatchEvent方法:
1 /** 2 * Dispatches {@code event} to this subscriber using the proper executor. 3 */ 4 final void dispatchEvent(final Object event) { 5 executor.execute(new Runnable() { 6 @Override 7 public void run() { 8 try { 9 invokeSubscriberMethod(event); 10 } catch (InvocationTargetException e) { 11 bus.handleSubscriberException(e.getCause(), context(event)); 12 } 13 } 14 }); 15 }
就是使用线程池多线程来执行订阅者的方法,AsyncEventBus 使用的线程池是用户自定义的线程池,一般使用 jdk Executors工具类里面创建的ExecutorService作为线程池
再看一下上面代码第9行,执行订阅者的方法,一直跟进到 Subscriber里面的代码:
1 @VisibleForTesting 2 void invokeSubscriberMethod(Object event) throws InvocationTargetException { 3 try { 4 method.invoke(target, checkNotNull(event)); 5 } catch (IllegalArgumentException e) { 6 throw new Error("Method rejected target/argument: " + event, e); 7 } catch (IllegalAccessException e) { 8 throw new Error("Method became inaccessible: " + event, e); 9 } catch (InvocationTargetException e) { 10 if (e.getCause() instanceof Error) { 11 throw (Error) e.getCause(); 12 } 13 throw e; 14 } 15 }
第4行,就是反射调用,method 和 target都是 在执行EventBus的post的方法的时候,new Subscriber的时候作为构造参数传入的
结束,下一篇我们再来看单线程的PerThreadQueuedDispather
以上是关于guava eventbus代码分析的主要内容,如果未能解决你的问题,请参考以下文章