EventBus原理以及源代码分析

Posted tony-yang-flutter

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了EventBus原理以及源代码分析相关的知识,希望对你有一定的参考价值。

一、概述

  EventBus是一个基于观察者模式的发布/订阅事件总线框架。将事件的发送者和接收者分开,其可以简化组件之间的通讯,相对于BroadcastReceiver其更轻量级也更易用。

二、用法介绍

  EventBus的用法非常的简单,大致上就四步:

  1.注册事件

 @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        /**
         * 注册观察者对象
         */
        EventBus.getDefault().register(this);
    }

  2.消费事件

 /**
     * 用于接收post发送过来的事件
     * @param event 具体的事件主体
     */
    @Subscribe
    public void onMessageEvent(MessageEvent event) {
        switch (event.getArg1()) {
            case 0:
                break;
            case 1:
                break;

        }
    }

  3.取消事件

 @Override
    protected void onDestroy() {
        super.onDestroy();
        /**
         * 取消订阅
         */
        EventBus.getDefault().unregister(this);
    }

  4.发送事件

    /**
     * 发送消息
     */
    public void toPost() {
        MessageEvent event = new MessageEvent();
        event.setArg1(0);
        event.setObj(new Object());
        event.setMsg("我发送消息了");
        EventBus.getDefault().post(event);
    }

  

三、具体的源代码分析

  1.我们从使用开始入手,先看下EventBus.getDefault()都干了写什么事情。

 public static EventBus getDefault() {
        EventBus instance = defaultInstance;
        if (instance == null) {
            synchronized (EventBus.class) {
                instance = EventBus.defaultInstance;
                if (instance == null) {
                    instance = EventBus.defaultInstance = new EventBus();
                }
            }
        }
        return instance;
    }

  从上面的代码我们可以看出EventBus是一个单例类而且是懒汉式的单例,为了保证整个应用中只有一个EventBus实例。

  在其构造方法中会进行一些初始化的工作,如:

public EventBus() {
        this(DEFAULT_BUILDER);
    }

    EventBus(EventBusBuilder builder) {
        logger = builder.getLogger();
        //通过订阅事件类型查找订阅对象的map集合
        subscriptionsByEventType = new HashMap<>();
        typesBySubscriber = new HashMap<>();
        //粘性事件
        stickyEvents = new ConcurrentHashMap<>();
        //主线程支持
        mainThreadSupport = builder.getMainThreadSupport();
        //handler对象
        mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
        //runable对象
        backgroundPoster = new BackgroundPoster(this);
        //线程池
        asyncPoster = new AsyncPoster(this);
        indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0;
        //订阅者方法集合
        subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
                builder.strictMethodVerification, builder.ignoreGeneratedIndex);
        logSubscriberExceptions = builder.logSubscriberExceptions;
        logNoSubscriberMessages = builder.logNoSubscriberMessages;
        sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
        sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
        throwSubscriberException = builder.throwSubscriberException;
        eventInheritance = builder.eventInheritance;
        //默认线程,CachedThreadPool
        executorService = builder.executorService;
    }

  1.subscriptionsByEventType通过event.getClass类型查找HashMap中的subscription集合。而subscription中存放的是subscriberMethod对象,subscriberMethod对象中又存放的是订阅者中用subscriber注解标注的Method、时间类型event.getClass、是否是粘性事件、线程模型、时间执行的优先级等信息。

  2.mainThreadPoster,其实它是HandlerPoster继承了handler并实现了poster,可以把其理解成主线程handler。调用其enqueue方法传入subscription和subscriber。并在其handleMessage方法中通过eventbus.invokeSubscriber(pendingPost) 放订阅方法在主线程中执行

  3.BackgroundPoster实现了runnable和poster对象,通过enqueue方法把subscription和subscriber传入,在run方法中执行eventbus.invokeSubscriber(pendingPost)

  4.AsyncPoster实现了Runnable和Poster接口,也是调用enqueue将Subscription和subscriber传入,并将AsyncPoster对象运行在线程池中,并执行run方法的中的eventBus.invokeSubscriber(pendingPost)

  5.executorService线程池,其具体实现是CachedThreadPool  

  以上的几步在后面的分析中会回使用到。

  2.EventBus.getDefault().register(this)方法

    /**
     * 注册订阅者对象
     * @param subscriber 订阅者
     */
    public void register(Object subscriber) {
        //获取订阅者的class对象
        Class<?> subscriberClass = subscriber.getClass();
        //根据订阅者class对象或者订阅者subscribe标注的方法集合SubscriberMethod
        List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
        synchronized (this) {//加锁,一次只能进一个线程
            for (SubscriberMethod subscriberMethod : subscriberMethods) {
                //开始订阅,循环向subscriptionsByEventType中添加订阅对象。具体的做法是,根据事件类型取出订阅集合,然后向订阅集合中不断的加入
                //订阅,并最终存入subscriptionsByEventType中
                subscribe(subscriber, subscriberMethod);
            }
        }
    }

  在register方法内部,会获取到订阅者subscriber(activity)的class对象。并通过这个class对象获取到该订阅者中用subscriber标记的方法集合。通过SubscriberMethodFinder.findSubscriberMethods(subscriberClass)获取。

  在findSubscriberMethods内部首先会从缓存中获取,如果缓存中又就直接返回,如果缓存中没有就解析subscriberClass对象,并拿到subscriberClass对象中用subscriber注解标注过的方法、方法参数类型、线程模型、事件执行优先级、是否是粘性事件,并将以上信息封装到SubscriberMethod对象中。然后把SubscriberMethod对象加入集合返回。

List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
        //先从缓存中获取,如果获取到了就直接返回。如果缓存中没有就查找带有subscribe标记的方法,并放到集合中
        //因为一个class对象中用subscribe标注的方法可能不止一个,所以要用集合把所有的都存入进去
        List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
        if (subscriberMethods != null) {
            return subscriberMethods;
        }
        //是否强制使用反射(即使有生成的索引)
        if (ignoreGeneratedIndex) {
            subscriberMethods = findUsingReflection(subscriberClass);
        } else {
            subscriberMethods = findUsingInfo(subscriberClass);
        }
        if (subscriberMethods.isEmpty()) {
            throw new EventBusException("Subscriber " + subscriberClass
                    + " and its super classes have no public methods with the @Subscribe annotation");
        } else {
            METHOD_CACHE.put(subscriberClass, subscriberMethods);
            return subscriberMethods;
        }
    }

  

private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
        FindState findState = prepareFindState();
        findState.initForSubscriber(subscriberClass);
        while (findState.clazz != null) {//如果订阅者对象不为空,执行下面这个while
            findUsingReflectionInSingleClass(findState);
            //去找父类
            findState.moveToSuperclass();
        }
        return getMethodsAndRelease(findState);
    }

    private void findUsingReflectionInSingleClass(FindState findState) {
        Method[] methods;
        try {
            //利用反射获取subscriber的方法集合
            methods = findState.clazz.getDeclaredMethods();
        } catch (Throwable th) {
            methods = findState.clazz.getMethods();
            findState.skipSuperClasses = true;
        }
        for (Method method : methods) {
            int modifiers = method.getModifiers();
            //过滤public标注的方法
            if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
                Class<?>[] parameterTypes = method.getParameterTypes();//获取参数类型
                //找出只有一个参数的方法
                if (parameterTypes.length == 1) {
                    //找出有subscribe注解的方法
                    Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
                    if (subscribeAnnotation != null) {
                        //获取事件类型,由于只有一个参数所以parameterTypes取第一个就行了
                        Class<?> eventType = parameterTypes[0];
                        if (findState.checkAdd(method, eventType)) {
                            //获取线程模型,这个模型用于标注,用户自定义的事件是在主线程中执行还是在子线程中执行
                            ThreadMode threadMode = subscribeAnnotation.threadMode();
                            //把订阅者的方法存入subscriberMethod,并把SubscriberMethod存入方法集合中
                            findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
                                    subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
                        }
                    }
             ....省略了一些代码
        }
    }

  然后回过头来看EventBus的findSubscriberMethods方法中的subscriber方法,这个方法是执行订阅操作的。

  synchronized (this) {//加锁,一次只能进一个线程
            for (SubscriberMethod subscriberMethod : subscriberMethods) {
                //开始订阅,循环向subscriptionsByEventType中添加订阅对象。具体的做法是,根据事件类型取出订阅集合,然后向订阅集合中不断的加入
                //订阅,并最终存入subscriptionsByEventType中
                subscribe(subscriber, subscriberMethod);
            }
        }

  循环遍历subscriberMethods集合,并把subscriber和SubscriberMethod封装成Subscription。并把Subscription添加到subscriptions集合。并以eventType为key,subscriptions为value将其缓存到名字为subscriptionsByEventType的HashMap中。

 private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
        //从subscriberMethod获取事件类型
        Class<?> eventType = subscriberMethod.eventType;
        //将订阅者对象和订阅方法存入Subscription中
        Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
        //先从缓存中取出订阅集合(根据事件类型),如果没有就创建一个
        CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
        if (subscriptions == null) {
            subscriptions = new CopyOnWriteArrayList<>();
            subscriptionsByEventType.put(eventType, subscriptions);
        } else {
            if (subscriptions.contains(newSubscription)) {
                throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
                        + eventType);
            }
        }

        int size = subscriptions.size();
        for (int i = 0; i <= size; i++) {
            if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
                //向订阅集合中加入订阅
                subscriptions.add(i, newSubscription);
                break;
            }
        }

        List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
        if (subscribedEvents == null) {
            subscribedEvents = new ArrayList<>();
            typesBySubscriber.put(subscriber, subscribedEvents);
        }
        subscribedEvents.add(eventType);

        ...省略了一些代码
    }

  到此订阅结束。

  总结:

  1.在EventBus中对各个属性进行初始化、如:HashMap、线程池等

  2.获取subscriber的class对象,并通过SubscriberMethodFinder.findSubscriberMethods(subscriberClass)获取到subscriberClass中用subscriber注解标注的方法属性集合(方法、参数类型、线程模型、是否是粘性事件、事件优先级等)。这里面有一个判断条件,先检查缓存中是否有这些方法集合,如果有就直接返回,如果没有就编辑subscriberClass的方法集合选出用subscriber直接标注的方法,并取出方法、参数类型、线程模型、是否是粘性事件、事件优先级等信息,并将其封装为SubscriberMethod对象,然后将SubscriberMethod对象放入List集合返回。

  3.对subscriberMethods结合进行遍历,通过subscribe方法进行订阅。在subscribe方法内部会创建一个Subscription对象,并将subscriber对象和SubscriberMethod方法存入。并将Subscription对象加入到CopyOnWriteArrayList集合subscriptions。并以SubscriberMethod.eventType为key,subscriptions为value将数据存入subscriptionsByEventType的HashMap中。到此订阅就结束了。

 

  2.EventBus.getDefault().post(event)使用post方法发送订阅通知

  

public void post(Object event) {
        //从ThreadLocal中取出当前线程中的状态数据
        PostingThreadState postingState = currentPostingThreadState.get();
        //从当前线程中取出事件队列(其实不是队列,就是一个list集合)
        List<Object> eventQueue = postingState.eventQueue;
        //向当前线程的事件队列中加入事件
        eventQueue.add(event);

        if (!postingState.isPosting) {
            //判断当前线程是不是主线程
            postingState.isMainThread = isMainThread();
            postingState.isPosting = true;
            if (postingState.canceled) {
                throw new EventBusException("Internal error. Abort state was not reset");
            }
            try {
                while (!eventQueue.isEmpty()) {//如果事件不为空,则对事件进行分发
                    postSingleEvent(eventQueue.remove(0), postingState);
                }
            } finally {
                postingState.isPosting = false;
                postingState.isMainThread = false;
            }
        }
    }

  在post内部会创建一个PostingThreadState对象和一个消息队列eventQueue。其中PostingThreadState是一个ThreadLocal对象(其可以在不同的线程之间存储数据,且线程间的数据相互独立)

  并对postingState进行初始化,紧接着会调用postSingleEvent方法,并将事件和postingState传递进去。在postSingleEvent方法内部又会调用postSingleEventForEventType方法。

private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
        CopyOnWriteArrayList<Subscription> subscriptions;//订阅集合
        synchronized (this) {
            //根据事件类型从订阅map中取出订阅集合
            subscriptions = subscriptionsByEventType.get(eventClass);
        }
        if (subscriptions != null && !subscriptions.isEmpty()) {
            //循环遍历订阅集合
            for (Subscription subscription : subscriptions) {
                postingState.event = event;
                postingState.subscription = subscription;
                boolean aborted = false;
                try {
                    //根据线程模型对事件进行分发
                    postToSubscription(subscription, event, postingState.isMainThread);
                    aborted = postingState.canceled;
                } finally {
                    postingState.event = null;
                    postingState.subscription = null;
                    postingState.canceled = false;
                }
                if (aborted) {
                    break;
                }
            }
            return true;
        }
        return false;
    }

  根据eventClass在subscriptionsByEventType的HashMap中取出Subscriptions集合,循环遍历subscriptions集合并通过postToSubscription方法对事件进行分发

 private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
        switch (subscription.subscriberMethod.threadMode) {//从订阅对象中取出线程模型
            case POSTING://默认模式,在哪个线程中订阅就在哪个线程中执行
                //具体的分发
                invokeSubscriber(subscription, event);
                break;
            case MAIN://如在主线程(UI线程)发送事件,则直接在主线程处理事件;如果在子线程发送事件,则先将事件入队列,然后通过 Handler 切换到主线程,依次处理事件。
                if (isMainThread) {
                    invokeSubscriber(subscription, event);
                } else {
                    mainThreadPoster.enqueue(subscription, event);
                }
                break;
            case MAIN_ORDERED://无论在那个线程发送事件,都先将事件入队列,然后通过 Handler 切换到主线程,依次处理事件
                if (mainThreadPoster != null) {
                    mainThreadPoster.enqueue(subscription, event);
                } else {
                    // temporary: technically not correct as poster not decoupled from subscriber
                    invokeSubscriber(subscription, event);
                }
                break;
            case BACKGROUND://如果在主线程发送事件,则先将事件入队列,然后通过线程池依次处理事件;如果在子线程发送事件,则直接在发送事件的线程处理事件
                if (isMainThread) {
                    backgroundPoster.enqueue(subscription, event);
                } else {
                    invokeSubscriber(subscription, event);
                }
                break;
            case ASYNC://无论在那个线程发送事件,都将事件入队列,然后通过线程池处理
                asyncPoster.enqueue(subscription, event);
                break;
            default:
                throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
        }
    }

  从上面的代码可以看出根据不同的线程模型会执行不同的方法内容。先看下invokeSubscriber(subscription,event)方法都干了啥

 /**
     * 具体执行订阅方法的类
     * @param subscription 订阅对象
     * @param event 事件
     */
    void invokeSubscriber(Subscription subscription, Object event) {
        try {
            //从订阅对象中取出SubscriberMehtod,然后调用其Method方法并执行方法的invoke来完成事件的执行
            subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
        } catch (InvocationTargetException e) {
            handleSubscriberException(subscription, event, e.getCause());
        } catch (IllegalAccessException e) {
            throw new IllegalStateException("Unexpected exception", e);
        }
    }

  上面方法的执行非常的简单,其实就是从subscription中取出subscriberMethod对象,并取出subscriberMethod对象的method方法并执行method方法的invoke函数。执行完这一段之后在subscriber对象中用subscriber注解标注的方法就会执行了。也就是说到此处未知,从订阅到发送到接收就已经执行完了。

  上面说的是在那个线程中发送就在哪个线程中接收事件的线程模型。线面说下另外两个,无论如何在主线程中执行和无论如何在子线程中执行。主要看两个类HandlerPoster和BackgroundPoster这个两个为代表讲解一下。

  先看HandlerPoster

public class HandlerPoster extends Handler implements Poster {
  .....省略了一些代码
    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            queue.enqueue(pendingPost);
            if (!handlerActive) {
                handlerActive = true;
                if (!sendMessage(obtainMessage())) {
                    throw new EventBusException("Could not send handler message");
                }
            }
        }
    }

    @Override
    public void handleMessage(Message msg) {
        boolean rescheduled = false;
        try {
            long started = SystemClock.uptimeMillis();
            while (true) {
               ...省略了一些代码
                eventBus.invokeSubscriber(pendingPost);
                ....省略了一些代码
}

  如果再子线程发送事件,然后想让在在主线程中执行。那么首先就要调用HandlerPoster.enqueue方法,enqueue方法内部会调用sendMessage向主线程Handler发送消息,在Handler的handleMessage内部会接收消息通知,并执行eventBus.invokeSubscriber(pendingPost)。看到这一步其实就明了了,就是通过反射执行方法呗,不同的是执行过程已经切换到主线程了。

  再来看一下BackgroundPoster这个类。

final class BackgroundPoster implements Runnable, Poster {

    ...省略了一些diamante
    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            queue.enqueue(pendingPost);
            if (!executorRunning) {
                executorRunning = true;
                eventBus.getExecutorService().execute(this);
            }
        }
    }

    @Override
    public void run() {
        try {
            try {
                while (true) {
                   ...省略了一些代码
                    eventBus.invokeSubscriber(pendingPost);
                }
            } catch (InterruptedException e) {
                eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e);
            }
        } finally {
            executorRunning = false;
        }
    }

}

  从以上的源代码我们可以看出BackgroundPoster实现了Runnable和poster接口,并实现run方法和enqueue方法。在enqueue的内部会调用eventbus的线程池的execute(this)方法运行BackgroundPoster对象的run方法,在run方法内部会调用eventBus.invokeSubscriber(pendingPost)方法,这个方法目的非常明确,通过反射执行subscriber对象中用subscriber注解标注的方法。

  总结:

  1.在post方法内部对postingThreadState进行初始化并赋值

  2.调用postSingleEvent方法,在postSingleEvent方法内部调用postSingleEventForEventType方法,在postSingleEventForEventType方法内部根据eventType从subscriptionsByEventType中取出Subscriptions集合。循环遍历Subscriptions集合,并执行postToSubscription方法。在postToSubscription方法内部会根据不同的现场模型执行对应的方法。如:1.直接执行invokeSubscriber(subscription,eventObj) ,2.通过HandlerPoster.enqueue将执行事件切换到主线程中执行,最终也是会执行invokeSubscriber(pendingPost)方法,3.通过BackgroundPoster.enqueue方法将时间切换到子线程或者线程池中执行,最终会在run方法中执行eventBus.invokeSubscriber(pendingPost)方法。以上三步最终都会调用subscription.subscriberMethod.method.invoke(subscriptions.subscriber,eventObj),完成最终的执行,执行时subscriber注解标注的方法会被执行。

3.我们接下来看下事件的接收,即用subscriber注解标注的方法。其实不用看这个方法,因为这个方法可以自定义只要符合规则就行。主要看一下Subscriber注解的定义和ThreadMode的定义加深对注解和线程模型的理解。

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface Subscribe {
    //指定订阅事件在哪个线程中运行,默认是POSTING,即在哪个线程中发送就在哪个线程中接收
    ThreadMode threadMode() default ThreadMode.POSTING;
    //粘性事件,默认为false
    boolean sticky() default false;
    //订阅事件执行的优先级
    int priority() default 0;
}

  

/**
 * EventBus线程模型
 */
public enum ThreadMode {
    /**
     * 在哪个线程中发送事件就在哪个线程中接收事件
     */
    POSTING,
    /**
     * 如果实在主线程中发送事件就在主线程中处理,如果再子线程中发送事件就把事件先如队列,然后通过handler切换到主线程中处理
     */
    MAIN,
    /**
     * 无论在主线程还是子线程中发送事件,都将事件先入队列,然后通过Handler切换到主线程,依次处理事件。
     */
    MAIN_ORDERED,
    /**
     * 如果再主线程中发送事件,则将事件入队列,在线程池中依次执行。如果在子线程中发送事件,则直接在发送事件的线程中处理事件。
     */
    BACKGROUND,
    /**
     * 无论是在主线程还是在子线程中发送事件,都把时间入队列,通过线程池依次执行
     */
    ASYNC
}

  不多说,注释非常的清晰了。

 

4.接下来看一下事件是如何取消注册的,即EventBus.getDefault().unRegister(this)

  public synchronized void unregister(Object subscriber) {
        List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
        if (subscribedTypes != null) {
            for (Class<?> eventType : subscribedTypes) {
                unsubscribeByEventType(subscriber, eventType);
            }
            typesBySubscriber.remove(subscriber);
        } else {
            logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
        }
    } 
   private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
        List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
        if (subscriptions != null) {
            int size = subscriptions.size();
            for (int i = 0; i < size; i++) {
                Subscription subscription = subscriptions.get(i);
                if (subscription.subscriber == subscriber) {
                    subscription.active = false;
                    subscriptions.remove(i);
                    i--;
                    size--;
                }
            }
        }
    }

  注册这块非常的简单就分两步:1.移除typesBySubscriber集合中的对象。2.通过eventType从subscriptionsByEventType中取出Subscriptions集合,然后找到和subscriber相等的Subscription.subscriber,然后将其从集合中移除掉就行了。

  

  

  

  

  

以上是关于EventBus原理以及源代码分析的主要内容,如果未能解决你的问题,请参考以下文章

EventBus 源码分析

guava eventbus代码分析

由浅入深了解EventBus:

hashmap冲突的解决方法以及原理分析:

guava eventbus代码分析

自定义状态管理Provider以及原理分析