[Java技术堂-五] JMS-ActiveMQ交流学习

Posted 糖醋刀豆

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了[Java技术堂-五] JMS-ActiveMQ交流学习相关的知识,希望对你有一定的参考价值。



0 1

消息队列应用场景


1. 解耦

在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。消息队列在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。这允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。


2. 冗余

有时在处理数据的时候处理过程会失败。除非数据被持久化,否则将永远丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。在被许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,需要你的处理过程明确的指出该消息已经被处理完毕,确保你的数据被安全的保存直到你使用完毕。


3. 扩展性

因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的;只要另外增加处理过程即可。不需要改变代码、不需要调节参数。扩展就像调大电力按钮一样简单。


4. 灵活性 & 峰值处理能力

当你的应用上了Hacker News的首页,你将发现访问流量攀升到一个不同寻常的水平。在访问量剧增的情况下,你的应用仍然需要继续发挥作用,但是这样的突发流量并不常见;如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住增长的访问压力,而不是因为超出负荷的请求而完全崩溃。请查看我们关于峰值处理能力的博客文章了解更多此方面的信息。


5. 可恢复性

当体系的一部分组件失效,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。而这种允许重试或者延后处理请求的能力通常是造就一个略感不便的用户和一个沮丧透顶的用户之间的区别。


6. 送达保证

消息队列提供的冗余机制保证了消息能被实际的处理,只要一个进程读取了该队列即可。在此基础上,IronMQ提供了一个"只送达一次"保证。无论有多少进程在从队列中领取数据,每一个消息只能被处理一次。这之所以成为可能,是因为获取一个消息只是"预定"了这个消息,暂时把它移出了队列。除非客户端明确的表示已经处理完了这个消息,否则这个消息会被放回队列中去,在一段可配置的时间之后可再次被处理。


7.排序保证

在许多情况下,数据处理的顺序都很重要。消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。IronMO保证消息浆糊通过FIFO(先进先出)的顺序来处理,因此消息在队列中的位置就是从队列中检索他们的位置。


8.缓冲

在任何重要的系统中,都会有需要不同的处理时间的元素。例如,加载一张图片比应用过滤器花费更少的时间。消息队列通过一个缓冲层来帮助任务最高效率的执行--写入队列的处理会尽可能的快速,而不受从队列读的预备处理的约束。该缓冲有助于控制和优化数据流经过系统的速度。


9. 理解数据流

在一个分布式系统里,要得到一个关于用户操作会用多长时间及其原因的总体印象,是个巨大的挑战。消息系列通过消息被处理的频率,来方便的辅助确定那些表现不佳的处理过程或领域,这些地方的数据流都不够优化。


10. 异步通信

很多时候,你不想也不需要立即处理消息。消息队列提供了异步处理机制,允许你把一个消息放入队列,但并不立即处理它。你想向队列中放入多少消息就放多少,然后在你乐意的时候再去处理它们。



0 2

【消息队列MQ】各类MQ比较


RabbitMQ

是使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP, STOMP,也正是如此,使的它变的非常重量级,更适合于企业级的开发。同时实现了一个经纪人(Broker)构架,这意味着消息在发送给客户端时先在中心队列排队。对路由(Routing),负载均衡(Load balance)或者数据持久化都有很好的支持。


Redis

是一个Key-Value的NoSQL数据库,开发维护很活跃,虽然它是一个Key-Value数据库存储系统,但它本身支持MQ功能,所以完全可以当做一个轻量级的队列服务来使用。对于RabbitMQ和Redis的入队和出队操作,各执行100万次,每10万次记录一次执行时间。测试数据分为128Bytes、512Bytes、1K和10K四个不同大小的数据。实验表明:入队时,当数据比较小时Redis的性能要高于RabbitMQ,而如果数据大小超过了10K,Redis则慢的无法忍受;出队时,无论数据大小,Redis都表现出非常好的性能,而RabbitMQ的出队性能则远低于Redis。


ZeroMQ

号称最快的消息队列系统,尤其针对大吞吐量的需求场景。ZMQ能够实现RabbitMQ不擅长的高级/复杂的队列,但是开发人员需要自己组合多种技术框架,技术上的复杂度是对这MQ能够应用成功的挑战。ZeroMQ具有一个独特的非中间件的模式,你不需要安装和运行一个消息服务器或中间件,因为你的应用程序将扮演了这个服务角色。你只需要简单的引用ZeroMQ程序库,可以使用NuGet安装,然后你就可以愉快的在应用程序之间发送消息了。但是ZeroMQ仅提供非持久性的队列,也就是说如果down机,数据将会丢失。其中,Twitter的Storm中使用ZeroMQ作为数据流的传输。


ActiveMQ

是Apache下的一个子项目。 类似于ZeroMQ,它能够以代理人和点对点的技术实现队列。同时类似于RabbitMQ,它少量代码就可以高效地实现高级应用场景。RabbitMQ、ZeroMQ、ActiveMQ均支持常用的多种语言客户端 C++、Java、.Net,、Python、 php、 Ruby等。


Jafka/Kafka

Kafka是Apache下的一个子项目,是一个高性能跨语言分布式Publish/Subscribe消息队列系统,而Jafka是在Kafka之上孵化而来的,即Kafka的一个升级版。具有以下特性:快速持久化,可以在O(1)的系统开销下进行消息持久化;高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;完全的分布式系统,Broker、Producer、Consumer都原生自动支持分布式,自动实现复杂均衡;支持Hadoop数据并行加载,对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka通过Hadoop的并行加载机制来统一了在线和离线的消息处理,这一点也是本课题所研究系统所看重的。Apache Kafka相对于ActiveMQ是一个非常轻量级的消息系统,除了性能非常好之外,还是一个工作良好的分布式系统。


0 3

JMS学习交流


JMS介绍

Java Message Service(JMS)是SUN提出的旨在统一各种MOM(Message-Oriented Middleware)系统接口的规范,它包含点对点(Point to Point,PTP)和发布/订阅(Publish/Subscribe,pub/sub)两种消息模型,提供可靠消息传输、事务和消息过滤等机制。

简单的说,JMS制定了一个发消息的规范。是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。ActiveMQ是Apache出品的开源项目,它是JMS规范的一个实现。


JMS的作用

在不同应用之间进行通信或者从一个系统传输数据到

另外一个系统。两个应用程序之间,或分布式系统中发送消息,进行异步通信。

这类问题有很多解决方案 ,比如DB、SOA、Socket通信、RMI,等,但我们需要根据项目的限制以及功能和性能的需要作出选择。

JMS的应用场景:规模和复杂度较高的分布式系统。

  1. 同步通信:客户发出调用后,必须等待服务对象完成处理并返回结果后才能继续执行;

  2. 客户和服务对象的生命周期紧密耦合:客户进程和服务对象进程都必须正常运行;如果由于服务对象崩溃或者网络故障导致客户的请求不可达,客户会接收到异常;

  3. 点对点通信:客户的一次调用只发送给某个单独的目标对象。



[Java技术堂-五] JMS-ActiveMQ交流学习

MOM在系统中的位置


JMS模型

Java消息服务应用程序结构支持两种模型:


1.点对点模型(基于队列)

每个消息只能有一个消费者。消息的生产者和消费者之间没有时间上的相关性.可以由多个发送者,但只能被一个消费者消费。

  • 一个消息只能被一个接受者接受一次  

  • 生产者把消息发送到队列中(Queue),这个队列可以理解为电视机频道(channel)  

  • 在这个消息中间件上有多个这样的channel  

  • 接受者无需订阅,当接受者未接受到消息时就会处于阻塞状态


2.  发布者/订阅者模型(基于主题的)

每个消息可以有多个消费者。

生产者和消费者之间有时间上的相关性。订阅一个主题的消费者只能消费自它订阅之后发布的消息。

  • 允许多个接受者,类似于广播的方式  

  • 生产者将消息发送到主题上(Topic)  

  • 接受者必须先订阅

注:持久化订阅者:特殊的消费者,告诉主题,我一直订阅着,即使网络断开,消息服务器也记住所有持久化订阅者,如果有新消息,也会知道必定有人回来消费。



[Java技术堂-五] JMS-ActiveMQ交流学习

JMS消息发送模式



[Java技术堂-五] JMS-ActiveMQ交流学习

Topic发送模式




[Java技术堂-五] JMS-ActiveMQ交流学习

JMS的公共接口



JMS的基本构件

  • 连接工厂:连接工厂是客户用来创建连接的对象,例如ActiveMQ提供的ActiveMQConnectionFactory。

  • 连接: JMS Connection封装了JMS客户端到JMS Provider 的连接与JMS提供者之间的一个虚拟的连接。

  • 会话: JMS Session是生产和消费消息的一个单线程上下文。会话用于创建消息的生产者(producer),消费者(consumer),消息(message)等,会话,是一个事务性的上下文。

    消息的生产和消费不能包含在同一个事务中。

  • 生产者:MessageProducer   由Session 对象创建的用来发送消息的对象

  • 消费者:MessageConsumer  由Session对象创建的用来发送消息的对象

  • 消息:Message jms消息包括消息头和消息体以及其它的扩展属性。

    JMS定义的消息类型有TextMessage、MapMessage、BytesMessage、StreamMessage和ObjectMessage。

  • 目的地:Destination,消息的目的地,是用来指定生产的消息的目标和它消费的消息的来源的对象。

  • 消息队列:Queue 点对点的消息队列

  • 消息主题:Tipic发布订阅的消息队列


[Java技术堂-五] JMS-ActiveMQ交流学习

Jms消息发送时序图



Jms消息发送开发流程

1、生产者(producer)开发流程(ProducerTool.java):

  • 1.1 创建Connection:

    根据url,user和password创建一个jms Connection。

  • 1.2 创建Session:

    在connection的基础上创建一个session,同时设置是否支持事务和ACKNOWLEDGE标识。

  • 1.3 创建Destination对象:

    需指定其对应的主题(subject)名称,producer和consumer将根据subject来发送/接收对应的消息。

  • 1.4 创建MessageProducer:

    根据Destination创建MessageProducer对象,同时设置其持久模式。

  • 1.5 发送消息到队列(Queue):

    封装TextMessage消息,使用MessageProducer的send方法将消息发送出去。


2、消费者(consumer)开发流程(ConsumerTool.java):

  • 2.1 实现MessageListener接口:

    消费者类必须实现MessageListener接口,然后在onMessage()方法中监听消息的到达并处理。

  • 2.2 创建Connection:

    根据url,user和password创建一个jms Connection,如果是durable模式,还需要给connection设置一个clientId。

  • 2.3 创建Session和Destination:

  • 2.4创建replyProducer【可选】:

    可以用来将消息处理结果发送给producer。

  • 2.5 创建MessageConsumer:

    根据Destination创建MessageConsumer对象。

  • 2.6 消费message:

    在onMessage()方法中接收producer发送过来的消息进行处理,并可以通过replyProducer反馈信息给producer。



[Java技术堂-五] JMS-ActiveMQ交流学习

Jms消息订阅者流程图




JMS消息的事务

1.创建事务createSession(paramA,paramB);

  • paramA是设置事务的,paramB设置acknowledgment mode(应答模式)。

  • paramA设置为false时:paramB的值可为Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一个。


2.事务的应答确认

  • A)paramA设置为true时:

    paramB的值忽略, acknowledgment mode被jms服务器设置 SESSION_TRANSACTED。

    当一个事务被提交的时候,消息确认就会自动发生。

  • B)paramA设置为false时:

    Session.AUTO_ACKNOWLEDGE为自动确认,当客户成功的从receive方法返回的时候,或者从MessageListener.onMessage方法成功返回的时候,会话自动确认客户收到的消息。

    Session.CLIENT_ACKNOWLEDGE 为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的

    acknowledge方法。jms服务器才会删除消息。(默认是批量确认)

    DUPS_OK_ACKNOWLEDGE允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收,而且允许重复确认。如果是重复的消息,那么JMS provider必须把消息头的JMSRedelivered字段设置为true。




消费者的消费方式

下两种方法之一:

  • 同步消费。通过调用消费者的receive方法从目的地中显式提取消息。receive方法可以一直阻塞到消息到达。

  • 异步消费。客户可以为消费者注册一个消息监听器,以定义在消息到达时所采取的动作。

    实现MessageListener接口,在MessageListener()方法中实现消息的处理逻辑。



JMS的通信机制

[Java技术堂-五] JMS-ActiveMQ交流学习

JMS 的通信机制


activeMQ支持多种通讯协议TCP/UDP等,我们选取最常用的TCP来分析activeMQ的通讯机制。首先我们来明确一个概念:

  • 客户(Client):消息的生产者、消费者对activeMQ来说都叫作客户。

  • 消息中转器(Message broker):它是activeMQ的核心,它接收信息并进行相关处理后分发给消息消费者。


为了能清楚的描述出activeMQ的核心通讯机制,我们选择3个部分来进行说明,它们分别是建立链接、关闭链接、心跳。


一、Client跟activeMQ的TCP通讯的初始化过程分析如下:

  1. activeMQ初始化时,通过TcpTransportServer类根据配置打开TCP侦听端口,客户通过该端口发起建立链接的动作。

  2. 把accept的Socket放入阻塞队列中。

  3. 另外一个线程Socket handler阻塞着等待队列中是否有新的Socket,如果有则取出来。

  4. 生成一个TransportConnection的实例。TransportConnection类的主要作用是处理链路的状态信息,并实现CommandVisitor接口来完成各类消息的处理。

  5. TransportConnection会使用一个由多个TransportFilter实例组成的消息处理链条,负责对接收到的各类消息进行处理并发送相应的应答。这个链条的典型组成顺序:MutexTransport->WireFormatNegotiator->InactivityMonitor->TcpTransport。在这条链条中最后的一环就是TcpTransport类,它是实际和Client获取和发送数据的地方。

  6. 该类的重要建链完成,可以进行通讯操作。方法有run()和oneway(),一个负责读取,一个负责发送。  


二、关闭链接
activeMQ发现TCP链接的关闭,最关键的代码在TcpBufferedInputStream类中的 int n = in.read(buffer, position, buffer.length - position);

三、心跳
为了更好的维护TCP链路的使用,activeMQ采用了心跳机制作为判断双方链路的健康情况。activeMQ使用的是双向心跳,也就是activeMQ的Broker和Client双方都进行相互心跳,但不管是Broker或Client心跳的具体处理情况是完全一样的,都在InactivityMonitor类中实现,下面具体介绍。

心跳会产生两个线程“InactivityMonitor ReadCheck”和“InactivityMonitor WriteCheck”,它们都是Timer类型,都会隔一段固定时间被调用一次。ReadCheck线程主要调用的方法是readCheck(),当在等待时间内,有消息接收到,则该方法会返回true。WriteCheck线程主要调用的方法是writeCheck(),这有个小技巧,大家可以参考一下,那就是当WriteCheck线程休眠时,有任何数据发送成功,则该线程被唤醒后,不用通过TCP向对方真的发送心跳消息,这样可以从一定程度上减少网络传输的数据量。


ActiveMQ模型分析

首先介绍该模型中每个领域类的作用,然后再介绍它们之间的关系。

  • Broker:activeMQ的一个整体代表

  • RegionBroker:负责分发broker的操作到相应的消息区域

  • Region:activeMQ目前有四种主要消息区域:队列域(queueRegion)、主题域(topicRegion)、临时队列域(tempQueueRegion)、临时主题域(tempTopicRegion)

  • TransportConnection:代表一个通讯连接Destination:消息的目的地,主要包括两种Queue、Topic两种

  • Subscription:消息的消费者、订阅者

  • MessageStore:消息持久化存储,象比较复杂的Kaha存储机制就放在这

  • PendingMessageCursor:等待发给消费者的消息分发指针

  • ConnectionContext:用来维护发送请求所需的连接上下文


ActiveMQ模型分析

[Java技术堂-五] JMS-ActiveMQ交流学习

ActiveMQ模型分析---静态模型


下面我们把这些领域类的关系进行一个描述:

  1. 一个RegionBroker拥有4种消息域的对象。

  2. RegionBroker拥有所有目的地对象(destination)。

  3. 每个消息域(Region)也拥有它们对应的0或N个目的地对象(destination)。

  4. 同时每个Region也拥有它们对应的0或N个消息消费者、订阅者(subscription)。

  5. 每个目的地都有一个相应的持久化存储方式(messageStore),以及一个等待发送的消息分发指针(pendingMessageCursor)。

  6. 消息消费者和目的地可以彼此拥有0或N个。

  7. 每个消费者都有一个对应的ConnectionContext,ConnectionContext里包括一个TransportConnection对象,通过TransportConnection把真实的消息发给消费者。

  8. TransportConnection也可以做为通讯连接,侦听消息生产者发出的信息,所以每个TransportConnection会指向Broker对象。


ActiveMQ模型分析


[Java技术堂-五] JMS-ActiveMQ交流学习

ActiveMQ模型分析-----动态模型


消费生产者进程向activeMQ所在进程发送消息和消费者消费消息的过程如上图所示,消息传递的路径经过了核心领域模型,具体步骤如下:


  • 步骤1:生产者通过向activeMQ为它建立好的TransportConnection发送消息给

  • activeMQ。

  • 步骤2:TransportConnection对象找到RegionBroker。

  • 步骤3:RegionBroker根据消息的类型找到对应的消息区域(Region)。

  • 步骤4:该Region在它自己里面找到相应的消息目的地。

  • 步骤5、6:该目的地首先根据需要进行持久化操作,并使用待发送消息指针对象。

  • 步骤7:当有合适的消息消费者、订阅者来到时,目的地会找到这些消费者。

  • 步骤8、9:通过该消费者对应的TransportConnection,发给相应的消费者进程。



activeMQ消息分发指针

消息分发游标是用来保存JMS消息的引用。消息游标的处理过程如下:

  1. 当producer发送的持久化消息到达broker之后,broker首先会把它保存在持久存储中。

  2. 如果发现当前有活跃的consumer,而且这个consumer消费消息的速度能跟上producer生产消息的速度,那么ActiveMQ会直接把消息传递给broker内部跟这个consumer关联的queue;

  3. 如果当前没有活跃的consumer或者consumer消费消息的速度跟不上producer生产消息的速度,那么ActiveMQ会使用Pending Message Cursors保存对消息的引用。

  4. Pending Message Cursors把消息引用传递给broker内部跟这个consumer关联的dispatch queue。

    以下是两种Pending Message Cursors:

    VM Cursor。在内存中保存消息的引用。

    File Cursor。首先在内存中保存消息的引用,如果内存使用量达到上限,那么会把消息引用保存到临时文件中。


我们可以在activemq.xml 中配置消息分发指针的存储策略。



ActiveMQ的监控

  1. activeMQ自动的管理站点

    http://localhost:8161/admin

  2. Advisory Messages

    ActiveMQ支持Advisory Messages,它允许我们通过标准的JMS消息来监控系统.通过它我们可以得到关于JMS provider、producers、consumers和destinations的信息。

  3. QueueBrowser

    使用QueueBrowser的消息预览,编程提供监控接口。


actviemq 配置连接URI

  1. 配置JMS连接最大闲置时间(消息服务器无消息)

    jmsBrokerURL = tcp://218.241.100.165:61616?wireFormat.maxInactivityDuration=90000该wireFormat.maxInactivityDuration = 90000的默认值是30000ms

    wireFormat.maxInactivityDuration=0 这样的参数, wireFormat.maxInactivityDuration是心跳参数。避免ActiveMQ在一段时间没有消息发送时抛出 "Channel was inactive for toolong"异常。


  2. maxReconnectDelay 最大重连间隔

    failover:(tcp://127.0.0.1:61616?wireFormat.maxInactivityDuration=10000);maxReconnectDelay=10000

    failover:(tcp://localhost:61616,tcp://remotehost:61616)?initialReconnectDelay=100

    failover  失效备援

    maxReconnectDelay=10000 最大重连间隔


  3. 设置异步发送消息

    tcp://localhost:61616?jms.useAsyncSend=true

    tcp://localhost:61616?jms.prefetchPolicy.all=100&jms.redeliveryPolicy.maximumRedeliveries=5


  4. 客户端消息缓存的数量

    tcp://localhost:61616?jms.prefetchPolicy.all=50    ##设置客户端最多缓存50条消息


  5. 客户端的预支取策略。

    tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1 


ActiveMQ稳定性和容错性考虑

1.保障Jms连接

使用失效备援机制,和间隔自动重试机制,程序控制等方面来控制。

failover:(tcp://localhost:61616)?initialReconnectDelay=100&;maxReconnectAttempts=5

  • failover transport是一种重新连接机制,用于建立可靠的传输。此处配置的是一旦ActiveMQ broker中断,Listener端将每隔100ms自动尝试连接,直至成功连接或重试5次连接失败为止。

  • failover还支持多个borker同时提供服务,实现负载均衡的同时可增加系统容错性,格式:
    failover:(uri1,...,uriN)?transportOptions

  • failover://(tcp://masterhost:61616,tcp://slavehost:61616)?randomize=false

    failover:(uri1,...,uriN)?transportOptions

    failover:uri1,...,uriN

    failover:(tcp://localhost:61616)


2. JMSRedelivered
如果这个值为true,表示消息是被重新发送了。因为有时消费者没有确认他已经收到消息或者JMS提供者不确定消费者是否已经收到。


3.JMSExpiration

允许消息过期, setTimeToLive()设置消息的有效期。



activeMQ的failOver重连机制

"failover:(tcp://IPAddress1:61616,tcp://IPAddress1:61616)?initialReconnectDelay=100&maxReconnectAttempts=5";


1.后面的参数initialReconnectDelay=100&maxReconnectAttempts=5“对每一个连接URI是通用的。


2.如果没有指定URI的获取方式,activeMQ会自动选择其中的一个URI来尝试建立连接(randomize 指定随机),获取连接后,ActiveMQ会维护连接的暂停和恢复。以上面的URL为例,说明failOver的重连机制:

  • IPAddress1, IPAddress2上的broker1,broker2都正常运行,创建的Connection会使用IPAddress1的broker1来发送消息,这时不激活消费者。

  • 关闭broker1,Connection会自动切换到broker2的URI上来发送消息。

  • 激活消费者,消费者会先尝试broker1,由于broker1不可用,使用broker2来收消息,这时只能收到broker2上的消息。

  • 再重新启动broker1,生产者,和消费者都仍然使用broker2来发送和接受消息。

  • 关闭broker2,生产者和消费者都会自动切换到broker1上,消费者就收到之前broker发送的消息了。


[Java技术堂-五] JMS-ActiveMQ交流学习

failOver重连机制



activeMQ安全管理

1.编程式实现

通过ActiveMQ提供的实现添加消息用户的权限(由SimpleAuthenticationPlugin类实现)。


2.配置实现

  • 配置mq访问者信息,activemq安装目录下/conf/credentials.properties

  • 权限管理 , 在${ACTIVEMQ_HOME}/conf/activemq.xml 中配置

<plugins>

  <simpleAuthenticationPlugin>

  </simpleAuthenticationPlugin>

  <authorizationPlugin>

  </authorizationPlugin>

</plugins>



调整TCP传输设置

TCP传输是activeMQ最常用的传输方式。其中socketBufferSize和tcpNoDelay对传输性能有较大的影响。

  • socketBufferSize 通过tcp传输发送和接受数据的缓冲区大小,默认( 65536 bytes)

  • tcpNoDelay - 默认为false。通常一个TCP socket缓冲区创建小的数据在发送之前。启用此选项 - 消息将被尽快发送。

    url = "failover://(tcp://localhost:61616?tcpNoDelay=true)";

[Java技术堂-五] JMS-ActiveMQ交流学习

OpenWire参数调试


wireFormat包信息

程序中截获的传输格式(wire format)对象:

WireFormatInfo {

    version=7,

    properties={

        CacheSize=1024,

        CacheEnabled=true,

        SizePrefixDisabled=false,   MaxInactivityDurationInitalDelay=10000,

        TcpNoDelayEnabled=true,

        MaxInactivityDuration=30000,

        TightEncodingEnabled=true,

        StackTraceEnabled=true},

    magic=[A,c,t,i,v,e,M,Q]

 }



ActiveMQ集群部署

1.多个消息提供者

使用Network of brokers,以便在broker之间存储转发消息。


2.多个消息消费者

ActiveMQ支持订阅同一个queue的consumers上的集群。如果一个consumer失效,那么所有未被确认(unacknowledged)的消息都会被发送到这个queue上其它的consumers。如果某个consumer的处理速度比其它consumers更快,那么这个consumer就会消费更多的消息。


ActiveMQ集群部署



Master/salve Server

1.主辅服务器的作用

  • 主辅服务器:提供消息服务。

  • 辅服务器:提供消息的备份,服务的备份。


2.Pure Master Slave的工作方式

  • A)服务端:

    Slave broker消费master broker上所有的消息状态,例如消息、确认和事务状态等。

    Slave broker不提供消息服务。

    Master broker只有在消息成功被复制到slave broker之后才会响应客户。

    master broker失效的时候,slave broker可以启动network connectors和transport connectors,提供消息服务,也可以跟着停止。

  • B)客户端:

    使用failover的机制uri= “failover://(tcp://masterhost:61616,tcp://slavehost:61616)?randomize=false ”;


3.配置

  • Master broker不需要特殊的配置。

  • Slave broker需要进行以下配置:

    <broker masterConnectorURI="tcp://masterhost:62001" shutdownOnMasterFailure="false">


4.限制

  • 只能有一个slave broker连接到master broker。  

  • master broker失效而导致slave broker成为master之后,之前的master broker只有在当前的master broker(原slave broker)停止后才能重新生效。 


spring和activeMQ的结合

使用spring对jms的支持,配置jms的各个组件

  1. 配置jms连接工厂

    <amq:connectionFactory id="jmsConnectionFactory" brokerURL="vm://localhost" />


  2. 配置消息队列

    <amq:queuename="destination" physicalName=“queuename"/>


  3. 配置消息监听器

    <bean id="messageListener"

       class="org.springframework.jms.listener.adapter.MessageListenerAdapter">

       <constructor-arg>

           <bean class=“类路径"></bean>

       </constructor-arg>

       <!-- 配置监听到jms方法后调用的执行方法-->

       <property name="defaultListenerMethod" value="printMyOut"/>

       <!-- custom MessageConverter define -->

       <property name="messageConverter" ref="invokeMessageConverter" /

    </bean>


  4. 配置消息监听容器

    <bean id="listenerContainer"

           class="org.springframework.jms.listener.DefaultMessageListenerContainer">

           <property name="connectionFactory" ref="jmsConnectionFactory" />

           <property name="destination" ref="destination" />

           <property name="messageListener" ref="messageListener" />

    </bean>


  5. 配置jms消息转换器

    <bean id="invokeMessageConverter" class="com.hc360.components.jms.InvokeMessageConverter" />



  6. 配置Spring的jms处理模版类(jmsTemplte)

    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">

       <property name="connectionFactory">

           <ref local="jmsFactory" />

       </property>

       <property name="defaultDestinationName" value="subject"/>

       <!--

       区别它采用的模式为false是p2p为true是订阅

       <property name="pubSubDomain" value="true"/>

       -->

       <!-- custom MessageConverter -->

       <property name="messageConverter" ref="invokeMessageConverter" />

    </bean>

  7. 消息生产者和消息消费者

    该部分可以根据业务需要,用户自己编程实现。



ActiveMQ的消息传输机制

  1. The TCP Transport

    语法:tcp://hostname:port?key=value

  2. The Failover Transport

    语法: failover:(uri1,...,uriN)?transportOptions

  3. The Multicast Transport

    语法: multicast://address:port?transportOptions

  4. The UDP Transport

    语法:udp://hostname:port?transportOptions

  5. The VM Transport

    语法:vm://brokerName?transportOptions

  6. The NIO Transport

    修改activemq.xml配置 <transportConnector name="nio"uri="nio://0.0.0.0:61616"/>

  7. The SSL Transport

    ssl://hostname:port?transportOptions

  8. The HTTP and HTTPS Transport

    语法: http://host:porthttps://host:port

  9. The Peer Transport

    语法: peer://peer-group/brokerName?brokerOptions

  10. The WebSockets Transport



—— 参考资料 ——


activeMQ官网:

http://activemq.apache.org/using-activemq.html


书籍:《ActiveMQ in Action .pdf》


activeMQ安全配置

  • http://tzj163.blog.163.com/blog/static/10961167920108763148495/


activeMQ通信机制

  • http://www.iteye.com/topic/426226#1090580

  • http://blog.163.com/xiechunhao@126/blog/static/110411383201081371758800/

  • http://netcomm.iteye.com/blog/421656


Activemq的内部机制

  • http://netcomm.iteye.com/blog/topic?page=2&show_full=true


activeMQ性能

  • http://blog.csdn.net/yczz/article/details/6384499

  • http://bsnyderblog.blogspot.com/2010/01/how-to-use-automatic-failover-in.html





以上是关于[Java技术堂-五] JMS-ActiveMQ交流学习的主要内容,如果未能解决你的问题,请参考以下文章

Java魔法堂:String.format详解

JMS-activeMq点对点模式

JMS-ActiveMq-点对点模式

如何在五分钟内部署轻量化K3S平台

第6堂视频课:看到词句就会读-下

工资交老婆八千,给爹妈五千,我留三千应酬,老婆不同意,怎么办?