ActiveMQ(14):Destination高级特性
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ActiveMQ(14):Destination高级特性相关的知识,希望对你有一定的参考价值。
一、Wildcards
Wildcards用来支持名字分层体系,它不是JMS规范的一部分,是ActiveMQ的扩展。ActiveMQ支持以下三种wildcards:
1:“.” 用于作为路径上名字间的分隔符
2:“*” 用于匹配路径上的任何名字
3:">" 用于递归地匹配任何以这个名字开始的destination
示例,设想你有如下两个destinations
PRICE.STOCK.NASDAQ.IBM (IBM在NASDAQ的股价)
PRICE.STOCK.NYSE.SUNW (SUN在纽约证券交易所的股价)
那么:
1:PRICE.> :匹配任何产品的价格变动
2:PRICE.STOCK.> :匹配任何产品的股票价格变动
3:PRICE.STOCK.NASDAQ.* :匹配任何在NASDAQ下面的产品的股票价格变动
4:PRICE.STOCK.*.IBM:匹配任何IBM的产品的股票价格变动
二、组合列队Composite Destinations
组合队列允许用一个虚拟的destination代表多个destinations。这样就可以通过composite destinations在一个操作中同时向多个queue发送消息。
2.1 客户端实现的方式
在composite destinations中,多个destination之间采用“,”分割。例如:
Queue queue = new ActiveMQQueue("FOO.A,FOO.B,FOO.C"); 或 Destination destination = session.createQueue("my-queue,my-queue2");
如果你希望使用不同类型的destination,那么需要加上前缀如queue:// 或topic://,例如:
Queue queue = new ActiveMQQueue("FOO.A,topic://NOTIFY.FOO.A");
2.2 在conf/activemq.xml中的broker下配置实现
<destinationInterceptors> <virtualDestinationInterceptor> <virtualDestinations> <compositeQueue name="MY.QUEUE"> <forwardTo> <queue physicalName="my-queue" /> <queue physicalName="my-queue2" /> </forwardTo> </compositeQueue> </virtualDestinations> </virtualDestinationInterceptor> </destinationInterceptors>
再java代码发送的时候,队列的的名字就用MY.QUEUQ
三、Configure Startup Destinations
如果需要在ActiveMQ启动的时候,创建Destination的话,可以如下配置conf/activemq.xml的broker下:
<destinations> <queue physicalName="FOO.BAR" /> <topic physicalName="SOME.TOPIC" /> </destinations>
四、Delete Inactive Destinations
一般情况下,ActiveMQ的queue在不使用之后,可以通过web控制台或是JMX方式来删除掉。当然,也可以通过配置,使得broker可以自动探测到无用
的队列(一定时间内为空的队列)并删除掉,回收响应资源。可以如下配置conf/activemq.xml:
<broker schedulePeriodForDestinationPurge="10000"> <destinationPolicy> <policyMap> <policyEntries> <policyEntry queue=">" gcInactiveDestinations="true" inactiveTimoutBeforeGC="30000"/> </policyEntries> </policyMap> </destinationPolicy> </broker>
说明:
schedulePeriodForDestinationPurge:设置多长时间检查一次,这里是10秒,默认为0
inactiveTimoutBeforeGC:设置当Destination为空后,多长时间被删除,这里是30秒,默认为60
gcInactiveDestinations: 设置删除掉不活动队列,默认为false
五、Destination Options
队列选项是给consumer在JMS规范之外添加的功能特性,通过在队列名称后面使用类似URL的语法添加多个选项。包括:
1:consumer.prefetchSize,consumer持有的未确认最大消息数量,默认值 variable
2:consumer.maximumPendingMessageLimit:用来控制非持久化的topic在存在慢消费者的情况下,丢弃的数量,默认0
3:consumer.noLocal :默认false
4:consumer.dispatchAsync :是否异步分发 ,默认true
5:consumer.retroactive:是否为回溯消费者 ,默认false
6:consumer.selector:Jms的Selector,默认null
7:consumer.exclusive:是否为独占消费者 ,默认false
8:consumer.priority:设置消费者的优先级,默认0
使用示例:
queue = new ActiveMQQueue("TEST.QUEUE?consumer.dispatchAsync=false&consumer.prefetchSize=10"); consumer = session.createConsumer(queue);
六、虚拟Destinations(Visual Destinations)
6.1 简介
虚拟Destinations用来创建逻辑Destinations,客户端可以通过它来生产和消费消息,它会把消息映射到物理Destinations。
ActiveMQ支持两种方式:
1:虚拟主题(Virtual Topics)
2:组合Destinations(Composite Destinations)
6.2 为何使用虚拟主题
ActiveMQ中,topic只有在持久订阅下才是持久化的。持久订阅时,每个持久订阅者,都相当于一个queue的客户端,它会收取所有消息。这种情况下存在两个问题:
1:同一应用内consumer端负载均衡的问题:也即是同一个应用上的一个持久订阅不能使用多个consumer来共同承担消息处理功能。因为每个consumer都会获取所有消息。
queue模式可以解决这个问题,但broker端又不能将消息发送到多个应用端。所以,既要发布订阅,又要让消费者分组,这个功能JMS规范本身是没有的。
2:同一应用内consumer端failover的问题:由于只能使用单个的持久订阅者,如果这个订阅者出错,则应用就无法处理消息了,系统的健壮性不高
为了解决这两个问题,ActiveMQ中实现了虚拟Topic的功能
6.3 如何使用虚拟主题
1:对于消息发布者来说,就是一个正常的Topic,名称以VirtualTopic.开头。例如VirtualTopic.Orders,代码示例如下:
Topic destination = session.createTopic("VirtualTopic.myTest");
2:对于消息接收端来说,是个队列,不同应用里使用不同的前缀作为队列的名称,即可表明自己的身份即可实现消费端应用分组。
例如Consumer.A.VirtualTopic.Orders,说明它是名称为A的消费端,同理Consumer.B.VirtualTopic.Orders说明是一个名称为B的客户端。可以在同一个应用里使用
多个consumer消费此queue,则可以实现上面两个功能。
又因为不同应用使用的queue名称不同(前缀不同),所以不同的应用中都可以接收到全部的消息。每个客户端相当于一个持久订阅者,而且这个客户端可以使用多
个消费者共同来承担消费任务。代码示例如下:
Destination destination = session.createQueue("Consumer.A.VirtualTopic.myTest");
注意:将destinationInterceptors注释掉
java代码
public void topic() throws Exception { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("liuy","123456","failover:(tcp://192.168.175.13:61616,tcp://192.168.175.13:61676)"); Connection connection = connectionFactory.createConnection(); Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE); Destination destination = session.createTopic("VirtualTopic.myTest"); MessageProducer producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.PERSISTENT); // 设置DeliveryMode.PERSISTENT模式 connection.start(); } public void queue() throws Exception { ConnectionFactory cf = new ActiveMQConnectionFactory("liuy","123456","failover:(tcp://192.168.175.13:61616,tcp://192.168.175.13:61676)"); Connection connection = cf.createConnection(); connection.start(); final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("Consumer.A.VirtualTopic.myTest"); MessageConsumer consumer = session.createConsumer(destination); int i=0; while(i<3) { i++; TextMessage message = (TextMessage) consumer.receive(); session.commit(); System.out.println("收到消 息:" + message.getText()); } session.close(); connection.close(); }
3:默认虚拟主题的前缀是 :VirtualTopic.>
自定义消费虚拟地址默认格式:Consumer.*.VirtualTopic.>
自定义消费虚拟地址可以改,比如下面的配置就把它修改了。
xml配置示例如下:
<broker xmlns="http://activemq.apache.org/schema/core"> <destinationInterceptors> <virtualDestinationInterceptor> <virtualDestinations> <virtualTopic name=">" prefix="VirtualTopicConsumers.*." selectorAware="false"/> </virtualDestinations> </virtualDestinationInterceptor> </destinationInterceptors> </broker>
七、镜像Queues(Mirrored Queues)
7.1 简介
ActiveMQ中每个queue中的消息只能被一个consumer消费。然而,有时候你可能希望能够监视生产者和消费者之间的消息流。你可以通过使用Virtual Destinations 来建立一
个virtual queue 来把消息转发到多个queues中。但是 为系统中每个queue都进行如此的配置可能会很麻烦。
7.2 使用
ActiveMQ支持Mirrored Queues。Broker会把发送到某个queue的所有消息转发到一个名称类似的topic,因此监控程序只需要订阅这个mirrored queue topic。为了启用
Mirrored Queues,首先要将BrokerService的useMirroredQueues属性设置成true,然后可以通过destinationInterceptors设置其它属性,如mirror topic的前缀,缺省
是“VirtualTopic.Mirror.”。
比如修改后缀的配置示例如下conf/activemq.xml:
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" useMirroredQueues="true" > ... <destinationInterceptors> <mirroredQueue copyMessage="true" postfix=".qmirror" prefix=""/> </destinationInterceptors> ... </broker>
发送列队的时候,就会自动在topic里也发布
八、Per Destination Policies
ActiveMQ支持多种不同的策略,来单独配置每一个Destination它的属性很多,可以参见官方文档:
http://activemq.apache.org/per-destination-policies.html
本文出自 “我爱大金子” 博客,请务必保留此出处http://1754966750.blog.51cto.com/7455444/1920339
以上是关于ActiveMQ(14):Destination高级特性的主要内容,如果未能解决你的问题,请参考以下文章
activemq5.14+zookeeper3.4.9实现高可用