ActiveMQ高级用法详解
Posted 踩踩踩从踩
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ActiveMQ高级用法详解相关的知识,希望对你有一定的参考价值。
前言
本篇文章会从ActiveMQ 的连接 以及 提供者发送消息,及递送消息、优先级、负载策略等几个方面去解析ActiveMQ 。通过整个流程然后对ActiveMQ 的特性有个大的理解
ActiveMQ特性详解
生产者产生消息,发送到MQ中,而MQ递送给消费者,MQ中有接收,存储,发送的几个概念。最后交给消费者。 对于mq无论是接收还是发送,基本是采用长链接的方式
连接
这个连接会应用同步方式进行发送,以及包括下面的 默认异步的方式发送数据等等的操作数。
发送消息
// send to 3 queues as one logical operation
Queue queue = new ActiveMQQueue("FOO.A,FOO.B,FOO.C");
producer.send(queue, someMessage);
// send to queues and topic one logical operation
Queue queue = new ActiveMQQueue("FOO.A,topic://NOTIFY.FOO.A");
producer.send(queue, someMessage);
通过配置文件即可以使用虚拟目标,而不采用在代码中使用
<!-- 在 activemq.xml 的broker节点下添加 -->
<destinationInterceptors>
<virtualDestinationInterceptor>
<virtualDestinations>
<compositeQueue name="MY.QUEUE">
<forwardTo>
<queue physicalName="FOO" />
<topic physicalName="BAR" />
</forwardTo>
</compositeQueue>
</virtualDestinations>
</virtualDestinationInterceptor>
</destinationInterceptors>
<destinationInterceptors>
<virtualDestinationInterceptor>
<virtualDestinations>
<compositeQueue name="MY.QUEUE">
<forwardTo>
<filteredDestination selector="odd = 'yes'" queue="FOO"/>
<filteredDestination selector="i = 5" topic="BAR"/>
</forwardTo>
</compositeQueue>
</virtualDestinations>
</virtualDestinationInterceptor>
</destinationInterceptors>
消息接收
从多个目标消费消息【复合目标】
// 在消费时,并不可以复合对列与主题
new ConsumerThread("tcp://mq.study.com:61616", "queue2,topic://topic2").start();
// 在消费时,可以复合消费多个队列的消息
//new ConsumerThread("tcp://mq.study.com:61616", "queue2,queue3").start();
// 可以订阅多个主题
//new ConsumerThread("tcp://mq.study.com:61616", "topic1,topic2").start();
镜像队列
<broker ......., useMirroredQueues="true">
....
<destinationInterceptors>
<!--prefix指定主题的前缀,默认是 VirtualTopic.Mirror. postfix 指定主题的后缀 默认无 -->
<mirroredQueue copyMessage="true" postfix=".qmirror" prefix=""/>
</destinationInterceptors>
....
</broker>
发消息时,只能被处理一次。
主题-集群负载均衡
ActiveMQ默认的Prefetch策略:
- persistent queues (default value: 1000 )
- non-persistent queues (default value: 1000 )
- persistent topics (default value: 100 )
- non-persistent topics (default value: Short.MAX_VALUE - 1 )
只有达到某个消费者1000个时,才会发送给其他的。这就会导致的问题。
并且设置不同的策略 在官方文档中都有的
如果某消息的消费者是慢消费者,它处理一个消息要花费很长时间,因此我们往往会采用集群 负载均衡的方式来快速处理消息,此时默认的Prefetch会有影响,会造成负载不均,消息得不到及时处理。
tcp://localhost:61616?jms.prefetchPolicy.all=50
要更改队列使用者的预取限制,请仅按如下方式配置连接URI:
tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1
还可以使用以下工具对每个消费者进行配置:
queue = new ActiveMQQueue("TEST.QUEUE?consumer.prefetchSize=10");
consumer = session.createConsumer(queue);
Message Dispatch 派发
queue = new ActiveMQQueue("TEST.QUEUE?consumer.priority=10");
consumer = session.createConsumer(queue);
利用优先级去指定消费者。
优先级值的范围为:0到127。最高优先级是127。默认优先级为0。代理根据队列的使用者的优先级对其进行排序,并将消息发送给最高优先级的消费者优先。一旦某个特定使用者的预取缓冲区已满,代理程序将将开始向具有下一个最低优先级且其预取缓冲区未满。
优先发给优先级最高的
严格顺序处理消息
队列中的消息必须按队列中的顺序一个接一个串行处理。 同步只有上一个处理完了在走下面个。
queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true");
consumer = session.createConsumer(queue);
Broker delivery message 递送消息
-
不进行流量控制
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic="FOO.>" producerFlowControl="false"/>
</policyEntries>
</policyMap>
</destinationPolicy>
- 不使用临时文件存储
<policyEntry queue=">"
producerFlowControl="true" memoryLimit="1mb">
<pendingQueuePolicy>
<vmQueueCursor/>
</pendingQueuePolicy>
</policyEntry>
- 空间不够时,不阻塞而是发送异常给生产者
<systemUsage>
<systemUsage sendFailIfNoSpace="true">
<memoryUsage>
<memoryUsage limit="20 mb"/>
</memoryUsage>
</systemUsage>
</systemUsage>
-
空间不够时,阻塞等待一定时间后还是没有空间再发送异常给生产者
<systemUsage>
<systemUsage sendFailIfNoSpaceAfterTimeout="3000">
<memoryUsage>
<memoryUsage limit="20 mb"/>
</memoryUsage>
</systemUsage>
</systemUsage>
-
通过系统用量来限流量
<systemUsage>
<systemUsage>
<memoryUsage> <memoryUsage limit="64 mb" /> <!-- 也可以使用百分比 --> <!--
<memoryUsage percentOfJvmHeap="70" /> -->
</memoryUsage>
<storeUsage>
<storeUsage limit="100 gb" />
</storeUsage>
<tempUsage> <tempUsage limit="10 gb" />
</tempUsage>
</systemUsage>
</systemUsage>
可以为非持久性消息设置内存限制,为持久性消息设置磁盘存储限制消息和临时消息的总使用量,代理将在其减速之前使用制片人。使用上面显示的默认设置,代理将阻止send()调用,直到消息被消耗,代理上的空间变为可用。将显示默认值在上面,您可能需要为您的环境增加这些值。
默认的配置,包括整体的实例用量,这里把不用设的很大,因为消息量,后端应该都会处理。
以上是关于ActiveMQ高级用法详解的主要内容,如果未能解决你的问题,请参考以下文章
TestNg依赖高级用法之强制依赖与顺序依赖------TestNg依赖详解