消息系统 JMS(ActiveMQ实现) 精华一页纸
Posted 一页纸世界
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了消息系统 JMS(ActiveMQ实现) 精华一页纸相关的知识,希望对你有一定的参考价值。
消息系统,在集群环境中,对系统解耦,消息通讯有着居住轻重的作用;特别是在分布式环境下,严重依赖 消息系统来实现分布式事务的一致性。
消息系统本质上是一个中介模式,交互模块通过其进行消息收发,所以往往消息系统也成为 ESB 的一个消息流的选择。
消息系统的实现虽然千差万别,但一些基本模块和交互方式却是相似的。
1、消息系统与JMS
I、Broker:代理,一般是消息系统对外的 服务器
II、消息传递方式
P2P:传统的消费者模式,点对点
P-SUB:publis - subscribe 发布订阅模式,一对多,类似于网络通讯的 广播
III、JMS
层次模型:ConnectionFactory(连接工厂,和Hibernate的SessionFactory类似)---->Connection(连接)--->Session(会话)--->Message(消息)
消息流向:Producer(生产者) ----> Destination(目标:就是消息队列) ---> MessageConsumer(消费者)
发送端
ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://xxx.xxx.xxx.xxx:61616");
Connection conn = cf.createConnection();
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = new ActiveMQQueue("queue");
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage();
message.setText(msg);
producer.send(message);
接受端
Destination destination = new ActiveMQQueue("queue");
MessageConsumer consumer = session.createConsumer(destination);
Message message = consumer.receive();
TextMessage textMessage = (TextMessage) message;
String msg = textMessage.getText();
IV、消息类型
StreamMessage - 流对象
TextMessage - String
MapMessage - <String,基本类型> 的Map
ObjectMessage - 序列化的对象
BytesMessage - 字节流数组消息
XMLMessage - XML消息
V、消息头
JMSDestination - 目的地(queue,topic)
JMSDeliveryMode - 模式 (是否持久化)
JMSTimestamp - send时,时间为当前时间
JMSExpiration - 有效期, ms 精度
JMSPriority - 优先级 0~4 为普通,5~9 为高级
JMSMessageID - 消息事务号
JMSReplyTo - 需要新增一个消息目的地,当消费者接受消息时,给 发送者会一个消息 ,消费者可以不回
JMSCorreationID - 关联消息的ID,可以不用
JMSType - 消息结构
JMSRedlivered - 标识消息重新发送过了
保证消息发送的途径,一个是 持久化,保证不会丢;一个是,消费者 收到消息回一个确认,这时生产者再在本地删除
2、JMS的可靠性机制
I、如何确认 消息是否已经被消费?
Session createSession(boolean transacted,int acknowledgeMode)
AUTO_ACKNOWLEDGE -- 自动通知: 客户端调用方法recieve或onMessage 返回时,自行确认
CLIENT_ACKNOWLEDGE -- 客户端决定何时通知: 客户端通过 发送 响应 acknowledge 消息,确认
DUPS_OK_ACKNOWLEDGE -- 延迟/批量通知: 在系统失败时,可以重复发送消息
(此种方式,性能较高,但可能会导致消费者重复接收消息)
II、消息持久化
消息头有一个属性JMSDeliveryMode
消息的发送模式:persistent或nonpersistent。前者表示消息在被消费之前,如果JMS提供者DOWN了,重新启动后消息仍然存在。后者在这种情况下表示消息会被丢失。可以通过下面的方式设置:
Producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT)
MessageProducer 消息生成者类,定义消息的一些属性
setDeliveryMode(int deliveryMode) -- 持久化
setPriority(int defaultPriority) -- 优先级
setTimeToLive(long timeToLive) -- 超时,默认不过期
III、临时目的地
除了正常的 目的地(队列,Topic外)
Queue createQueue(String queueName)
Topic createTopic(String topicName)
TemporaryQueue createTemporaryQueue()
JMS 还支持类似临时表的 临时目的地,该 目的地只在同一个 connection 连接存在(这点很类似临时表),所以Producer和Consumer 都基于同一个connection
IV、持久化订阅
-- 当用户上线时,注册一个识别自己身份的ID,当用户离线时,可以保存所有消息,当再次连到Provider 时,再把消息发给用户
非持久订阅,当用户不在线时,消息就丢了
3、ActiveMQ - JMS的开源实现
I、单个配置 activemq.xml
<transportConnectors>
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?para=value"/>
</transportConnectors>
支持TCP/UDP/NIO/HTTP/JVM 等多种协议
II、集群配置
a、static 配置 or 动态发现
<transportConnectors>
<transportConnector name="static" uri="static://(tcp://host1:61616,tcp://host2:61616)"/>
</transportConnectors>
失败重连 failover:(tcp://localhost:61616)
b、Master/Slave 模式 主备模式
一共有三种方式,原理是,启动的Master节点持有锁(文件、数据库),其他Slave 节点只负责复制数据。只有一个节点对外提供服务;client 通过 failover 来连接这个集群。
当Master宕机,锁释放,其他Slave来竞争获取这个锁,获取锁的称为新的Master
c、Network of brokers
<networkConnector uri="static://(tcp://localhost:61617)"
name="bridge" dynamicOnly="false" conduitSubscriptions="true" duplex="true">
-- 标识每个Consumer都可以收所有消息
这样多个 Broker之间存储转发消息,其中一个 Broker 的消息会转发到其他Broker
后续版本开始支持 Zookeeper 进行集群管理
III、持久化
支持 本地磁盘 | JDBC 等多种方式
<broker brokerName="broker" persistent="true" useShutdownHook="false">
<persistenceAdapter>
<kahaPersistenceAdapter directory="activemq-data" maxDataFileLength="33554432"/>
</persistenceAdapter>
</broker>
IV、安全
授权插件 Simple Authentication Plugin
<plugins>
<simpleAuthenticationPlugin>
<users>
<authenticationUser username="system" password="manager" groups="users,admins"/>
<authenticationUser username="user" password="password" groups="users"/>
<authenticationUser username="guest" password="password" groups="guests"/>
</users>
</simpleAuthenticationPlugin>
</plugins>
JAAS 授权插件
V、Consumer 集群
Queue consumer clusters
-- 这里的集群其实 是应用的 消费者集群。 针对 Queue 队列的 消费者,因为是 一对一的,如果 多个Consumer 接收同一个Queue,那这个消息是随机被其中一个Consumer 消费的。这样应用就可以用 多个Consumer 去构建集群
ActiveMQ支持订阅同一个queue的consumers上的集群。如果一个consumer失效,那么所有未被确认(unacknowledged)的消息都会被发送到这个queue上其它的consumers。如果某个consumer的处理速度比其它consumers更快,那么这个consumer就会消费更多的消息。
http://www.cnblogs.com/anruy/p/4906616.html
4、集成Spring
I、在Spring中搭建消息代理
有两种方式
方式一,就是直接使用 正常的 Bean 配置方式
方式二,对于配置了 spring.schemas 命名空间的,可以直接使用命名空间,而不需要手工定义一个bean
创建连接工厂
<bean id="connectionFactory" class="">
<property name="brokerURL" value="tcp://localhost;61616"/>
</bean>
等同于下面的
<amq:connectionFactory id="connectionFatory" brokerURL="tcp://10.43.136.226:61616"/>
<bean id="queue" class="org.apache.activemq.command.ActiveMQTopic">
<contructor-arg value="my.queue"/>
</bean>
等同于下面的
<amq:queue id="queue" physicalName="my.queue"/>
<amq:queue id="topic" physicalName="my.topic"/>
II、使用Spring的JMS 模版
同 JdbcTemplate 一样,Spring也提供了 JMSTemplate 一方面把类似操作模版化,另一方面 捕获了checked异常,重新抛出 unchecked异常
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFatory" ref="connectionFatory"/>
<!--property name="defaultDestinationName" value="my.queue"/-->
</bean>
jmsTemplate.send("my.queue", new MessageCreator(){
public Message createMessage(Session session) throws JMSException {
return session.createObjectMessage(message);
}
});
ObjectMessage recieveMessage = (ObjectMessage) jmsTemplate.receive("my.queue");
return (FMessage) recieveMessage.getObject();
III、创建消息驱动的POJO
创建消息监听器,扩展 MessageListener 接口;为了让 普通对象POJO 也能实现 MessageListener 处理(即不用显式的 扩展MessageListener 接口),Spring通过配置消息监听器实现调用
方法一、直接监听 MessageListener
这个就不能使用POJO了
方法二、SessionAwareMessageListener 扩展了 MessageListener,但在 onMessage 基础上提供了 Session可以用来发送消息,原先onMessage只能接受消息
方法三、MessageListenerAdapter 实现了前两种接口,同时 提供了自动发送返回消息的功能。这就是JMS 配置信息里面返回 确认的内容
配置 MessageListenerAdapter 或者 Spring.schemas
<jms:listener-container connection-factory="connectionFactory">
<jms:listener destination="my.queue" ref="bean引用" method="POJO方法"/>
</jms:listener-container>
MessageListenerAdapter 类通过 反射,把onMessage 方法返回的 Message ,转换成 用户定义的对象(Text - String,Bytes - byte[],Object - Serializable 对象,Map-Map)
那如何才会 对发送方进行响应呢?
配置的处理方法,有返回值时,Spring 会包装返回值,然后对发送方发送一个响应消息
如何知道响应目的呢?
方法一,发送放在 发送时设置 JMSReplyto 时
方法二,设置 MessageListeneAdapter 的 defaultResponseDestination属性指定
如果两者都设置,使用方法一(方法二被覆盖)
5、基于JMS 消息的RPC
类似于 RmiServiceExporter | HessianExporter | BurlapExporter | HttpInvokerExporter,Spring提供了 JmsInvokerServiceExporter
发布时,只需要通过 JmsInvokerServiceExporter 就可以发布服务(其实就是 Jms 封装了 jms消息过程,由exporter接受消息,转换成调用
<bean id="jsmRpcServiceExporter" class="org.springframework.jms.remoting.JmsInvokerServiceExporter"
p:service-ref="jmsRpcService"
p:serviceInterface="xxx"/>
调用服务,只需要通过 JmsInvokerProxyFactoryBean,这个Bean代理了 调用过程,把调用转换成 JMS 消息
<bean id="jmsRpcService" class="org.springframework.jms.remoting.JmsInvokerProxyFactoryBean">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="queueName" value="jms.queue"/>
<property name="serviceInterface" value="xxxe"/>
</bean>
6、ActvieMQ 监控与优化
以上是关于消息系统 JMS(ActiveMQ实现) 精华一页纸的主要内容,如果未能解决你的问题,请参考以下文章