高并发分布式消息中间件技术ActiveMQ事务

Posted Java技术汇

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了高并发分布式消息中间件技术ActiveMQ事务相关的知识,希望对你有一定的参考价值。

当前浏览器不支持播放音乐或语音,请在微信或其他浏览器中播放 高并发分布式消息中间件技术ActiveMQ事务

ActiveMQ有支持两种事务



JMS transactions - the commit() / rollback() methods on a Session (which is like doing commit() / rollback() on a JDBC connection)

XA Transactions - where the XASession acts as an XAResource by communicating with the Message Broker, rather like a JDBC Connection takes place in an XA transaction by communicating with the database.

在支持事务的session中,producer发送message时在message中带有transaction ID。broker收到message后判断是否有transaction ID,如果有就把message保存在transaction store中,等待commit或者rollback消息。所以ActiveMq的事务是针对broker而不是producer的,不管session是否commit,broker都会收到message。

如果producer发送模式选择了persistent,那么message过期后会进入死亡队列。在message进入死亡队列之前,ActiveMQ会删除message中的transaction ID,这样过期的message就不在事务中了,不会保存在transaction store中,会直接进入死亡队列。具体删除transaction ID的地方是在

org.apache.activemq.util.BrokerSupport的doResend,将transaction ID保存在了originalTransactionID中,删除了transaction ID

事务

jms中事务分为生产者和消费者两块,消息的生产和消费不能包含在同一个事务中。

生产者

在事务状态下进行发送操作,消息并未真正投递到中间件。而只有进行session.commit操作之后,消息才会发送到中间件,再转发到适当的消费者进行处理。如果是调用rollback操作,则表明,当前事务期间内所发送的消息都取消掉。

在支持事务的session中,producer发送message时在message中带有transactionID。broker收到message后判断是否有transactionID,如果有就把message保存在transaction store中,等待commit或者rollback消息。所以ActiveMq的事务是针对broker而不是producer的,不管session是否commit,broker都会收到message。如果producer发送模式选择了persistent,那么message过期后会进入死亡队列。在message进入死亡队列之前,ActiveMQ会删除message中的transaction ID,这样过期的message就不在事务中了,不会保存在transaction store中,会直接进入死亡队列。

消费者

在Spring整合JMS的应用中,我们要进行本地的事务管理,只需要指定对应的监听容器的sessionTransacted属性为true。对于SessionAwareMessageListener,在接收到消息后发送一个返回消息时也处于同一事务下,但是对于其他操作如数据库访问等将不属于该事务控制。

<bean id="jmsContainer"

class&#x3D;"org.springframework.jms.listener.DefaultMessageListenerContainer">

<property name&#x3D;"connectionFactory" ref&#x3D;"connectionFactory" />

<property name&#x3D;"destination" ref&#x3D;"queueDestination" />

<property name&#x3D;"messageListener" ref&#x3D;"consumerMessageListener" />

<property name&#x3D;"sessionTransacted" value&#x3D;"true"/>

</bean>

jta事务:

如果想要接收消息和数据库访问处于同一事务中,那么我们就可以配置一个外部的事务管理同时配置一个支持外部事务管理的消息监听容器(如DefaultMessageListenerContainer)。要配置这样一个参与分布式事务管理的消息监听容器,我们可以配置一个JtaTransactionManager,当然底层的JMS ConnectionFactory需要能够支持分布式事务管理,并正确地注册我们的JtaTransactionManager。这样消息监听器进行消息接收和对应的数据库访问就会处于同一数据库控制下,当消息接收失败或数据库访问失败都会进行事务回滚操作。

<bean id&#x3D;"listenerContainer" class&#x3D;"org.springframework.jms.listener.DefaultMessageListenerContainer">

<property name&#x3D;"connectionFactory" ref&#x3D;"connectionFactory" />

<property name&#x3D;"messageListener" ref&#x3D;"jmsQueueReceiver" />

<property name&#x3D;"destination" ref&#x3D;"queueDestination" />

<property name&#x3D;"sessionTransacted" value&#x3D;"true"/>

<property name&#x3D;"transactionManager" ref&#x3D;"jtaTransactionManager"/>

</bean>

<bean id&#x3D;"jtaTransactionManager" class&#x3D;"org.springframework.transaction.jta.JtaTransactionManager"/>

当指定了transactionManager时,消息监听容器将忽略sessionTransacted的值。

持久化

默认持久化到文件中:

打开安装目录下的配置文件,注意这里使用的是kahaDB,是一个基于文件支持事务的消息存储器。以日志形式存储消息,消息索引以B-Tree结构存储。在D:\ActiveMQ\apache-activemq\conf\activemq.xml中会发现默认的配置项:

<persistenceAdapter>

<kahaDB directory&#x3D;"$/kahadb"/>

</persistenceAdapter>

消息存储在基于文件的数据日志中。如果消息发送成功,变标记为可删除的。系统会周期性的清除或者归档日志文件。

消息文件的位置索引存储在内存中,这样能快速定位到。定期将内存中的消息索引保存到metadata store中,避免大量消息未发送时,消息索引占用过多内存空间。

如需持久化到数据库中:

首先需要把mysql的驱动放到ActiveMQ的Lib目录下,例如:mysql-connector-java-5.0.4-bin.jar,在conf/acticvemq.xml中更改persistenceAdapter节点配置并且引用定义的mysql-ds数据源。

<!--

Configure message persistence for the broker. The default persistence

mechanism is the KahaDB store (identified by the kahaDB tag).

For more information, see:

http://activemq.apache.org/persistence.html

<kahaDB directory&#x3D;"$/kahadb"/>

-->

<persistenceAdapter>

<jdbcPersistenceAdapter dataSource&#x3D;"#mysql-ds" createTablesOnStartup&#x3D;"false" />

</persistenceAdapter>

dataSource指定持久化数据库的bean,createTablesOnStartup是否在启动的时候创建数据表,默认值是true,这样每次启动都会去创建数据表了,一般是第一次启动的时候设置为true,之后改成false。

在conf/acticvemq.xml中定义mysql-ds,如下

<bean id&#x3D;"mysql-ds" class&#x3D;"org.apache.commons.dbcp.BasicDataSource" destroy-method&#x3D;"close">

<property name&#x3D;"driverClassName" value&#x3D;"com.mysql.jdbc.Driver"/>

<property name&#x3D;"url" value&#x3D;"jdbc:mysql://localhost/activemq?relaxAutoCommit&#x3D;true"/>

<property name&#x3D;"username" value&#x3D;"root"/>

<property name&#x3D;"password" value&#x3D;""/>

<property name&#x3D;"maxActive" value&#x3D;"200"/>

<property name&#x3D;"poolPreparedStatements" value&#x3D;"true"/>

</bean>

然后重新启动消息队列,你会发现多了3张表activemq_acks,activemq_lock,activemq_msgs。

activemq_msgs用于存储消息,然后启动消费者,发现Mysql中已经没有这条消息了。

activemq_acks用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存。

activemq_lock在集群环境中才有用

PERSISTENT (持久消息)和 NON_PERSISTENT(非持久消息),默认为持久消息。持久化的消息在MQ服务器宕机之后,消息不会丢失,在重启服务的时候,消息将恢复。

<bean id&#x3D;"jmsTemplate" class&#x3D;"org.springframework.jms.core.JmsTemplate">

<property name&#x3D;"connectionFactory" ref&#x3D;"cachingConnectionFactory"></property>

<property name&#x3D;"defaultDestination" ref&#x3D;"dest" />

<property name&#x3D;"messageConverter" ref&#x3D;"messageConverter" />

<property name&#x3D;"pubSubDomain" value&#x3D;"false" />

<property name&#x3D;"explicitQosEnabled" value&#x3D;"true" />

<!-- 发送模式 DeliveryMode.NON_PERSISTENT&#x3D;1:非持久 ; DeliveryMode.PERSISTENT&#x3D;2:持久-->

<property name&#x3D;"deliveryMode" value&#x3D;"1" />

</bean>

prefetch:

ActiveMQ的prefetch机制。当消费者去获取消息时,不会一条一条去获取,而是一次性获取一批,默认是1000条。

假设有三个消费者,接收从1到99,共99条消息:

consumer A:1,4,7...

consumer B:2,5,8...

consumer C:3,6,9...

按照默认分配策略,将会把消息如上预分配。

这些预获取的消息,在还没确认消费之前,在管理控制台还是可以看见这些消息的,但是不会再分配给其他消费者,此时这些消息的状态应该算作“已分配未消费”,如果消息最后被消费,则会在服务器端被删除。如果消费者崩溃,则这些消息会被重新分配给新的消费者。但是如果消费者既不消费确认,又不崩溃,那这些消息就永远躺在消费者的缓存区里无法处理。

假设一种情况:某个consumer C性能较差,处理信息速度很慢。会导致consumer C任有消息积压,但consumer A, consumer B已经空闲。

解决方案:将consumer C 的 prefetch设为1,每次处理1条消息,处理完再去取。

prefetchPolicy.setQueuePrefetch(1)

connectionFactory &#x3D; new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL);//实例化连接工厂

connectionFactory.setPrefetchPolicy(prefetchPolicy);

TimeToLive:

表示一个消息的有效期。只有在这个有效期内,消息消费者才可以消费这个消息。默认值为0,表示消息永不过期。

如果使用TTL来判定消息的过期,那么就首先需要确保Producer、broker两者的系统时间要尽可能的一致,Consumer也尽可能的和broker的时间保持一致。Broker将会在接收Producer消息时,以及将消息发送给Consumer之前都会检测消息是否过期,判断过期的方法也就是根据JMSExpiration和当前时间戳比较。

可以通过下面的方式设置

producer.setTimeToLive(3600000); //有效期1小时 (1000毫秒 * 60秒 * 60分)

可以在消息发送时,为当前消息设定ttl

messageProducer.send(Message message, int deliveryMode, int priority, long timeToLive)

如果消息过期,将会把消息发送到DLQ中,此消息不会被Consumer消费。如果broker端对DLQ使用Discard策略或者Broker没有开启DLQ相关策略,这些过期的消息可能将不复存在。在conf/activemq.xml中:

<destinationPolicy>

<policyMap>

<policyEntries>

<policyEntry queue&#x3D;">" topic&#x3D;">">

<deadLetterStrategy>

<sharedDeadLetterStrategy processExpired&#x3D;"false" />

</deadLetterStrategy>

<!-- discard all -->

<!--

<discardingDeadLetterStrategy />

-->

</policyEntry>

</policyEntries>

</policyMap>

</destinationPolicy>

Priority:

我们可以在发送消息时,指定消息的权重,broker可以建议权重较高的消息将会优先发送给Consumer。不过因为各种原因,priority并不能决定消息传送的严格顺序(order)。

JMS标准中约定priority可以为09的数值,值越大表示权重越高,默认值为4。不过activeMQ中各个存储器对priority的支持并非完全一样。比如JDBC存储器可以支持09。但是对于kahadb/levelDB等这种基于日志文件的存储器而言,priority支持相对较弱,只能识别三种优先级(LOW: < 4,NORMAL: &#x3D;4,HIGH: > 4)。在broker端,默认是不支持priority排序的,我们需要手动开启。在conf/activemq.xml中::

<policyEntry queue&#x3D;">" prioritizedMessages&#x3D;"true"/>

设置message的优先级

TextMessage message &#x3D; session.createTextMessage("ActiveMQ 发送消息" +i);//创建一条文本消息

message.setJMSPriority(9);

failover:

如果集群中的某一台消息服务器宕机,与该台消息服务器相连接的生产者和消费者需要能够自动连接到其他正常工作的消息服务器。对此ActiveMQ提供了一种叫做失效转移(也叫故障转移,FailOver)的策略。失效转移提供了在传输层上重新连接到其他任何传输器的功能。

只需要在uri中配置,语法如下

failover:(uri1,...,uriN)?transportOptions 或者 failover:uri1,...,uriN

例子

failover:(tcp://primary:61616,tcp://secondary:61616)?randomize&#x3D;false

failover:(tcp://localhost:61616,tcp://remotehost:61616)?randomize&#x3D;false&initialReconnectDelay&#x3D;100

Java例子

ActiveMQConnectionFactory connectionFactory &#x3D; new ActiveMQConnectionFactory(

"failover:(

tcp://192.168.0.87:61616?wireFormat.maxInactivityDuration&#x3D;0,

tcp://192.168.0.87:61617?wireFormat.maxInactivityDuration&#x3D;0

)");

Session session &#x3D; connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);

transportOptions有多种参数可以选择,如下

initialReconnectDelay:默认为10,单位毫秒,表示第一次尝试重连之前等待的时间。

maxReconnectDelay:默认30000,单位毫秒,表示两次重连之间的最大时间间隔。

useExponentialBackOff:默认为true,表示重连时是否加入避让指数来避免高并发。

reconnectDelayExponent:默认为2.0,重连时使用的避让指数。

maxReconnectAttempts:5.6版本之前默认为-1, 5.6版本及其以后,默认为0。 0表示重连的次数无限,配置大于0可以指定最大重连次数。

randomize:默认为true,表示在URI列表中选择URI连接时是否采用随机策略。如果为true的话,有可能生产者连接的是第一个,而消费者连接的是第二个,造成一个服务器上只有生产者,一个服务器上只有消费者。

timeout:默认为-1,单位毫秒,是否允许在重连过程中设置超时时间来中断的正在阻塞的发送操作。-1表示不允许,其他表示超时时间。


高并发分布式消息中间件技术ActiveMQ事务



Java技术汇   加关注
长按,识别二维码,加关注





以上是关于高并发分布式消息中间件技术ActiveMQ事务的主要内容,如果未能解决你的问题,请参考以下文章

大型互联网高并发解决方案之消息中间件技术-activeMQ详解

关于消息中间件ActiveMQ的企业级应用

大型分布式消息队列处理高并发

rocketmq(Activemq的缺点以及rocketmq解释)

高并发中间件3-ActiveMQ

高并发中间件4-ActiveMQ支持的协议