ActiveMQ消息队列技术融合Spring

Posted 愤青程序猿

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ActiveMQ消息队列技术融合Spring相关的知识,希望对你有一定的参考价值。

 

一、业务逻辑

  我想在修改一个物品的状态时,同时发送广播,给对应的监听器去实现,此商品存储到solr中,同时通过网页静态模板生成一个当前物品的详情页面,此时用到了广播机制

  当我删除一个商品时,发送一个广播,给对应的监听器,同时删除solr中对应的物品。

 

广播机制:必须要同时在线,才能接收我的消息

   使用消息中间件需要导入配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xsi:schemaLocation="http://www.springframework.org/schema/beans   
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context   
        http://www.springframework.org/schema/context/spring-context.xsd">
    <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->  
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">  
        <property name="brokerURL" value="tcp://192.168.200.128:61616"/>  
    </bean>
    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->  
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">  
    <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->  
        <property name="targetConnectionFactory" ref="targetConnectionFactory"/>  
    </bean>
    <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->  
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">  
        <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->  
        <property name="connectionFactory" ref="connectionFactory"/>  
    </bean>   
    
    <!-- 发布订阅模式, 商品导入索引库和生成静态页面 -->
    <bean id="topicPageAndSolrDestination" class="org.apache.activemq.command.ActiveMQTopic">
          <!--将商品上架所有的商品的id发送到这个队列中-->
         <constructor-arg value="youlexuan_topic_page_solr"/>
    </bean>
    <!-- 点对点模式-->
    <bean id="queueSolrDeleteDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <!--将商品上架所有的商品的id发送到这个队列中-->
        <constructor-arg value="youlexuan_queue_solr_delete"/>
    </bean>  
    
</beans>

发布广播:

if ("1".equals(status)){
jmsTemplate.send(topicPageAndSolrDestination, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
TextMessage textMessage = session.createTextMessage(String.valueOf(id));
return textMessage;
}
});
}

监听器1,将当前商品存入solr中:操作solr的服务器配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xsi:schemaLocation="http://www.springframework.org/schema/beans   
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context   
        http://www.springframework.org/schema/context/spring-context.xsd">
      <!--产生Connection-->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">  
        <property name="brokerURL" value="tcp://192.168.200.128:61616"/>  
    </bean>
    <!--spring 管理connectionFactory-->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">    
        <property name="targetConnectionFactory" ref="targetConnectionFactory"/>  
    </bean>
    <!--发布订阅模式   将数据导入solr 索引库-->
    <bean id="topicPageAndSolrDestination" class="org.apache.activemq.command.ActiveMQTopic">  
        <constructor-arg value="youlexuan_topic_page_solr"/>
    </bean>
    <!--发布订阅模式   消息监听容器 将数据导入solr 索引库-->
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="topicPageAndSolrDestination" />
        <property name="messageListener" ref="pageAndSolrListener" />
    </bean>
#对应的用来监听执行往solr中保存库存的消息 <bean id="pageAndSolrListener" class="com.ghh.sellergoods.service.listener.ItemSearchListener"></bean> <!--点对点的模式 删除索引库--> <bean id="queueSolrDeleteDestination" class="org.apache.activemq.command.ActiveMQQueue"> <!--指定从这个队列中 接收下架商品的--> <constructor-arg value="youlexuan_queue_solr_delete"/> </bean> <!--点对点的模式 消息监听器 删除索引库--> <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="queueSolrDeleteDestination" /> <property name="messageListener" ref="itemDeleteListener" /> </bean> <bean id="itemDeleteListener" class="com.ghh.sellergoods.service.listener.ItemDeleteListener"></bean> </beans>

监听器类

public class ItemSearchListener implements MessageListener {
    @Autowired
    private SearchService searchService;
    @Autowired
    private ItemDao itemDao;
    @Override
    public void onMessage(Message message) {
        //获取生产者发布的广播,往solr中添加库存列表
        ActiveMQTextMessage atm = (ActiveMQTextMessage) message;
        try {
            //获取广播中的数据。
            Long goodsId = Long.valueOf(atm.getText());
            //通过传过来的商品id去查询库存表
            ItemQuery query = new ItemQuery();
            ItemQuery.Criteria criteria = query.createCriteria();
            criteria.andGoodsIdEqualTo(goodsId);
            //查询对应商品id的库存表
            List<Item> items = itemDao.selectByExample(query);
        //调用对应的方法,往solr中添加当前商品对应库存信息 searchService.importList(items); }
catch (JMSException e) { e.printStackTrace(); } } }

监听器类2:配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xsi:schemaLocation="http://www.springframework.org/schema/beans   
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context   
        http://www.springframework.org/schema/context/spring-context.xsd"> 
    <!--产生Connection工厂类-->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://192.168.200.128:61616"/>  
    </bean>
    <!--spring管理工厂类-->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">    
        <property name="targetConnectionFactory" ref="targetConnectionFactory"/>  
    </bean>
    <!--发布订阅模式  生成页面-->
    <bean id="topicPageAndSolrDestination" class="org.apache.activemq.command.ActiveMQTopic">  
          <!--指定从这个队列上获取上架的商品id-->
          <constructor-arg value="youlexuan_topic_page_solr"/>
    </bean>
    <!--发布订阅模式 消息监听器 生成页面-->
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="topicPageAndSolrDestination" />
        <property name="messageListener" ref="pageListener" />
    </bean>
    <bean id="pageListener" class="com.ghh.core.service.listener.PageListener"></bean>
    
</beans>

监听器类2:生成静态网页模板

public class PageListener implements MessageListener {
    @Autowired
    private CmsService cmsService;
    @Override
    public void onMessage(Message message) {
        ActiveMQTextMessage atm = (ActiveMQTextMessage) message;
        try {
            Long goodsId = Long.valueOf(atm.getText());
            Map<String, Object> goodsData = cmsService.findGoodsData(goodsId);
            cmsService.createStaticPage(goodsId,goodsData);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

点对点

  当我删除商品时,我需要对应的服务进行删除solr中库存信息,添加和删除使用的是同一个服务中,使用的是上面的配置文件

    //发布广播,
    @Autowired
    private ActiveMQTopic topicPageAndSolrDestination;
//在修改的代码方法中来广播发布当前商品的id

if (ids.length>0) {
            jmsTemplate.send(queueSolrDeleteDestination, new MessageCreator() {
                @Override
                public Message createMessage(Session session) throws JMSException {
                    TextMessage textMessage = session.createTextMessage(String.valueOf(ids));
                    return textMessage;
                }
            });
        }
#执行删除solr中库存信息

public
class ItemDeleteListener implements MessageListener { @Autowired private SearchService searchService; @Override public void onMessage(Message message) { ActiveMQTextMessage atm = (ActiveMQTextMessage) message; try { Long goodsId = Long.valueOf(atm.getText()); searchService.deleteById(goodsId); } catch (JMSException e) { e.printStackTrace(); } } }

 

 

 

 

 

 

 

 

 

 

 

 

以上是关于ActiveMQ消息队列技术融合Spring的主要内容,如果未能解决你的问题,请参考以下文章

JMS与activemq--4.MQ类型对比/MQ技术选型(资料来源网络)

Java消息队列-Spring整合ActiveMq

Spring整合activeMQ消息队列

Spring Boot的JMS发送和接收队列消息,基于ActiveMQ

activeMq消息队列的Spring整合配置

Spring + JMS + ActiveMQ实现简单的消息队列(监听器异步实现)