Spring广播与监听
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring广播与监听相关的知识,希望对你有一定的参考价值。
参考技术A 发送消息就是通过publishEvent方法进行发送,具体的实现在AbstractApplicationContext中,可以看到广播是通过getApplicationEventMulticaster()方法得到的applicationEventMulticaster这个成员变量来进行的,他是一个ApplicationEventMulticaster接口,最终的multicastEvent调用的是SimpleApplicationEventMulticaster这个实例里的multicastEvent方法,
再点进invokeListener看看,
发现最终执行的是ApplicationListener接口的onApplicationEvent方法
现在我们知道了Spring的广播和监听的流程,可以自定义自己的广播和监听器了,首先编写一个自定义事件,
然后自定义自己的广播器,将Spring初始化过程中生成的广播器注入进来,
接着自定义自己的监听器,
最后写一个controller模拟触发,
启动容器,浏览器输入http://localhost:8080/publish,查看打印结果,
至此,Spring的广播和监听基本就结束了,但是,我们可能会有一个小疑问,Spring的事件和监听器是如何绑定的呢?
答案其实是在我们上面看到的广播事件multicastEvent里,
这里,他根据事件和事件类型去获取所有的监听器,点进去看看,
可以发现,Spring是将事件和监听器通过一个并发集合来保存的,将事件类型和来源包装成了一个内部类作为key,监听器集合包装成另一个内部类作为value,在Spring容器初始化的时候会把所有的对应关系都注入到这个集合中
ActiveMQ消息队列技术融合Spring
一、业务逻辑
我想在修改一个物品的状态时,同时发送广播,给对应的监听器去实现,此商品存储到solr中,同时通过网页静态模板生成一个当前物品的详情页面,此时用到了广播机制
当我删除一个商品时,发送一个广播,给对应的监听器,同时删除solr中对应的物品。
广播机制:必须要同时在线,才能接收我的消息
使用消息中间件需要导入配置文件
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"> <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供--> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://192.168.200.128:61616"/> </bean> <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="targetConnectionFactory"/> </bean> <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 --> <property name="connectionFactory" ref="connectionFactory"/> </bean> <!-- 发布订阅模式, 商品导入索引库和生成静态页面 --> <bean id="topicPageAndSolrDestination" class="org.apache.activemq.command.ActiveMQTopic"> <!--将商品上架所有的商品的id发送到这个队列中--> <constructor-arg value="youlexuan_topic_page_solr"/> </bean> <!-- 点对点模式--> <bean id="queueSolrDeleteDestination" class="org.apache.activemq.command.ActiveMQQueue"> <!--将商品上架所有的商品的id发送到这个队列中--> <constructor-arg value="youlexuan_queue_solr_delete"/> </bean> </beans>
发布广播:
if ("1".equals(status)){
jmsTemplate.send(topicPageAndSolrDestination, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
TextMessage textMessage = session.createTextMessage(String.valueOf(id));
return textMessage;
}
});
}
监听器1,将当前商品存入solr中:操作solr的服务器配置文件
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"> <!--产生Connection--> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://192.168.200.128:61616"/> </bean> <!--spring 管理connectionFactory--> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <property name="targetConnectionFactory" ref="targetConnectionFactory"/> </bean> <!--发布订阅模式 将数据导入solr 索引库--> <bean id="topicPageAndSolrDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="youlexuan_topic_page_solr"/> </bean> <!--发布订阅模式 消息监听容器 将数据导入solr 索引库--> <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="topicPageAndSolrDestination" /> <property name="messageListener" ref="pageAndSolrListener" /> </bean>
#对应的用来监听执行往solr中保存库存的消息 <bean id="pageAndSolrListener" class="com.ghh.sellergoods.service.listener.ItemSearchListener"></bean> <!--点对点的模式 删除索引库--> <bean id="queueSolrDeleteDestination" class="org.apache.activemq.command.ActiveMQQueue"> <!--指定从这个队列中 接收下架商品的--> <constructor-arg value="youlexuan_queue_solr_delete"/> </bean> <!--点对点的模式 消息监听器 删除索引库--> <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="queueSolrDeleteDestination" /> <property name="messageListener" ref="itemDeleteListener" /> </bean> <bean id="itemDeleteListener" class="com.ghh.sellergoods.service.listener.ItemDeleteListener"></bean> </beans>
监听器类
public class ItemSearchListener implements MessageListener { @Autowired private SearchService searchService; @Autowired private ItemDao itemDao; @Override public void onMessage(Message message) { //获取生产者发布的广播,往solr中添加库存列表 ActiveMQTextMessage atm = (ActiveMQTextMessage) message; try { //获取广播中的数据。 Long goodsId = Long.valueOf(atm.getText()); //通过传过来的商品id去查询库存表 ItemQuery query = new ItemQuery(); ItemQuery.Criteria criteria = query.createCriteria(); criteria.andGoodsIdEqualTo(goodsId); //查询对应商品id的库存表 List<Item> items = itemDao.selectByExample(query);
//调用对应的方法,往solr中添加当前商品对应库存信息 searchService.importList(items); } catch (JMSException e) { e.printStackTrace(); } } }
监听器类2:配置文件
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"> <!--产生Connection工厂类--> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://192.168.200.128:61616"/> </bean> <!--spring管理工厂类--> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <property name="targetConnectionFactory" ref="targetConnectionFactory"/> </bean> <!--发布订阅模式 生成页面--> <bean id="topicPageAndSolrDestination" class="org.apache.activemq.command.ActiveMQTopic"> <!--指定从这个队列上获取上架的商品id--> <constructor-arg value="youlexuan_topic_page_solr"/> </bean> <!--发布订阅模式 消息监听器 生成页面--> <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="topicPageAndSolrDestination" /> <property name="messageListener" ref="pageListener" /> </bean> <bean id="pageListener" class="com.ghh.core.service.listener.PageListener"></bean> </beans>
监听器类2:生成静态网页模板
public class PageListener implements MessageListener { @Autowired private CmsService cmsService; @Override public void onMessage(Message message) { ActiveMQTextMessage atm = (ActiveMQTextMessage) message; try { Long goodsId = Long.valueOf(atm.getText()); Map<String, Object> goodsData = cmsService.findGoodsData(goodsId); cmsService.createStaticPage(goodsId,goodsData); } catch (Exception e) { e.printStackTrace(); } } }
点对点
当我删除商品时,我需要对应的服务进行删除solr中库存信息,添加和删除使用的是同一个服务中,使用的是上面的配置文件
//发布广播, @Autowired private ActiveMQTopic topicPageAndSolrDestination; //在修改的代码方法中来广播发布当前商品的id if (ids.length>0) { jmsTemplate.send(queueSolrDeleteDestination, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { TextMessage textMessage = session.createTextMessage(String.valueOf(ids)); return textMessage; } }); }
#执行删除solr中库存信息
public class ItemDeleteListener implements MessageListener { @Autowired private SearchService searchService; @Override public void onMessage(Message message) { ActiveMQTextMessage atm = (ActiveMQTextMessage) message; try { Long goodsId = Long.valueOf(atm.getText()); searchService.deleteById(goodsId); } catch (JMSException e) { e.printStackTrace(); } } }
以上是关于Spring广播与监听的主要内容,如果未能解决你的问题,请参考以下文章