ActiveMQ学习第六篇:Destination的特性

Posted yangk1996

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ActiveMQ学习第六篇:Destination的特性相关的知识,希望对你有一定的参考价值。

Wildcards(通配符)

Wildcars用来支持名字分层体系,它不是JMS规范的一部分,是ActiveMQ的扩展。
??ActiveMQ支持以下三种通配符:

  • ".":用于作为路径上名字间的分隔符
  • ">":用于递归的匹配任何以这个名字开始的Destination(目的地)
  • "*":用于作为路径上任何名字。
    ??举例来说,如有以下两个Destination:
    ??PRICE.COMPUTER.JD.APPLE(苹果电脑在京东上的价格)
    ??PRICE.COMPUTER.TMALL.APPLE(苹果电脑在天猫上的价格)
  1. PRICE.> :匹配任何产品的价格变动
  2. PRICE.COMPUTER.> :匹配任何电脑产品的价格变动
  3. PRICE.COMPUTER.JD.*:匹配任何在京东上的电脑的价格变动
  4. PRICE.COMPUTER.*.APPLE:匹配苹果电脑京东或天猫上的价格变动
// 实例化连接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, "failover:(tcp://localhost:61616,tcp://localhost:61626)?randomize=false");
// 通过连接工厂获取连接
Connection connection = connectionFactory.createConnection();
// 启动连接
connection.start();
// 创建session
Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
// 创建队列
Destination destination = session.createQueue("PRICE.COMPUTER.JD.APPLE");
// 创建生产者
MessageProducer messageProducer = session.createProducer(destination);
for (int i = 1; i <= 10; i++) {
    TextMessage textMessage = session.createTextMessage(message);
    messageProducer.send("Mac Air价格:"  + i * 1000);
    System.out.println("发送消息 - " + textMessage.getText());
}
session.commit();
// 实例化连接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, "failover:(tcp://localhost:61616,tcp://localhost:61626)?randomize=false");
// 通过连接工厂获取连接
Connection connection = connectionFactory.createConnection();
// 启动连接
connection.start();
// 创建session
Session session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
// 创建队列
Destination destination = session.createQueue("PRICE.COMPUTER.>");
// 创建消费者
MessageConsumer messageConsumer = session.createConsumer(destination);
messageConsumer.setMessageListener(new MessageListener(){
  @Override
    public void onMessage(Message message) {
        try {
            System.out.println("收到的消息:" + ((TextMessage) message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
});

通配符中是为消费者服务的。即:通配符只能配置在消费端。

Composite Destinations(组合列队)

组合队列允许用一个虚拟的destination代表多个destinations。这样就可以通过compositedestinations在一个操作中同时向多个queue发送消息。

客户端实现的方式:

? 在composite destinations中,多个destination之间采用“,”分割。例如:

        //创建一个队列
       // Destination destination = session.createQueue("test,test1");
        Queue queue = new ActiveMQQueue("test,test1");
        //创建生产者
//        MessageProducer producer = session.createProducer(destination);
        MessageProducer producer = session.createProducer(queue);

如果你希望使用不同类型的destination,那么需要加上前缀如queue:// 或topic://,例如:

Queue queue = new ActiveMQQueue("test,topic://192.168.100.155::61616");

在xml配置实现的方式:

<destinationInterceptors>
  <virtualDestinationInterceptor>
    <virtualDestinations>
        <!-- 虚拟的queue的名字-->
      <compositeQueue name="MY.QUEUE">
        <forwardTo>
            <!-- 实际发送的名称 -->
          <queue physicalName="my-queue" />
          <queue physicalName="my-queue2" />
        </forwardTo>
      </compositeQueue>
    </virtualDestinations>
  </virtualDestinationInterceptor>
</destinationInterceptors>

使用filtered destinations,在xml配置如下:

<destinationInterceptors>
       <virtualDestinationInterceptor>
              <virtualDestinations>
                     <compositeQueue name="MY.QUEUE">
                            <forwardTo>
                                 <filteredDestination selector="odd='yes'" queue="FOO"/>
                                 <filteredDestination selector="i = 5" topic="BAR" />
                            </forwardTo>
                     </compositeQueue>
              </virtualDestinations>
       </virtualDestinationInterceptor>
</destinationInterceptors>

避免在network连接到broker,出现重复消息:

<networkConnectors>
<networkConnector uri= "static://(tcp://localhost:61616) " >
<excludedDestinations>
    <queue physicalName="Consumer.*VirtualTopic.> " />
</ excludedDestinations>
</ networkConnector>
</ networkConnectors>

在ActiveMQ启动时候就创建Destination

<broker xmlns="http://activemq.apache.org/schema/core">
       <destinations>
              <queue physicalName="FOO.BAR" />
              <queue physicalName="SOME.TOPIC" />
       </destinations>
</broker>

Delete Inactive Destinations (删除无用的队列)

可以通过web控制台或是JMX方式来删除掉,通过配置文件,自动探测无用的队列并删除掉,回收响应资源,配置如下:
? SchedulePeriodForDestinationPurge:设置多长时间检查一次。
? inactiveTimeoutBeforeGC:设置当destination为空后,多长时间被删除,这里是30s,默认为60
? gcInactiveDestinations:设置删除掉不活动队列,默认为false

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulePeriodForDestinationPurge="10000">
       <destinationPolicy>
              <policyMap>
                <policyEntries>
                     <policyEntry queue=">" gcInactiveDestinations="true" inactiveTimeoutBeforeGC="30000" />
                </policyEntries>
              </policyMap>
       </destinationPolicy>
</broker>

Destination options (队列选项)

队列选项是给consumer在JMS规范之外添加的功能特性,通过在队列名称后面使用类似URL的语法添加多个选项。包括:
1:consumer.prefetchSize,consumer持有的未确认最大消息数量,默认值 variable。
2:consumer.maximumPendingMessageLimit:用来控制非持久化的topic在存在慢消费者的情况下,丢弃的数量,默认0。
3:consumer.noLocal :默认false。
4:consumer.dispatchAsync :是否异步分发 ,默认true。
5:consumer.retroactive:是否为回溯消费者 ,默认false。
6:consumer.selector:Jms的Selector,默认null。
7:consumer.exclusive:是否为独占消费者 ,默认false。
8:consumer.priority:设置消费者的优先级,默认0。

queue = new ActiveMQQueue("PRICE.COMPUTER.TMALL.APPLE?consumer.dispatchAsync=true&consumer.prefetchSize=20");
consumer = session.createConsumer(queue);

Visual Destinations

前面也说到了两个虚拟主题,虚拟Destinations和组合Destinations
??ActiveMQ中,topic只有在持久订阅下才是持久化的。持久订阅时,每个持久订阅者,都相当于一个queue的客户端,它会收取所有消息。这种情况下存在两个问题:
??(1) 同一应用内consumer端负载均衡的问题:即同一个应用上的一个持久订阅不能使用多个consumer来共同承担消息处理功能。因为每个consumer都会获取所有消息。queue模式可以解决这个问题,但broker端又不能将消息发送到多个应用端。所以,既要发布订阅,又要让消费者分组,这个功能JMS规范本身是没有的。
技术图片
(2)同一应用内consumer端failover的问题:由于只能使用单个的持久订阅者,如果这个订阅者出错,则应用就无法处理消息了,系统的健壮性不高。
??Activemq可以实现虚拟的Topic来解决这两个问题。

使用虚拟主题:

??对于消息发布者来说,就是一个正常的Topic,名称以VirtualTopic.开头。例如VirtualTopic.Mobile。示例:

Topic destination = session.createTopic("VirtualTopic.Mobille");

??对于消息接收端来说,是个队列,不同应用里使用不同的前缀作为队列的名称,即可表明自己的身份即可实现消费端应用分组。

??如Consumer.A.VirtualTopic.Mobille,说明它是名称为A的消费端,同理Consumer.B.VirtualTopic.Mobille说明是一个名称为B的客户端。可以在同一个应用里使用多个consumer消费此Topic,则可以实现上面两个功能。

??又因为不同应用使用的queue名称不同(前缀不同),所以不同的应用中都可以接收到全部的消息。代码示例如下:

public static void main(String[] args) throws JMSException {
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.100.155:61616");
        Connection connection = factory.createConnection();
        Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
        //创建虚拟主题,加前缀VirtualTopic
        Topic topic = session.createTopic("VirtualTopic.TestTopic");
        MessageProducer producer = session.createProducer(topic);
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        connection.start();
        for (int i = 0; i < 30; i++) {
            TextMessage textMessage = session.createTextMessage("topic消息===" + i);
            producer.send(textMessage);
        }
        session.commit();
        connection.close();
    }
 public static void main(String[] args) throws JMSException {
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.100.155:61616");
        Connection connection = factory.createConnection();
        connection.start();
        final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
        //创建队列
        Destination destination = session.createQueue("Consumer.A.VirtualTopic.TestTopic");
        MessageConsumer consumer = session.createConsumer(destination);
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("Consumer.A.接收到得消息:" + textMessage.getText());
                    session.commit();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
    }
 public static void main(String[] args) throws JMSException {
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.100.155:61616");
        Connection connection = factory.createConnection();
        connection.start();
        final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
        //创建队列
        Destination destination = session.createQueue("Consumer.B.VirtualTopic.TestTopic");
        final MessageConsumer consumer = session.createConsumer(destination);
        final MessageConsumer messageConsumer = session.createConsumer(destination);
        //模拟多个consumer消费一个queue
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    consumer.setMessageListener(new MessageListener() {
                        @Override
                        public void onMessage(Message message) {
                            TextMessage textMessage = (TextMessage) message;
                            try {
                                System.out.println("Consumer.B-->consumer接收到消息:" + textMessage.getText());
                                session.commit();
                            } catch (JMSException e) {
                                e.printStackTrace();
                            }
                        }
                    });
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    messageConsumer.setMessageListener(new MessageListener() {
                        @Override
                        public void onMessage(Message message) {
                            TextMessage textMessage = (TextMessage) message;
                            try {
                                System.out.println("Consumer.B-->messageConsumer接收到消息:" + textMessage.getText());
                                session.commit();
                            } catch (JMSException e) {
                                e.printStackTrace();
                            }
                        }
                    });
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }

在接收消息之前,应该先运行一下consumer客户端,将消费者注册到Broker中。
xml配置:
默认虚拟主题的前缀是: VirtualTopic.>
自定义消费虚拟地址默认格式: Consumer.*.VirtualTopic.>
自定义消费虚拟地址可以改,比如下面的配置就把它修改了。

<destinationInterceptors>
        <virtualDestinationInterceptor>
            <virtualDestinations>
                <virtualTopic name=">" prefix="自定义前缀.*." selectorAware="false"/>
            </virtualDestinations>
        </virtualDestinationInterceptor>
</destinationInterceptors>

Mirrored Queules(镜像队列)

??ActiveMQ中每个queue中的消息只能被一一个consumer消费。然而,有时候你可能希望能够监视生产者和消费者之间的消息流。你可以通过使用Virtual Destinations 来建立一个virtual queue来把消息转发到多个queues中。但是为系统中每个queue都进行如此的配置可能会很麻烦。
??ActiveMQ支持Mirrored Queues。 Broker 会把发送到某个queue的所有消息转发到一个名称类似的topic,因此监控程序只需要订阅这个mirrored queue topic。 为了启用Mirrored Queues,首先要将BrokerService的useMirroredQueues属性设置成true,然后可以通过destinationInterceptors设置其它属性,如mirrortopic的前缀, 缺省是:“VirtualTopic. Mirror."
技术图片

<destinationInterceptors>
    <mirroredQueue copyMessage = "true" postfix=".qmirror" prefix=""/>
</destinationInterceptors>

技术图片

这样发送之后会自动存放到一个topic里面。需要定于那个topic就可以监听到消息了。
技术图片
技术图片

以上是关于ActiveMQ学习第六篇:Destination的特性的主要内容,如果未能解决你的问题,请参考以下文章

Python学习第六篇:lambda 表达式

Python学习第六篇:lambda 表达式

我们一起学习WCF 第六篇文件传输

python学习第六篇 文件操作 爬虫

[dart学习]第六篇:流程控制语句

pytorch实战学习第六篇:CIFAR-10分类实现