架构设计:系统间通信(22)——提高ActiveMQ工作性能(上)

Posted 说好不能打脸

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了架构设计:系统间通信(22)——提高ActiveMQ工作性能(上)相关的知识,希望对你有一定的参考价值。

接上文《架构设计:系统间通信(21)——ActiveMQ的安装与使用

3、ActiveMQ性能优化思路

上篇文章中的两节内容,主要介绍消息中间件ActiveMQ的安装和基本使用。从上篇文章给出的安装配置和示例代码来看,我们既没有修改ActivieMQ服务节点的任何配置,也没有采用任何的集群方案。这种情况只适合各位读者熟悉ActiveMQ的工作原理和基本操作,但是如果要将ActivieMQ应用在生产环境下,上文中介绍的运行方式远远没有挖掘出它的潜在性能。

这里写图片描述

根据这个系列文章所陈述的中心思想,系统的性能层次包括:代码级性能、规则性能、存储性能、网络性能,以及多节点协同方法(集群方案),所以我们优化ActiveMQ的中心思路也是这样的:首先优化ActiveMQ单个节点的性能,然后在配置ActiveMQ的集群。下面我们就按照这个思路,一步步介绍和ActiveMQ性能有关的那些事。

在默认情况下ActiveMQ的网络信息传递方式基于网络IO模型中的BIO方式。那么为了提高ActiveMQ单节点的工作性能,我们首先应该为每一个独立的MQ服务节点配置更高效的网络IO模型(关于网络IO模型的详细讲解,请参考本系列的另外几篇文章:《架构设计:系统间通信(3)——IO通信模型和JAVA实践 上篇》、
架构设计:系统间通信(4)——IO通信模型和JAVA实践 中篇》、《架构设计:系统间通信(5)——IO通信模型和JAVA实践 下篇》,这个思想也是整个系列文章的核心思想之一)。

另外我们还需要为ActiveMQ考虑一种存储方案,让它能高效的完成“持久化”消息的存储操作(也包括对“非持久化”消息的临时存储)**。虽然关于存储部分的知识和示例 在我另外一个系列的专题文章——系统存储(很快就会出品)才会讲到,但为了讲清楚消息中间件软件的性能优化,所以还是会讲到存储方案的选择。

另外,我们还知道了,使用集群方案能够增加整个软件的性能和稳定性,所以在完成单节点优化以后,我们还需要提供某种集群方案将多个ActiveMQ组合起来,让它们协同工作(一定是协同工作,单纯的安装多个ActiveMQ节点而不进行协同,是没法提高性能和稳定性的)。

Apache ActiveMQ是开源软件,您可以在ActiveMQ的官网下载到它的源代码,并进行代码级别性能的改造,但很明显这并不是一个能获得高效费比的方式。所以笔者不建议您和您的团队更改ActiveMQ代码实现(学习一下别人的代码实现还是可以的)。

4、ActiveMQ中的网络IO

4-1、基本连接配置

在上文中,我们提到了ActiveMQ支持多种消息协议,包括AMQP协议、MQTT协议、Openwire协议、Stomp协议等。在ActiveMQ的官方网站上,列出了目前ActiveMQ中支持的所有消息协议,它们是:AMQP、MQTT、OpenWire、REST、Stomp、XMPP;

不同的协议需要设置不同的网络监听端口,这个相关设置在ActiveMQ安装目录的./conf/conf/activemq.xml主配置文件中。主配置文件采用XML格式进行描述,其中的“transportConnectors”标记描述了各种协议的网络监听端口,示例如下:

<transportConnectors>
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="stomp" uri="stomp://0.0.0.0:61613"/>
    <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
</transportConnectors>

以上配置了openwire协议的接入端口号为本机所有IP设备的61616(0.0.0.0代表本机所有IP设备);配置amqp协议的接入端口号为本机所有IP设备的5672;配置stomp协议的接入端口号为本机所有IP设备的61613,等等。这里注意以下几个事实:

  • 每一个“transportConnector”标记的name属性和uri属性都必须填写,name属性的值可以随便填写,它将作为一个Connector元素,显示在ActiveMQ管理界面的Connections栏目中;

  • 每一个“transportConnector”标记的uri元素,都有固定写法:uri头是指定的协议名称,例如amqp、mqtt、stomp等。然后是HOST/IP域名,指定端口监听所在的路由信息;请不要使用localhost或者127.0.0.1这样的回环地址,否则无法通过网络连接到ActiveMQ;接下来是端口信息,指定的端口不能重复,否则会产生冲突。

  • URI参数部分,每一种协议都有一些特定的参数,读者可参考ActiveMQ官网中,关于“协议”部分的介绍:http://activemq.apache.org/protocols.html。但是有些参数却是各种协议都可共用的,例如以上实例中使用的“maximumConnections”属性,代表这个端口支持的最大连接数量;”wireFormat.maxFrameSize”属相代表支持协议的“一个完整消息”的最大数据量(单位为byte);您可以在ActiveMQ官网中对wire formats的参数描述中,找到这些默认属性:http://activemq.apache.org/configuring-wire-formats.html

Any transport which involves marshalling messages onto some kind of network transport like TCP or UDP will typically use the OpenWire format. This is configurable to customize how things appear on the wire.

4-2、特别说明

  • 在上文给出的配置信息中,您可以发现我们在描述各种消息协议时,URI描述信息的头部都的是采用协议名称:例如,描述amqp协议的监听端口时,采用的URI描述格式为“amqp://……”;描述Stomp协议的监听端口时,采用的URI描述格式为“stomp://……”。唯独在进行openwire协议描述时,URI头却采用的“tcp://…..”。这是因为ActiveMQ中默认的消息协议就是openwire

OpenWire is binary protocol designed for working with Message Oriented Middleware. It is the native wire format of ActiveMQ.

OpenWire is our cross language Wire Protocol to allow native access to ActiveMQ from a number of different languages and platforms. The Java OpenWire transport is the default transport in ActiveMQ 4.x or later.

  • 上文中我们还讲到,ActiveMQ完整支持AMQP协议。但是读者会发现ActiveMQ中并没有存在Exchange这样的结构。这是怎么回事呢?实际上在国际标准组织 (ISO) 和国际电工委员会 (IEC) 制定的AMQP Version 1.0 规范文档中(《OASIS Advanced Message Queueing Protocol
    (AMQP) Version 1.0》),并没有说AMQP 消息必须经过Exchange规则才能够到达队列,也没有规定Exchange 必须要实现某种规则的路由。所以在支持AMQP协议时,是否需要有Exchange这样的路由处理规则,完全取决于AMQP的消息中间件软件厂商自己的决定。下面一段代码是使用JMS API连接ActiveMQ的AMQP端口,发送AMQP消息的示例:
package mq.test.amqp;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;

public class JMSProducer {
    public static void main(String[] args) throws Throwable {
        // 注意,JMS-AMQP使用的是Apache QPID的实现。如果您需要运行这段代码,请导入QPID的客户端
        /*
         * <dependency>
         *  <groupId>org.apache.qpid</groupId>
         *  <artifactId>qpid-amqp-1-0-client-jms</artifactId>
         *  <version>0.32</version>
         * </dependency>
         * */
        ConnectionFactoryImpl factory = ConnectionFactoryImpl.createFromURL("amqp://192.168.61.138:5672");
        Connection connection = factory.createQueueConnection();
        connection.start();

        // 建立会话,连接到叫做/test的Queue上
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination queue = session.createQueue("/test");
        MessageProducer messageProducer = session.createProducer(queue);

        // 开始发送消息
        TextMessage outMessage = session.createTextMessage();
        outMessage.setText("23456656457567456");
        messageProducer.send(outMessage);

        //关闭
        messageProducer.close();
        connection.close();
    }
}

4-3、网络配置优化

那么各位读者您是否觉得上一小节那样的连接端口配置太过冗长,不好进行管理?确实是这样,并且实际工作中我们也只会使用几种固定的协议。所以ActiveMQ在Version 5.13.0+ 版本后,将OpenWire, STOMP, AMQP, MQTT这四种主要协议的端口监听进行了合并,并使用auto关键字进行表示。也就是说,ActiveMQ将监听这一个端口的消息状态,并自动匹配合适的协议格式。配置如下:

<transportConnectors>
    <transportConnector name="auto" uri="auto://0.0.0.0:61617?maximumConnections=1000" />
</transportConnectors>

以上的URI配置信息中,可以使用所有通用的Connection Configuration、Wire Formats Configuring、Server side options和TCP Transport Configuration配置项。但是这种优化只是让ActiveMQ的连接管理变得简洁了,并没有提升单个节点的处理性能。

如果您不特别指定ActiveMQ的网络监听端口,那么这些端口都将使用BIO网络IO模型。所以为了首先提高单节点的网络吞吐性能,我们需要明确指定Active的网络IO模型,如下所示:

<transportConnectors>  
    <transportConnector name="nio" uri="nio://0.0.0.0:61618?maximumConnections=1000"/>  
</transportConnectors> 

请注意,URI格式头以”nio”开头,表示这个端口使用以TCP协议为基础的NIO网络IO模型。但是这样的设置方式,只能使这个端口支持Openwire协议。那么我们怎么既让这个端口支持NIO网络IO模型,又让它支持多个协议呢?ActiveMQ的服务端设置,允许开发人员使用“+”符号来为端口设置多种特性,如下:

<transportConnector name="stomp+nio" uri="stomp+nio://0.0.0.0:61613?transport.transformer=jms"/>
// 表示这个端口使用NIO模型支持Stomp协议

<transportConnector name="amqp+ssl" uri="amqp+ssl://localhost:5671"/>
// 表示这个端口支持amqp和ssl密文传输

所以如果我们既需要某一个端口支持NIO网络IO模型,又需要它支持多个协议,那么可以进行如下的配置:

<transportConnector name="auto+nio" uri="auto+nio://0.0.0.0:61608?maximumConnections=1000" />

另外,如果是为了生产环境进行的配置,那么您至少应该还要配置这个端口支持的最大连接数量、设置每一条消息的最大传输值、设置NIO使用的线程池最大工作线程数量(当然您已经知道了这些设置的文档所在位置,所以您可以根据自己的情况进行设置属性的增减):

<transportConnector name="auto+nio" uri="auto+nio://0.0.0.0:61608?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600&amp;org.apache.activemq.transport.nio.SelectorManager.corePoolSize=20&amp;org.apache.activemq.transport.nio.SelectorManager.maximumPoolSize=50" />

以下附图是改变网络连接设置后,ActiveMQ管理控制台中Connections页面显示的内容。注意ws协议的端口是额外保留的配置——因为auto模式中的协议不支持ws:

这里写图片描述

5、JMS规范中的几个基本概念

由于ActiveMQ是JMS规范的完整实现,所以为了讲清楚ActiveMQ是如何进行存储和调度的,就需要首先说明JMS中和存储、调度有关的几个概念。它们是:消息收发模式(订阅-发布和负载均衡模式)、消息存储模式(持久化消息和非持久化消息)和订阅模型(持续订阅和非持续订阅)。

5-1、订阅发布模式和负载均衡模式

在上篇文章中我们已经详细过订阅-发布模式和负载均衡模式(http://blog.csdn.net/yinwenjie/article/details/50916518#t5)。在ActiveMQ的官方文档中的描述,他们的英文名称分别是Topics和Queue。这两种消息“发送-接受”模式,都是JMS规范中的标准模式。为了使各位读者在后续的阅读中,不会和JMS中的其他概念不造成混淆,我在后文章中都将使用规范的名称进行称呼。

  • 订阅-发布模式:

这里写图片描述

在“订阅-发布”模式下,消息会被复制多份,分别发送给所有“订阅”者。实际上在后文的描述中您将看到,这个复制的过程实际上没有您想象的简单。

  • 负载均衡模式:

这里写图片描述

在“负载均衡”模式下,一条消息将会发送给一个消息消费者,如果当前Queue没有消息消费者,消息将进行存储。同样通过后文的介绍,您会发现这其中的过程也并不简单。

5-2、持久化消息和非持久化消息

JMS中对非持久化消息和非持久化消息的称呼分别是:NON_PERSISTENT Message和PERSISTENT Meaage。它们指的是消息在任何一种“发送-接受”模式下(“订阅-发布”模式和“负载均衡模式”),是否进行持久化存储。

NON_PERSISTENT Message只存储在JMS服务节点的内存区域,不会存储在某种持久化介质上(后面我们要介绍到AcitveMQ可支持的持久化介质有:KahaBD、AMQ和关系型数据)。在极限情况下,JMS服务节点的内存区域不够使用了,也只会采用某种辅助方案进行转存(例如ActiveMQ会使用磁盘上的一个“临时存储区域”进行暂存)。一旦JMS服务节点宕机了,这些NON_PERSISTENT Message就会丢失

JMS中对PERSISTENT Meaage的定义是:这些消息不受JMS服务端异常状态的影响,JMS服务端会使用某种持久化存储方案保存这些消息,直到JMS服务端认为这些PERSISTENT Meaage被消费端成功处理。例如ActiveMQ中可以选择的持久化存储方案就包括:KahaDB、AMQ和关系型数据库。

在JMS标准API中,使用setDeliveryMode标记消息发送者是发送的PERSISTENT Meaage还是NON_PERSISTENT Message。示例如下:

......
for(int index = 0 ; index < 10 ; index++) {
    TextMessage outMessage = session.createTextMessage();
    outMessage.setText("这是发送的消息内容:" + index);
    if(index % 2 == 0) {
        sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    } else {
        sender.setDeliveryMode(DeliveryMode.PERSISTENT);
    }
    sender.send(outMessage);
}
......

那么当JMS服务节点重启后(注意不是producer重启),以上代码中发送的10条消息只有其中5条消息能够保存下来

5-3、持续订阅和非持续订阅

持续订阅和非持续订阅,是针对“订阅-发布”模式的细分处理策略,在JMS规范中的标准称呼是:Durable-Subscribers和Non-Durable Subscribers。

Durable-Subscribers是指在“订阅-发布”模式下,即使标记为Durable-Subscribers的订阅者下线了(可能是因为订阅者宕机,也可能是因为这个订阅者故意下线),“订阅-发布”模式的Topic队列也要保存这些消息(视消息不同的持久化策略影响,保存机制不一样),直到下次这个被标记为Durable-Subscribers的订阅者重新上线,并正确处理这条消息为止。换句话说,标记为Durable-Subscribers的订阅者是否能获得某条消息,和它是否曾经下线没有任何关系

Non-Durable Subscribers是指在“订阅-发布”模式下,“订阅-发布”模式的Topic队列不用为这些已经下线的订阅者保留消息。当后者将消息按照既定的广播规则发送给当前在线的订阅者后,消息就可以被标记为“处理完成”。

==================接下文

以上是关于架构设计:系统间通信(22)——提高ActiveMQ工作性能(上)的主要内容,如果未能解决你的问题,请参考以下文章

架构设计:系统间通信(20)——MQ:消息协议(下)

架构设计:系统间通信(31)——其他消息中间件及场景应用(下1)

架构设计:系统间通信——概述从“聊天”开始上篇

架构设计:系统间通信(19)——MQ:消息协议(上)

架构设计:系统间通信(32)——其他消息中间件及场景应用(下2)

架构设计:系统间通信——概述从“聊天”开始上篇