Active MQ学习笔记

Posted 26键的人生

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Active MQ学习笔记相关的知识,希望对你有一定的参考价值。

一.Active MQ介绍

ActiveMQ Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。

主要特点:

1. 多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, php。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP

2. 完全支持JMS1.1J2EE 1.4规范 (持久化,XA消息,事务)

3. Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性

4. 通过了常见J2EE服务器(Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上

5. 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA

6. 支持通过JDBCjournal提供高速的消息持久化

7. 从设计上保证了高性能的集群,客户端-服务器,点对点

8. 支持Ajax

9. 支持与Axis的整合

10. 可以很容易得调用内嵌JMS provider,进行测试

 

 

 

 

二.Active MQ的安装启动

1、到官网http://activemq.apache.org/download.html下载最新版本,根据需要,可选择windows版本和linux版本。

2.本项目采用了windows版本,下载:apache-activemq-5.13.3.zip,解压,如下图:

进入bin目录,发现有win32和win64两个文件夹,这2个文件夹分别对应windows32位和windows64位操作系统的启动脚本。结合对应的电脑系统,

双击activemq.bat脚本进行MQ服务器的启动

 

得到如图:

 

3.ActiveMQ默认启动到8161端口,启动完了后在浏览器地址栏输入:http://localhost:8161/admin要求输入用户名密码,默认用户名密码为admin、admin,这个用户名密码是在conf/users.properties中配置的。输入用户名密码后便可看到如下图的ActiveMQ控制台界面了。

 

三.Active MQSpring的整合配置

1.首先在项目中导入以下的jar包:

2.spring的整合配置文件:i-Factory-BEANS-JMS.xml

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 

xmlns:aop="http://www.springframework.org/schema/aop"

xmlns:tx="http://www.springframework.org/schema/tx" 

xmlns:context="http://www.springframework.org/schema/context"

xmlns:jee="http://www.springframework.org/schema/jee"

xmlns:p="http://www.springframework.org/schema/p"

xsi:schemaLocation="

    http://www.springframework.org/schema/context

    http://www.springframework.org/schema/context/spring-context-3.0.xsd 

    http://www.springframework.org/schema/beans

    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd

    http://www.springframework.org/schema/tx

    http://www.springframework.org/schema/tx/spring-tx-3.0.xsd

    http://www.springframework.org/schema/aop

    http://www.springframework.org/schema/aop/spring-aop-3.0.xsd

    http://www.springframework.org/schema/jee

    http://www.springframework.org/schema/jee/spring-jee-3.0.xsd">

 

<import resource="cn/myccit/conf/imes-baseedi-action_BEAN_Inherit.xml"/><!-- basereport模块 -->

    <!--MQ 配置-->

    <bean id="jmsConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">  

        <property name="connectionFactory">  

            <bean class="org.apache.activemq.ActiveMQConnectionFactory">  

                <property name="brokerURL" value="tcp://192.168.16.233:61616" />  

                 <!-- 是否异步发送 -->  

                <property name="useAsyncSend" value="true" />  

                <property name="closeTimeout" value="60000" />  

                <property name="userName" value="admin" />  

                <property name="password" value="admin" />  

                <property name="optimizeAcknowledge" value="true" /> 

                <property name="optimizedAckScheduledAckInterval" value="10000" />  

            </bean>  

        </property>  

    </bean>  

 

     <!-- Spring Caching连接工厂 -->

    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->  

    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">

     <!--<property name="clientId" value="1" />  -->

        <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->  

        <property name="targetConnectionFactory" ref="jmsConnectionFactory"></property>

        <!-- 同上,同理 -->

        <!-- <constructor-arg ref="amqConnectionFactory" /> -->

        <!-- Session缓存数量 -->

        <property name="sessionCacheSize" value="100" />

    </bean>

     

    <!-- Spring JMS Template -->  

    <!--<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">  

        <property name="connectionFactory">  

            <ref local="jmsConnectionFactory" />  

        </property>  

    </bean>  -->

 

  <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">

    <property name="connectionFactory" ref="jmsConnectionFactory" />

    <property name="defaultDestination" ref="asyncQueue" />

    <property name="receiveTimeout" value="10000" />

  </bean>              

             <!--queue通道-->  

      <bean id="asyncQueue" class="org.apache.activemq.command.ActiveMQQueue">  

        <constructor-arg index="0">  

            <value>firstQueue</value>  

        </constructor-arg>  

      </bean>  

    

  <bean id="jmsTemplate2" class="org.springframework.jms.core.JmsTemplate">

    <property name="connectionFactory" ref="jmsConnectionFactory" />

    <property name="defaultDestination" ref="asyncTopic" />

    <property name="receiveTimeout" value="10000" />

  </bean>              

            <!--topic通道-->  

   <bean id="asyncTopic" class="org.apache.activemq.command.ActiveMQTopic">  

        <constructor-arg index="0">  

            <value>asyncTopic</value>  

        </constructor-arg>  

    </bean>  

    

 

  <!-- 配置消息消费监听者 -->

  <bean id="machineMessageListener" class="cn.myccit.ifactory.edu.service.msg.MachineDataListener" >

   <property name="machineLogDAO" ref="machineLogDAO"></property>

  </bean>

  <!-- 消息监听容器,配置连接工厂,监听器是上面定义的监听器 -->

  <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">

    <property name="connectionFactory" ref="connectionFactory" />

    <property name="destination" ref="asyncQueue" />

    <!--主题(Topic)和队列消息的主要差异体现在JmsTemplate中"pubSubDomain"是否设置为True。如果为True,则是Topic;如果是false或者默认,则是queue-->

    <property name="pubSubDomain" value="false" />

    <property name="messageListener" ref="machineMessageListener" />

  </bean>

 

  

  

    <!-- 消息监听适配器 -->  

    <bean id="messageListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">  

        <property name="delegate">  

            <bean class="cn.myccit.imes.baseedi.service.impl.TopicMsgServiceImpl"/>  

        </property>  

        <property name="defaultListenerMethod" value="receiveMessage"/>  

    </bean>  

 

    <!-- 消息监听适配器对应的监听容器 -->  

    <bean id="messageListenerAdapterContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">  

        <property name="connectionFactory" ref="connectionFactory"/>  

        <property name="destination" ref="asyncTopic"/>  

        <property name="messageListener" ref="messageListenerAdapter"/><!-- 使用MessageListenerAdapter来作为消息监听器 -->  

    </bean>  

    

</beans>

 

 

四.Active MQ的工作机制

Active MQ 的工作机制主要是通过JMSJava Message Service作为应用程序接口,用于在两个应用程序之间,或分布式系统中发送消息。JSM它是一个Java平台中关于面向消息中间件(MOM)的API,通过它提供标准的产生、发送、接收消息的接口简化企业应用的开发,翻译为Java消息服务。我们在应用jms的结构为:

1.JMS应用程序结构支持两种模型

队列模型

队列模型下,一个生产者向一个特定的队列发布消息,一个消费者从该队列中读取消息。这里,生产者知道消费者的队列,并直接将消息发送到消费者的队列。这种模式被概括为:只有一个消费者将获得消息。生产者不需要在接收者消费该消息期间处于运行状态,接收者也同样不需要在消息发送时处于运行状态。每一个成功处理的消息都由接收者签收。

发布者/订阅者模型

发布者/订阅者模型支持向一个特定的消息主题发布消息。0或多个订阅者可能对接收来自特定消息主题的消息感兴趣。在这种模型下,发布者和订阅者彼此不知道对方。这种模式好比是匿名公告板。这种模式被概括为:多个消费者可以获得消息.在发布者和订阅者之间存在时间依赖性。发布者需要建立一个订阅(subscription),以便客户能够购订阅。订阅者必须保持持续的活动状态以接收消息,除非订阅者建立了持久的订阅。在那种情况下,在订阅者未连接时发布的消息将在订阅者重新连接时重新发布

  2.JMS应用程序要实现的接口:

ConnectionFactory 接口(连接工厂)

用户用来创建到JMS提供者的连接的被管对象。JMS客户通过可移植的接口访问连接,这样当下层的实现改变时,代码不需要进行修改。 管理员在JNDI名字空间中配置连接工厂,这样,JMS客户才能够查找到它们。根据消息类型的不同,用户将使用队列连接工厂,或者主题连接工厂。

Connection 接口(连接)

连接代表了应用程序和消息服务器之间的通信链路。在获得了连接工厂后,就可以创建一个与JMS提供者的连接。根据不同的连接类型,连接允许用户创建会话,以发送和接收队列和主题到目标。

Destination 接口(目标)

目标是一个包装了消息目标标识符的被管对象,消息目标是指消息发布和接收的地点,或者是队列,或者是主题。JMS管理员创建这些对象,然后用户通过JNDI发现它们。和连接工厂一样,管理员可以创建两种类型的目标,点对点模型的队列,以及发布者/订阅者模型的主题。

MessageConsumer 接口(消息消费者)

由会话创建的对象,用于接收发送到目标的消息。消费者可以同步地(阻塞模式),或异步(非阻塞)接收队列和主题类型的消息。

MessageProducer 接口(消息生产者)

由会话创建的对象,用于发送消息到目标。用户可以创建某个目标的发送者,也可以创建一个通用的发送者,在发送消息时指定目标。

Message 接口(消息)

是在消费者和生产者之间传送的对象,也就是说从一个应用程序创送到另一个应用程序。一个消息有三个主要部分:

消息头(必须):包含用于识别和为消息寻找路由的操作设置。

一组消息属性(可选):包含额外的属性,支持其他提供者和用户的兼容。可以创建定制的字段和过滤器(消息选择器)。

一个消息体(可选):允许用户创建五种类型的消息(文本消息,映射消息,字节消息,流消息和对象消息)。

消息接口非常灵活,并提供了许多方式来定制消息的内容。

Session 接口(会话)

表示一个单线程的上下文,用于发送和接收消息。由于会话是单线程的,所以消息是连续的,就是说消息是按照发送的顺序一个一个接收的。会话的好处是它支持事务。如果用户选择了事务支持,会话上下文将保存一组消息,直到事务被提交才发送这些消息。在提交事务之前,用户可以使用回滚操作取消这些消息。一个会话允许用户创建消息生产者来发送消息,创建消息消费者来接收消息。

 

本项目采用的通讯方式通过配置TCP/IP和端口进行实现:

<!--MQ 配置-->

    <bean id="jmsConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">  

        <property name="connectionFactory">  

            <bean class="org.apache.activemq.ActiveMQConnectionFactory">  

                <property name="brokerURL" value="tcp://192.168.16.233:61616" />  

                 <!-- 是否异步发送 -->  

                <property name="useAsyncSend" value="true" />  

                <property name="closeTimeout" value="60000" />  

                <property name="userName" value="admin" />  

                <property name="password" value="admin" />  

                <property name="optimizeAcknowledge" value="true" /> 

                <property name="optimizedAckScheduledAckInterval" value="10000" />  

            </bean>  

        </property>  

    </bean>  

四.Active MQ展示和常用操作

1.PLC端输出的模拟数据:

2.控制台通过接收的数据:

3.看板显示界面显示的数据:

 

以上是关于Active MQ学习笔记的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ学习笔记(自用)

消息中间件学习笔记——RabbitMQ

Apache Pulsar MQ 学习笔记

消息中间件ActiveMQ学习笔记 [Java编码MQ,消费者生产者基本模型]

消息中间件ActiveMQ学习笔记 [Java编码MQ,消费者生产者基本模型]

RabbitMQ学习笔记-p1(初识MQ&快速入门)