ActiveMQ+Spring项目实践

Posted Smile_Miracle

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ActiveMQ+Spring项目实践相关的知识,希望对你有一定的参考价值。

环境准备
项目基于JDK1.7+spring4+mybatis3+tomcat7构建的maven工程,用的是Apache-ActiveMQ-5.11.1的版本。

第一步:ActiveMQ安装与配置
首先你的去ActiveMQ官网去下载你要用的MQ版本根据不同的系统安装,我用的是windows 64版,直接下载ZIP解压,然后到bin目录下64文件夹运行bat文件即可启动,如果和自己的监听端口冲突可以自己修改和bin同级的conf目录下的ActiveMQ.xml文件。切记,5.1以后的MQ基本都是基于JDK1.7或者以上的版本,具体匹配度自己百度。

启动成功效果如图:

ActiveMQ常用模式
就目前来说我常用的就是两种:P2P与PUB/SUB,也就是单对单和一对多的模式,具体表现为QUEUE和TOPIC。
P2P使用Queue即队列目标,表现在生产者和消费者是一对一的关系,配置好后生产者发送的消息只能特定消费者消费。在此传送模型中,目标是一个队列。消息首先被传送至队列目标,然后根据队列传送策略,从该队列将消息传送至向此队列进行注册的某一个消费者,一次只传送一条消息。可以向队列目标发送消息的生产者的数量没有限制,但每条消息只能发送至、并由一个消费者成功使用。如果没有已经向队列目标注册的消费者,队列将保留它收到的消息,并在某个消费者向该队列进行注册时将消息传送给该消费者。

PUB/SUB 使用Topic即主题目标, 消息从一个生产者传送至任意数量的消费者。在此传送模型中,目标是一个主题。消息首先被传送至主题目标,然后传送至所有已订阅此主题的活动消费者。可以向主题目标发送消息的生产者的数量没有限制,并且每个消息可以发送至任意数量的订阅消费者。主题目标也支持持久订阅的概念。持久订阅表示消费者已向主题目标进行注册,但在消息传送时此消费者可以处于非活动状态。当此消费者再次处于活动状态时,它将接收此信息。如果没有已经向主题目标注册的消费者,主题不保留其接收到的消息,除非有非活动消费者注册了持久订阅。

基础依赖POM

这里也不偷懒,直接把所有依赖全部贴进来,如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.zhy</groupId>
  <artifactId>MQSender</artifactId>
  <version>0.0.1</version>
  <packaging>war</packaging>

  <properties>
        <SpdbNew2017.version>0.0.1</SpdbNew2017.version>
        <!-- spring版本号 -->
        <spring.version>4.0.2.RELEASE</spring.version>
        <!-- mybatis版本号 -->
        <mybatis.version>3.2.6</mybatis.version>
        <!-- log4j日志文件管理包版本 -->
        <slf4j.version>1.7.7</slf4j.version>
        <log4j.version>1.2.17</log4j.version>
        <comLog.version>1.1.1</comLog.version>
        <jackson.version>1.9.13</jackson.version>
        <webVersion>3.0</webVersion>
  </properties>
    <dependencies>
    <dependency>
            <groupId>com.spdb</groupId>
            <artifactId>spdbNew2017</artifactId>
            <version>$SpdbNew2017.version</version>
        </dependency>
        <dependency>
            <groupId>org.apache.shiro</groupId>
            <artifactId>shiro-spring</artifactId>
            <version>1.2.3</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>$jackson</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.module</groupId>
            <artifactId>jackson-module-jaxb-annotations</artifactId>
            <version>$jackson</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <!-- 表示开发的时候引入,发布的时候不会加载此包 -->
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>net.sf.json-lib</groupId>
            <artifactId>json-lib</artifactId>
            <version>2.4</version>
            <classifier>jdk15</classifier><!--指定jdk版本 -->
        </dependency>
        <!-- mybatis核心包 -->
        <!-- https://mvnrepository.com/artifact/com.mchange/c3p0 -->
        <dependency>
            <groupId>com.mchange</groupId>
            <artifactId>c3p0</artifactId>
            <version>0.9.5.2</version>
        </dependency>
        <dependency>
            <groupId>org.mybatis</groupId>
            <artifactId>mybatis</artifactId>
            <version>$mybatis.version</version>
        </dependency>
        <!-- mybatis/spring包 -->
        <dependency>
            <groupId>org.mybatis</groupId>
            <artifactId>mybatis-spring</artifactId>
            <version>1.2.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/javax.servlet/javax.servlet-api -->
        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>javax.servlet-api</artifactId>
            <version>3.1.0</version>
            <scope>provided</scope>
        </dependency>
        <!-- 导入java ee jar 包 -->
        <dependency>
            <groupId>javax</groupId>
            <artifactId>javaee-api</artifactId>
            <version>7.0</version>
        </dependency>
        <!-- 导入mysql数据库链接jar包 -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.30</version>
        </dependency>
        <!-- 导入dbcp的jar包,用来在applicationContext.xml中配置数据库 -->
        <dependency>
            <groupId>commons-dbcp</groupId>
            <artifactId>commons-dbcp</artifactId>
            <version>1.2.2</version>
        </dependency>
        <!-- JSTL标签类 -->
        <dependency>
            <groupId>jstl</groupId>
            <artifactId>jstl</artifactId>
            <version>1.1.2</version>
        </dependency>
        <!-- 日志文件管理包 -->
        <!-- log start -->
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>$log4j.version</version>
        </dependency>
        <dependency>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging</artifactId>
            <version>$comLog.version</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>$slf4j.version</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>$slf4j.version</version>
        </dependency>
        <dependency>
            <groupId>org.codehaus.jackson</groupId>
            <artifactId>jackson-core-asl</artifactId>
            <version>$jackson.version</version>
        </dependency>
        <dependency>
            <groupId>org.codehaus.jackson</groupId>
            <artifactId>jackson-core-lgpl</artifactId>
            <version>$jackson.version</version>
        </dependency>
        <dependency>
            <groupId>org.codehaus.jackson</groupId>
            <artifactId>jackson-jaxrs</artifactId>
            <version>$jackson.version</version>
        </dependency>
        <dependency>
            <groupId>org.codehaus.jackson</groupId>
            <artifactId>jackson-mapper-asl</artifactId>
            <version>$jackson.version</version>
        </dependency>
        <dependency>
            <groupId>org.codehaus.jackson</groupId>
            <artifactId>jackson-mapper-lgpl</artifactId>
            <version>$jackson.version</version>
        </dependency>
        <dependency>
            <groupId>org.codehaus.jackson</groupId>
            <artifactId>jackson-smile</artifactId>
            <version>$jackson.version</version>
        </dependency>
        <dependency>
            <groupId>org.codehaus.jackson</groupId>
            <artifactId>jackson-xc</artifactId>
            <version>$jackson.version</version>
        </dependency>

        <!-- spring 核心jar包 -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-aop</artifactId>
            <version>$spring.version</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-beans</artifactId>
            <version>$spring.version</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context-support</artifactId>
            <version>$spring.version</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>$spring.version</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>$spring.version</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-expression</artifactId>
            <version>$spring.version</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-web</artifactId>
            <version>$spring.version</version>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-tx</artifactId>
            <version>$spring.version</version>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-webmvc</artifactId>
            <version>$spring.version</version>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-webmvc-portlet</artifactId>
            <version>$spring.version</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.springframework/spring-jdbc -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jdbc</artifactId>
            <version>$spring.version</version>
        </dependency>

        <!-- mybatis分页插件 -->
        <dependency>
            <groupId>com.github.pagehelper</groupId>
            <artifactId>pagehelper</artifactId>
            <version>4.0.0</version>
        </dependency>

        <!-- SQLSERVER连接包 -->
        <dependency>
            <groupId>net.sourceforge.jtds</groupId>
            <artifactId>jtds</artifactId>
            <version>1.2</version>
        </dependency>

        <dependency>
            <groupId>org.aspectj</groupId>
            <artifactId>aspectjweaver</artifactId>
            <version>1.6.8</version>
        </dependency>

        <dependency>
            <groupId>org.freemarker</groupId>
            <artifactId>freemarker</artifactId>
            <version>2.3.16</version>
        </dependency>

        <dependency>
            <groupId>joda-time</groupId>
            <artifactId>joda-time</artifactId>
            <version>2.7</version>
        </dependency>

        <dependency>
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>
            <version>2.6</version>
        </dependency>

        <!-- 格式化对象,方便输出日志 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.1.41</version>
        </dependency>
        <!-- log end -->

        <!-- 上传组件包 -->
        <dependency>
            <groupId>commons-fileupload</groupId>
            <artifactId>commons-fileupload</artifactId>
            <version>1.3.1</version>
        </dependency>

        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.4</version>
        </dependency>

        <dependency>
            <groupId>commons-codec</groupId>
            <artifactId>commons-codec</artifactId>
            <version>1.9</version>
        </dependency>

        <!-- httpclient -->
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.2</version>
        </dependency>

        <!-- 消息队列核心包 -->
        <dependency>  
            <groupId>org.apache.activemq</groupId>  
            <artifactId>activemq-all</artifactId>  
            <version>5.14.1</version>  
        </dependency> 

        <!-- 消息队列附属包 -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>$spring.version</version>
        </dependency>
        </dependencies>

        <build>
               <plugins>
                   <plugin>
                        <groupId>org.apache.maven.plugins</groupId>
                        <artifactId>maven-compiler-plugin</artifactId>
                        <version>3.0</version>
                        <configuration>
                            <source>1.7</source>
                            <target>1.7</target>
                            <encoding>UTF-8</encoding>
                        </configuration>
                   </plugin>
               </plugins>
        </build>

</project>

这些依赖是由以前项目中拷贝过来的,很多都可以去除,这里不浪费时间。

项目构建

这里先说一下项目的构建有哪几块。完整的项目的话,它应该有三大块:
第一块:基础的父工程被消息生产者和消息消费者通用,这么做的目的是为了生产者给消费者传送对象数据的时候,可以在基础父工程中将这个对象定义好并实现序列化,然后消费者直接用同一个对象接收这个对象即可使用。

第二块:消息消费者(consumer),它的作用是接收生产者发送过来的消息,解析消息里面的内容做相应操作,例如异步存储,ActiveMQ很重要的一个作用就是将复杂的流程异步化,不用同步,很大的减轻了服务器压力,多用于解决高并发问题;另外一点就是:原本我们实现模块或者系统间的服务调用一般都是暴露服务然后远程请求获取数据,这样会造成模块或者系统间的依耐性太强,有了ActiveMQ后我们可以用它来解决模块或者系统间的依赖问题,很大程度解决了系统或者模块间的耦合问题,也方便代码维护。

第三块:消息生产者(producer),它的作用很显然了,是为了给消费者发信息通知消费者该干什么事了,发消息时注意它是队列还是主题模式。

但是由于时间和硬件限制,我这里就做了一个简单的自测型项目进行模拟。

核心配置文件如下

<?xml version="1.0" encoding="UTF-8"?>
<beans
    xmlns="http://www.springframework.org/schema/beans" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xmlns:context="http://www.springframework.org/schema/context" 
    xmlns:task="http://www.springframework.org/schema/task"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd 
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
        http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsd "
>

    <!-- JMS连接工厂 -->
    <bean id="myProviderFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616"/>
        <property name="useAsyncSend" value="true"/>
        <!-- 客户端唯一标识 -->
        <property name="clientID" value="spdb"/>
    </bean>

    <!-- 定义主题 -->
    <bean id="sitLogTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <!-- 主题名字 -->
        <constructor-arg value="spdb.log"/>
    </bean>

    <bean id="smsTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="spdb.sms"/>
    </bean>

    <bean id="sitUserTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="spdb.user"/>
    </bean>

    <!-- 定义队列 -->
    <bean id="textQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="spdb.text"/>
    </bean>


    <!-- 发送者客户端主题模式也就是PUB/SUB -->
    <bean id="myJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="myProviderFactory"/>
        <!-- <property name="defaultDestination" ref="sitLogTopic"/> -->
        <!-- 开启订阅模式 -->
        <property name="pubSubDomain" value="true"/>
        <property name="receiveTimeout" value="10000"/>
        <!-- deliveryMode, priority, timeToLive的开关要生效,必须配置为true,默认false-->
        <property name="explicitQosEnabled" value="true"/>
        <!-- 发送模式
            DeliveryMode.NON_PERSISTENT=1 非持久
            DeliveryMode.PERSISTENT=2 持久
        -->
        <property name="deliveryMode" value="2"/>
    </bean>

    <!-- 发送者客户端队列模式也就是P2P -->
    <bean id="myJmsTemplate2" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="myProviderFactory"/>
        <!-- <property name="defaultDestination" ref="sitLogTopic"/> -->
        <!-- 开启订阅模式 -->
        <property name="pubSubDomain" value="false"/>
        <property name="receiveTimeout" value="10000"/>
        <!-- deliveryMode, priority, timeToLive的开关要生效,必须配置为true,默认false-->
        <property name="explicitQosEnabled" value="true"/>
        <!-- 发送模式
            DeliveryMode.NON_PERSISTENT=1 非持久
            DeliveryMode.PERSISTENT=2 持久
        -->
        <property name="deliveryMode" value="2"/>
    </bean>

    <!-- 消费者客户端 -->
    <!-- JMS连接工厂 -->
    <bean id="myConsumerFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616"/>
        <property name="useAsyncSend" value="true"/>
        <!-- 客户端唯一标识 -->
        <property name="clientID" value="spdb.msg.center"/>
        <!-- ObjectMessage信任所有对象 -->
        <property name="trustAllPackages" value="true"/>
    </bean>

    <!-- 消息消费监听者 -->
    <bean id="logListener" class="com.zhy.test.listener.LogListener"/>
    <!-- 订阅客户端 -->
    <bean id="logListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="myConsumerFactory"/>
        <!-- 开启订阅模式 -->
        <property name="pubSubDomain" value="true"/>
        <property name="destination" ref="sitLogTopic"/>
        <!-- 持久订阅 -->
        <property name="subscriptionDurable" value="true"/>
        <!---接收客户端ID,在持久化时,客户端不在线时消息就存在数据库里,直到被这个ID的客户端消费掉-->
        <property name="clientId" value="spdb.log.consumer"/>
        <property name="messageListener" ref="logListener"/>
        <!-- 消息应答方式
            Session.AUTO_ACKNOWLEDGE=1  消息自动签收
            Session.CLIENT_ACKNOWLEDGE=2  客户端调用acknowledge方法手动签收
            Session.DUPS_OK_ACKNOWLEDGE=3  不必必须签收,消息可能会重复发送
        -->
        <property name="sessionAcknowledgeMode" value="1"/>
    </bean>

    <!-- 消息消费监听者 -->
    <bean id="logListener2" class="com.zhy.test.listener.LogListener2"/>
    <!-- 订阅客户端 -->
    <bean id="logListener2Container" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="myConsumerFactory"/>
        <!-- 开启订阅模式 -->
        <property name="pubSubDomain" value="true"/>
        <property name="destination" ref="sitLogTopic"/>
        

以上是关于ActiveMQ+Spring项目实践的主要内容,如果未能解决你的问题,请参考以下文章

SpringBoot JMS(ActiveMQ) 使用实践

Spring整合ActiveMQ测试

ActiveMQ(07):ActiveMQ结合Spring开发--建议

基于 CODING 的 Spring Boot 持续集成项目

ActiveMQ 重发机制与确认机制 实践

Spring与ActiveMQ整合