spring源码解析--事件监听机制的使用和原理解析
Posted Lucky帅小武
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spring源码解析--事件监听机制的使用和原理解析相关的知识,希望对你有一定的参考价值。
Spring的context包是对于beans包的功能扩展,比如在BeanFactory的基础容器之上扩展为了ApplicationContext上下文。而ApplicationContext处理包含了BeanFactory的全部基础功能之外,还额外提供了大量的扩展功能,本文主要分析下Spring提供的事件监听机制,这里就使用到了设计模式中的观察者设计模式。话不多说,直接正文。
一、Spring事件监听机制的定义
使用过MQ的或者了解观察者设计模式的同学应该大致都了解,实现事件监听机制至少四个核心:事件、事件生产者和事件消费者,另外还需要有一个管理生产者、消费者和事件之间的注册监听关系的控制器。
在Spring中,事件监听机制主要实现是通过事件、事件监听器、事件发布者和事件广播器来实现
1.1、Spring中的事件(ApplicationEvent)
spring中的事件有一个抽象父类ApplicationEvent,该类包含有当前ApplicationContext的引用,这样就可以确认每个事件是从哪一个Spring容器中发生的。
1.2、Spring中的事件监听器(ApplicationListener)
spring中的事件监听器同样有一个顶级接口ApplicationListener,只有一个onApplicationEvent(E event)方法,当该监听器所监听的事件发生时,就会执行该方法
1.3、Spring中的事件发布者(ApplicationEventPublisher)
spring中的事件发布者同样有一个顶级接口ApplicationEventPublisher,只有一个方法publishEvent(Object event)方法,调用该方法就可以发生spring中的事件
1.4、Spring中的事件广播器(ApplicationEventMulticaster)
spring中的事件核心控制器叫做事件广播器,接口为ApplicationEventMulticaster,广播器的作用主要有两个:
作用一:将事件监听器注册到广播器中,这样广播器就知道了每个事件监听器分别监听什么事件,且知道了每个事件对应哪些事件监听器在监听
作用二:将事件广播给事件监听器,当有事件发生时,需要通过广播器来广播给所有的事件监听器,因为生产者只需要关心事件的生产,而不需要关心该事件都被哪些监听器消费。
二、Spring事件监听机制的使用
以电商为例,假设现在有这样一个场景:当用户下单成功之后,此时需要做很多操作,比如需要保存一个订单记录、对应的商品库存需要扣除,假设下单的时候还用到了红包,那么对应的红包也需要改成已经使用。所以相当于一个下单操作,需要进行三个数据更新操作。而这三个操作实际上又是互相没有任何关联的,所以可以通过三个下单事件的监听器分别来处理对应的业务逻辑,此时就可以采用Spring的事件监听机制来模拟实现这样的场景。
1、首先定义一个下单事件 OrderEvent,下单事件中包含了订单号、商品编号和使用的红包编号,代码如下:
1 /** 2 * @Auther: Lucky 3 * @Date: 2020/7/8 下午2:53 4 * @Desc: 自定义下单事件 5 */ 6 public class OrderEvent extends ApplicationEvent { 7 8 /** 订单编号*/ 9 private String orderCode; 10 /** 商品编号*/ 11 private String goodsCode; 12 /** 红包编号*/ 13 private String redPacketCode; 14 15 /** 事件的构造函数*/ 16 public OrderEvent(ApplicationContext source, String orderCode, String goodsCode, String redPacketCode) { 17 super(source); 18 this.orderCode = orderCode; 19 this.goodsCode = goodsCode; 20 this.redPacketCode = redPacketCode; 21 } 22 }
2、分别定义订单监听器、商品监听器和红包监听器分别监听下单事件,分别做对应的处理,代码如下:
1 /** 2 * @Desc: 订单监听器(用于保存订单信息) 3 */ 4 public class OrderListener implements ApplicationListener<OrderEvent> { 5 6 @Override 7 public void onApplicationEvent(OrderEvent event) { 8 System.out.println("订单监听器监听到下单事件,订单号为:" + event.getOrderCode()); 9 //TODO 保存订单处理11 } 12 }
1 /** 2 * @Desc: 商品监听器(用于更新商品的库存) 3 */ 4 public class GoodsListener implements ApplicationListener<OrderEvent> { 5 6 @Override 7 public void onApplicationEvent(OrderEvent event) { 8 System.out.println("商品监听器监听到下单事件,更新商品库存:" + event.getGoodsCode()); 9 //TODO 更新商品库存11 } 12 }
/** * @Desc: 红包监听器(用于使用红包) */ public class RedPacketListener implements ApplicationListener<OrderEvent> { @Override public void onApplicationEvent(OrderEvent event) { if(event.getRedPacketCode()!=null) { System.out.println("红包监听器监听到下单事件,红包编号为:" + event.getRedPacketCode()); //TODO 使用红包处理 }else { System.out.println("订单:"+event.getOrderCode()+"没有使用红包"); } } }
3、事件发布器和事件广播器无需自定义,采用Spring默认的就可以
4、现在事件、事件监听器、事件发布器和事件广播器都已经定义完毕,接下来就需要将事件监听器和事件的监听关系注册到Spring容器中即可,方法很简单,实际就是将事件监听器当作是一个bean注册到Spring容器即可,如下示:
1 <beans> 2 3 <!-- 其他bean --> 4 5 <bean id="orderListener" class="com.lucky.test.spring.event.OrderListener"/> 6 <bean id="goodsListener" class="com.lucky.test.spring.event.GoodsListener"/> 7 <bean id="redPacketListener" class="com.lucky.test.spring.event.RedPacketListener"/> 8 </beans>
5.测试代码如下:
1 public static void main(String[] args){ 2 /** 1.初始化Spring容器 */ 3 ApplicationContext context = new ClassPathXmlApplicationContext("application.xml"); 4 /** 2. 模拟创建下单事件*/ 5 for (int i=0;i < 5;i++){ 6 String orderCode = "test_order_" + i; 7 String goodsCode = "test_order_" + i; 8 String redPacketCode = null; 9 if(i%2==0) { 10 //偶数时使用红包 11 redPacketCode = "test_order_" + i; 12 } 13 OrderEvent orderEvent = new OrderEvent(context, orderCode, goodsCode, redPacketCode); 14 /**3. ApplicationContext实现了ApplicationEventPublisher接口,所以可以直接通过ApplicationContext来发送事件*/ 15 context.publishEvent(orderEvent); 16 } 17 }
测试代码很简单,第一步是先初始化Spring容器,第二步是创建下单事件,第三部就是通过ApplicationContext直接发送事件,此时三个事件监听器就可以监听到下单事件并处理了,测试结果如下:
1 订单监听器监听到下单事件,订单号为:test_order_0 2 商品监听器监听到下单事件,更新商品库存:test_order_0 3 红包监听器监听到下单事件,红包编号为:test_order_0 4 5 订单监听器监听到下单事件,订单号为:test_order_1 6 商品监听器监听到下单事件,更新商品库存:test_order_1 7 订单:test_order_1没有使用红包 8 9 订单监听器监听到下单事件,订单号为:test_order_2 10 商品监听器监听到下单事件,更新商品库存:test_order_2 11 红包监听器监听到下单事件,红包编号为:test_order_2 12 13 订单监听器监听到下单事件,订单号为:test_order_3 14 商品监听器监听到下单事件,更新商品库存:test_order_3 15 订单:test_order_3没有使用红包 16 17 订单监听器监听到下单事件,订单号为:test_order_4 18 商品监听器监听到下单事件,更新商品库存:test_order_4 19 红包监听器监听到下单事件,红包编号为:test_order_4
Tip:Spring的事件监听机制是同步处理的,也就是说生产者发布事件和消费者消费事件是在同一个线程下执行的,所以本案例中的下单事件虽然按三个事件监听器分别监听下单事件,但是总的方法耗时并没有减少,并且如果任何一个监听器抛了异常,同样会影响到其他的监听器,所以每个事件监听器监听消息时必须要对事件进行异常捕获操作,或者内部改成异步处理。
比如将监听器改造如下:
1.在发布事件和消费事件出分别打印当前线程;2.红包监听器不做非空判断,会导致redPacketCode为空时抛异常,测试结果如下:
1 发布事件线程为:main 2 订单监听器监听到下单事件,订单号为:test_order_0线程为:main 3 商品监听器监听到下单事件,更新商品库存:test_order_0线程为:main 4 红包监听器监听到下单事件,更新红包:test_order_0线程为:main 5 发布事件线程为:main 6 订单监听器监听到下单事件,订单号为:test_order_1线程为:main 7 商品监听器监听到下单事件,更新商品库存:test_order_1线程为:main 8 红包监听器监听到下单事件,更新红包:null线程为:main 9 Exception in thread "main" java.lang.NullPointerException 10 at com.lucky.test.spring.event.RedPacketListener.onApplicationEvent(RedPacketListener.java:15) 11 at com.lucky.test.spring.event.RedPacketListener.onApplicationEvent(RedPacketListener.java:10) 12 at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:172) 13 at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:165) 14 at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:139) 15 at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:403) 16 at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:360) 17 at com.lucky.test.spring.MainTest.main(MainTest.java:29)
从测试结果可以看出,发布事件和所有消费事件的监听器执行的线程都是同一个线程,也就是本案例中的main线程,所以Spring的整个事件监听机制实际上也是同步操作,都是由发布事件的线程来处理的。
而且一旦某一个监听器抛异常了,就相当于整个线程都中止了,不仅后面的监听器无法消费事件,连发布事件的线程也会收到影响。当多个监听器之间的业务逻辑互相不影响时,可以采用优化方案,如下图示:
此时就可以将事件监听器改成异步处理方式,也就是同步接收事件消息,但是处理业务逻辑改成异步处理,比如上例中的订单监听器改造如下:
1 public class OrderListener implements ApplicationListener<OrderEvent> { 2 3 @Override 4 public void onApplicationEvent(OrderEvent event) { 5 /** 通过try/catch保证事件监听器不会抛异常*/ 6 try { 7 new Thread(new Runnable() { 8 @Override 9 public void run() { 10 System.out.println("订单监听器监听到下单事件,订单号为:" + event.getOrderCode() + "线程为:" + Thread.currentThread().getName()); 11 //TODO 保存订单处理 12 } 13 }).start(); 14 }catch (Exception e){ 15 e.printStackTrace(); 16 } 17 } 18 }
通过新建线程的方式去异步执行业务逻辑,或者将业务逻辑交给线程池执行也行。
5.对于同一个事件如果有多个事件监听器时,既然是同步的,那么就必然会有执行顺序的区别,Spring默认的事件执行的执行顺序是按照bean加载的顺序执行的,比如本例中,在XML中配置的顺序是OrderListener->GoodsListener->RedPacketListener,
那么最后执行的顺序就是这个顺序,但是很显然这种隐式的排序方式很容易让开发人员忽视,所以Spring提供了额外的排序方式,就是让监听器实现Ordered接口或者Ordered的子接口PriorityOrdered接口
Ordered接口只有一个方法
/** 最高优先级*/ int HIGHEST_PRECEDENCE = Integer.MIN_VALUE; /** 最低优先级*/ int LOWEST_PRECEDENCE = Integer.MAX_VALUE; /** 获取优先级值,值越小优先级越高*/ int getOrder();
而Ordered接口的子接口PriorityOrdered继承之Ordered接口,但是没有任何实现,也就是说PriorityOrdered的作用仅仅就是一个标识的作用。
在Spring事件监听机制中,优先级是优先按PriorityOrdered排序,然后再按Ordered排序,最终就按bean的加载顺序排序
比如上例中的三个监听器,由于OrderListener是最先加载的,所以默认是最先执行的,此时我们分别在GoodsListener和RedPacketListener实现Ordered接口和PriorityOrdered接口,分别如下:
1 public class GoodsListener implements ApplicationListener<OrderEvent>, Ordered { 2 @Override 3 public int getOrder() { 4 /** 使用最高优先级*/ 5 return Ordered.HIGHEST_PRECEDENCE; 6 } 7 }
1 public class RedPacketListener implements ApplicationListener<OrderEvent>, PriorityOrdered { 2 3 @Override 4 public int getOrder() { 5 /**使用最低优先级*/ 6 return Ordered.LOWEST_PRECEDENCE; 7 } 8 }
这里GoodsListener实现了Ordered接口,优先级为最高优先级;RedPacketListener实现了PriorityOrdered,设置优先级为最低优先级,执行结果如下:
1 红包监听器监听到下单事件,更新红包:test_order_0 2 商品监听器监听到下单事件,更新商品库存:test_order_0 3 订单监听器监听到下单事件,订单号为:test_order_0
可以发现实现了PriorityOrdered接口的RedPacketListener最先执行,实现了Ordered接口的GoodsListener第二个执行,没有实现排序接口的OrderListener接口最后执行。
这里虽然RedPacketListener和GoodsListener都实现了getOrder()方法,并且GoodsListener设置为优先级最高和RedPacketListener的优先级最低,但是还是会先执行RedPacketListener,这就是实现了PriorityOrdered接口的优势。
实现了PriorityOrdered接口的监听器无论优先级值如何都肯定会在实现了Ordered接口的监听器优先执行,这也就是PriorityOrdered接口没有任何实现的原因,这个接口仅仅是为了标记的作用,标记这个监听器是最优先执行的。
三、Spring事件监听机制的实现原理
通过上面的例子,已经了解了事件和事件监听器的处理逻辑,但是还有很多问题需要去探究,比如事件的发布过程、监听器是如何监听消息的等等,此时就需要从ApplicationContext的初始化开始说起了。
当ApplicationContext初始化的时候, 有两个核心步骤和事件监听器有关,一个是初始化事件广播器,一个是注册所有的事件监听器
1 /** 其他流程*/ 2 3 /** 初始化事件广播器*/ 4 initApplicationEventMulticaster(); 5 6 /**其他流程*/ 7 8 /** 注册事件监听器*/ 9 registerListeners(); 10 11 /** 其他流程*/
3.1、初始化事件广播器源码解析
1 /** Spring容器的事件广播器对象*/ 2 private ApplicationEventMulticaster applicationEventMulticaster; 3 4 /** 事件广播器对应的beanName*/ 5 public static final String APPLICATION_EVENT_MULTICASTER_BEAN_NAME = "applicationEventMulticaster"; 6 7 /** 初始化事件广播器*/ 8 protected void initApplicationEventMulticaster() { 9 //1.获取Spring容器BeanFactory对象 10 ConfigurableListableBeanFactory beanFactory = getBeanFactory(); 11 //2.从BeanFactory获取事件广播器的bean,如果存在说明是用户自定义的事件广播器 12 if (beanFactory.containsLocalBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME)) { 13 //2.1.给容器的事件广播器赋值 14 this.applicationEventMulticaster = 15 beanFactory.getBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class); 16 if (logger.isTraceEnabled()) { 17 logger.trace("Using ApplicationEventMulticaster [" + this.applicationEventMulticaster + "]"); 18 } 19 } 20 else { 21 //3.如果没有自定义的,则初始化默认的事件广播器SimpleApplicationEventMulticaster对象 22 this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory); 23 //4.注册该bean 24 beanFactory.registerSingleton(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, this.applicationEventMulticaster); 25 if (logger.isTraceEnabled()) { 26 logger.trace("No \'" + APPLICATION_EVENT_MULTICASTER_BEAN_NAME + "\' bean, using " + 27 "[" + this.applicationEventMulticaster.getClass().getSimpleName() + "]"); 28 } 29 } 30 }
从源码可以看出,初始化事件广播器的逻辑比较简单,就是先给容器的事件广播器applicationEventMulticaster对象赋值的过程,如果beanFactory中存在用于自定义的就使用自定义的,如果没有自定义的就创建新的默认的事件广播器SimpleApplicationEventMulticaster对象,然后赋值给applicationEventMulticaster对象。
3.2、注册事件监听器源码解析
1 /** 注册事件监听器*/ 2 protected void registerListeners() { 3 //1.遍历将通过编码方式创建的事件监听器加入到事件广播器中 4 for (ApplicationListener<?> listener : getApplicationListeners()) { 5 //2.获取到当前事件广播器,添加事件监听器 6 getApplicationEventMulticaster().addApplicationListener(listener); 7 } 8 9 //3.从BeanFactory中获取所有实现了ApplicationListener接口的bean,遍历加入到事件广播器中 10 String[] listenerBeanNames = getBeanNamesForType(ApplicationListener.class, true, false); 11 for (String listenerBeanName : listenerBeanNames) { 12 getApplicationEventMulticaster().addApplicationListenerBean(listenerBeanName); 13 } 14 15 //3.获取需要提前发布的事件 16 Set<ApplicationEvent> earlyEventsToProcess = this.earlyApplicationEvents; 17 this.earlyApplicationEvents = null; 18 if (earlyEventsToProcess != null) { 19 for (ApplicationEvent earlyEvent : earlyEventsToProcess) { 20 //5.遍历将提前发布的事件广播出去 21 getApplicationEventMulticaster().multicastEvent(earlyEvent); 22 } 23 }
从源码上看,注册事件监听器的逻辑也不复杂,主要是从容器中找到所有的事件监听器,然后调用事件广播器的addApplicationListener方法将事件监听器添加到事件广播器中,接下来再看下事件广播器添加到逻辑,源码如下:
1 @Override 2 public void addApplicationListener(ApplicationListener<?> listener) { 3 synchronized (this.retrievalMutex) { 4 // 将事件监听器加入的内部的监听器集合applicationListeners中 6 Object singletonTarget = AopProxyUtils.getSingletonTarget(listener); 7 if (singletonTarget instanceof ApplicationListener) { 8 this.defaultRetriever.applicationListeners.remove(singletonTarget); 9 } 10 this.defaultRetriever.applicationListeners.add(listener); 11 this.retrieverCache.clear(); 12 } 13 }
可以看出注册的逻辑比较简单,就是将Listener对象加入到事件广播器内部的集合中保存起来,这样事件广播器就保存了容器中所有的事件监听器了。
3.3、事件的发布和消费原理
事件的发布是通过ApplicationEventPublisher的实现类实现的publishEvent方法实现的,ApplicationContext就实现了该接口,所以使用Spring时就可以直接使用ApplicationContext实例来调用publishEvent方法来发布事件,源码如下:
1 /** 发布事件 2 * @param event:事件对象 3 * */ 4 @Override 5 public void publishEvent(Object event) { 6 publishEvent(event, null); 7 } 8 9 /** 发布事件 10 * @param event:事件对象 11 * @param eventType:事件类型 12 * */ 13 protected void publishEvent(Object event, @Nullable ResolvableType eventType) { 14 Assert.notNull(event, "Event must not be null"); 15 16 /** 1.将发布的事件封装成ApplicationEvent对象(因为传入的参数是Object类型,有可能没有继承ApplicationEvent) */ 17 ApplicationEvent applicationEvent; 18 if (event instanceof ApplicationEvent) { 19 applicationEvent = (ApplicationEvent) event; 20 } 21 else { 22 applicationEvent = new PayloadApplicationEvent<>(this, event); 23 if (eventType == null) { 24 eventType = ((PayloadApplicationEvent<?>) applicationEvent).getResolvableType(); 25 } 26 } 27 28 if (this.earlyApplicationEvents != null) { 29 /** 2.1.如果需要提前发布的事件还没有发布完,则不是立即发布,而是将事件加入到待发布集合中*/ 30 this.earlyApplicationEvents.add(applicationEvent); 31 } 32 else { 33 /** 2.2.获取当前的事件广播器,调用multicasterEvent方法广播事件*/ 34 getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType); 35 } 36 37 /** 3.如果当前applicationContext有父类,则再调用父类的publishEvent方法*/ 38 if (this.parent != null) { 39 if (this.parent instanceof AbstractApplicationContext) { 40 ((AbstractApplicationContext) this.parent).publishEvent(event, eventType); 41 } 42 else { 43 this.parent.publishEvent(event); 44 } 45 } 46 }
其实这里的逻辑也比较简单,首先是将发布的事件转化成ApplicationEvent对象,然后获取到事件广播器,调用事件广播器的multicastEvent方法来广播事件,所以核心逻辑又回到了事件广播器那里
1 /** 广播事件 2 * @param event:事件 3 * @param eventType:事件类型 4 * */ 5 @Override 6 public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) { 7 ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event)); 8 Executor executor = getTaskExecutor();(如果有Executor,则广播事件就是通过异步来处理的) 9 /** 10 * 1.根据事件和类型调用getApplicationListeners方法获取所有监听该事件的监听器 11 * */ 12 for (ApplicationListener<?> listener : getApplicationListeners(event, type)) { 13 if (executor != null) { 14 /** 2. 异步遍历执行invokeListener方法来唤醒监听器处理事件 */ 15 executor.execute(() -> invokeListener(listener, event)); 16 } 17 else { 18 invokeListener(listener, event); 19 } 20 } 21 }
这里主要有两个核心步骤,首先是根据事件和类型找到监听了该事件的所有事件监听器;然后遍历来执行监听器的处理逻辑.另外如果配置了执行器Executor,就会通过Executor来异步发布事件给监听器
1、根据事件获取事件监听器源码如下:
1 protected Collection<ApplicationListener<?>> getApplicationListeners( 2 ApplicationEvent event, ResolvableType eventType) { 3 4 Object source = event.getSource(); 5 Class<?> sourceType = (source != null ? source.getClass() : null); 6 ListenerCacheKey cacheKey = new ListenerCacheKey(eventType, sourceType); 7 8 // Quick check for existing entry on ConcurrentHashMap... 9 ListenerRetriever retriever = this.retrieverCache.get(cacheKey); 10 if (retriever != null) { 11 return retriever.getApplicationListeners(); 12 } 13 14 if (this.beanClassLoader == null || 15 (ClassUtils.isCacheSafe(event.getClass(),以上是关于spring源码解析--事件监听机制的使用和原理解析的主要内容,如果未能解决你的问题,请参考以下文章