ActiveMQ源码解析:聊聊消息的可靠传输机制和事务控制
Posted 闲庭细步
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ActiveMQ源码解析:聊聊消息的可靠传输机制和事务控制相关的知识,希望对你有一定的参考价值。
在消息传递的过程中,某些情况下比如网络闪断、丢包等会导致消息永久性丢失,这时消费者是接收不到消息的,这样就会造成数据不一致的问题。那么我们怎么才能保证消息一定能发送给消费者呢?怎么才能避免数据不一致呢?又比如我们发送多条消息,有时候我们期望都发送成功但实际上其中一部分发送成功,另一部分发送失败了,没达到我们的预期效果,那么我们怎么解决这个问题呢?
前一种问题我们通过消息确认机制来解决,它分为几种模式,需要在创建session时指定是否开启事务和确认模式,像下面这样:
<span style="font-size:12px;">ActiveMQSession session = connection.createSession(true,Session.CLIENT_ACKNOWLEDGE);</span>
然后我们来看在PubSubscribe模式下消息的整个从发送到消费确认的流程来了解消息的确认机制和事务。首先看看producer怎么发送消息的:
public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, AsyncCallback onComplete) throws JMSException {
//检查session状态,如果session已关闭则抛出状态异常
checkClosed();
//检查destination类型,如果不符合要求就转变成ActiveMQDestination
if (destination == null) {
if (info.getDestination() == null) {
throw new UnsupportedOperationException("A destination must be specified.");
}
throw new InvalidDestinationException("Don't understand null destinations");
}
ActiveMQDestination dest;
if (destination.equals(info.getDestination())) {
dest = (ActiveMQDestination)destination;
} else if (info.getDestination() == null) {
dest = ActiveMQDestination.transform(destination);
} else {
throw new UnsupportedOperationException("This producer can only send messages to: " + this.info.getDestination().getPhysicalName());
}
if (dest == null) {
throw new JMSException("No destination specified");
}
if (transformer != null) {
//把各种不同的message转换成ActiveMQMessage
Message transformedMessage = transformer.producerTransform(session, this, message);
if (transformedMessage != null) {
message = transformedMessage;
}
}
if (producerWindow != null) {
try {
producerWindow.waitForSpace();
} catch (InterruptedException e) {
throw new JMSException("Send aborted due to thread interrupt.");
}
}
//发送消息到broker中的topic
this.session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow, sendTimeout, onComplete);
//消息计数
stats.onMessage();
}
我们以ActiveMQSession的send为例再来看看session是怎么发送消息的:
protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException {
//检查session状态如果closed抛出状态异常
checkClosed();
if (destination.isTemporary() && connection.isDeleted(destination)) {
throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination);
}
//竞争锁(互斥信号量),如果一个session的多个producer发送消息这里会保证有序性
synchronized (sendMutex) {
// tell the Broker we are about to start a new transaction
//告诉broker开始一个新事务,只有session的确认模式是SESSION_TRANSACTED时事务上下网才会开启事务
doStartTransaction();
//从事务上下文中获取事务id
TransactionId txid = transactionContext.getTransactionId();
long sequenceNumber = producer.getMessageSequence();
//Set the "JMS" header fields on the original message, see 1.1 spec section 3.4.11
//在jms协议头中设置传输模式即消息是否需要持久化
message.setJMSDeliveryMode(deliveryMode);
long expiration = 0L;
//检查producer中的message是否过期
if (!producer.getDisableMessageTimestamp()) {
//message获取时间戳
long timeStamp = System.currentTimeMillis();
message.setJMSTimestamp(timeStamp);
//设置过期时间
if (timeToLive > 0) {
expiration = timeToLive + timeStamp;
}
}
//设置消息过期时间
message.setJMSExpiration(expiration);
//设置消息优先级
message.setJMSPriority(priority);
//设置消息是非重发的
message.setJMSRedelivered(false);
// transform to our own message format here
//将消息转化成ActiveMQMessage,message针对不同的数据格式有很多种,比如map message,blob message
ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection);
//设置目的地,这里是一个topic
msg.setDestination(destination);
//设置消息id
msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber));
// Set the message id.
//如果消息是经过转换的,那么原消息更新新的id
if (msg != message) {
message.setJMSMessageID(msg.getMessageId().toString());
// Make sure the JMS destination is set on the foreign messages too.
//设置目的地
message.setJMSDestination(destination);
}
//clear the brokerPath in case we are re-sending this message
//清除brokerpath
msg.setBrokerPath(null);
//设置事务id
msg.setTransactionId(txid);
//
if (connection.isCopyMessageOnSend()) {
msg = (ActiveMQMessage)msg.copy();
}
//设置连接器
msg.setConnection(connection);
//把消息的属性和消息体都设置为只读,防止被修改
msg.onSend();
//生产者id
msg.setProducerId(msg.getMessageId().getProducerId());
if (LOG.isTraceEnabled()) {
LOG.trace(getSessionId() + " sending message: " + msg);
}
//如果onComplete没设置,且发送超时时间小于0,且消息不需要反馈,且连接器不是同步发送模式,且(消息非持久化或者连接器是异步发送模式或者存在事务id的情况下)异步发送,否则同步发送
if (onComplete==null && sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) {
//异步发送走transport的oneway通道
this.connection.asyncSendPacket(msg);
if (producerWindow != null) {
// Since we defer lots of the marshaling till we hit the
// wire, this might not
// provide and accurate size. We may change over to doing
// more aggressive marshaling,
// to get more accurate sizes.. this is more important once
// users start using producer window
// flow control.
int size = msg.getSize();
producerWindow.increaseUsage(size);
}
} else {
if (sendTimeout > 0 && onComplete==null) {
//同步发送走transport的request和asyncrequest通道
this.connection.syncSendPacket(msg,sendTimeout);
}else {
this.connection.syncSendPacket(msg, onComplete);
}
}
}
}
这样消息就被发送到broker的topic中了,接下来broker中会根据topic下的subscriber的id找出订阅者,并向这些消费者发送消息,消费者接收到消息后会消费消息,我们接下来看看消费者怎么消费消息的。
下面是ActiveMQConsumer和ActiveMQSession中的方法,session没创建一个consumer就可能会重启session线程,session线程的run中会调用message的listener中的onMessage方法
public void setMessageListener(MessageListener listener) throws JMSException {
checkClosed();
if (info.getPrefetchSize() == 0) {
throw new JMSException("Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1");
}
if (listener != null) {
boolean wasRunning = session.isRunning();
//停止session线程
if (wasRunning) {
session.stop();
}
this.messageListener.set(listener);
//session重新分发未消费的message
session.redispatch(this, unconsumedMessages);
//开启session线程
if (wasRunning) {
session.start();
}
} else {
this.messageListener.set(null);
}
}
public void run() {
MessageDispatch messageDispatch;
while ((messageDispatch = executor.dequeueNoWait()) != null) {
final MessageDispatch md = messageDispatch;
final ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
MessageAck earlyAck = null;
//如果消息过期创建新的确认消息
if (message.isExpired()) {
earlyAck = new MessageAck(md, MessageAck.EXPIRED_ACK_TYPE, 1);
earlyAck.setFirstMessageId(message.getMessageId());
} else if (connection.isDuplicate(ActiveMQSession.this, message)) {
LOG.debug("{} got duplicate: {}", this, message.getMessageId());
earlyAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
earlyAck.setFirstMessageId(md.getMessage().getMessageId());
earlyAck.setPoisonCause(new Throwable("Duplicate delivery to " + this));
}
//如果消息已经过期,或者消息有冲突则发送确认消息重新开始while循环
if (earlyAck != null) {
try {
asyncSendPacket(earlyAck);
} catch (Throwable t) {
LOG.error("error dispatching ack: {} ", earlyAck, t);
connection.onClientInternalException(t);
} finally {
continue;
}
}
//如果是确认模式是CLIENT_ACKNOWLEDGE或者INDIVIDUAL_ACKONWLEDGE则设置空回调函数,这样consumer确认消息后会执行回调函数
if (isClientAcknowledge()||isIndividualAcknowledge()) {
message.setAcknowledgeCallback(new Callback() {
@Override
public void execute() throws Exception {
}
});
}
//在发送前调用处理函数
if (deliveryListener != null) {
deliveryListener.beforeDelivery(this, message);
}
//设置delivery id
md.setDeliverySequenceId(getNextDeliveryId());
lastDeliveredSequenceId = message.getMessageId().getBrokerSequenceId();
final MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
final AtomicBoolean afterDeliveryError = new AtomicBoolean(false);
/*
* The redelivery guard is to allow the endpoint lifecycle to complete before the messsage is dispatched.
* We dont want the after deliver being called after the redeliver as it may cause some weird stuff.
* */
synchronized (redeliveryGuard) {
try {
ack.setFirstMessageId(md.getMessage().getMessageId());
//如果是事务模式则开启事务
doStartTransaction();
ack.setTransactionId(getTransactionContext().getTransactionId());
if (ack.getTransactionId() != null) {
//事务状态下添加一个匿名同步器,用于处理同步事务比如回滚
getTransactionContext().addSynchronization(new Synchronization() {
final int clearRequestCount = (clearRequestsCounter.get() == Integer.MAX_VALUE ? clearRequestsCounter.incrementAndGet() : clearRequestsCounter.get());
@Override
public void beforeEnd() throws Exception {
// validate our consumer so we don't push stale acks that get ignored
if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId())) {
LOG.debug("forcing rollback - {} consumer no longer active on {}", ack, connection);
throw new TransactionRolledBackException("consumer " + ack.getConsumerId() + " no longer active on " + connection);
}
LOG.trace("beforeEnd ack {}", ack);
sendAck(ack);
}
@Override
public void afterRollback() throws Exception {
LOG.trace("rollback {}", ack, new Throwable("here"));
// ensure we don't filter this as a duplicate
connection.rollbackDuplicate(ActiveMQSession.this, md.getMessage());
// don't redeliver if we have been interrupted b/c the broker will redeliver on reconnect
if (clearRequestsCounter.get() > clearRequestCount) {
LOG.debug("No redelivery of {} on rollback of {} due to failover of {}", md, ack.getTransactionId(), connection.getTransport());
return;
}
// validate our consumer so we don't push stale acks that get ignored or redeliver what will be redispatched
if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId())) {
LOG.debug("No local redelivery of {} on rollback of {} because consumer is no longer active on {}", md, ack.getTransactionId(), connection.getTransport());
return;
}
RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy();
int redeliveryCounter = md.getMessage().getRedeliveryCounter();
if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
&& redeliveryCounter >= redeliveryPolicy.getMaximumRedeliveries()) {
// We need to NACK the messages so that they get
// sent to the
// DLQ.
// Acknowledge the last message.
MessageAck ack = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
ack.setFirstMessageId(md.getMessage().getMessageId());
ack.setPoisonCause(new Throwable("Exceeded ra redelivery policy limit:" + redeliveryPolicy));
asyncSendPacket(ack);
} else {
MessageAck ack = new MessageAck(md, MessageAck.REDELIVERED_ACK_TYPE, 1);
ack.setFirstMessageId(md.getMessage().getMessageId());
asyncSendPacket(ack);
// Figure out how long we should wait to resend
// this message.
long redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
for (int i = 0; i < redeliveryCounter; i++) {
redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
}
/*
* If we are a non blocking delivery then we need to stop the executor to avoid more
* messages being delivered, once the message is redelivered we can restart it.
* */
if (!connection.isNonBlockingRedelivery()) {
LOG.debug("Blocking session until re-delivery...");
executor.stop();
}
connection.getScheduler().executeAfterDelay(new Runnable() {
@Override
public void run() {
/*
* wait for the first delivery to be complete, i.e. after delivery has been called.
* */
synchronized (redeliveryGuard) {
/*
* If its non blocking then we can just dispatch in a new session.
* */
if (connection.isNonBlockingRedelivery()) {
((ActiveMQDispatcher) md.getConsumer()).dispatch(md);
} else {
/*
* If there has been an error thrown during afterDelivery then the
* endpoint will be marked as dead so redelivery will fail (and eventually
* the session marked as stale), in this case we can only call dispatch
* which will create a new session with a new endpoint.
* */
if (afterDeliveryError.get()) {
((ActiveMQDispatcher) md.getConsumer()).dispatch(md);
} else {
executor.executeFirst(md);
executor.start();
}
}
}
}
}, redeliveryDelay);
}
md.getMessage().onMessageRolledBack();
}
});
}
LOG.trace("{} onMessage({})", this, message.getMessageId());
//触发消息事件监听函数
messageListener.onMessage(message);
} catch (Throwable e) {
LOG.error("error dispatching message: ", e);
// A problem while invoking the MessageListener does not
// in general indicate a problem with the connection to the broker, i.e.
// it will usually be sufficient to let the afterDelivery() method either
// commit or roll back in order to deal with the exception.
// However, we notify any registered client internal exception listener
// of the problem.
connection.onClientInternalException(e);
} finally {
//发送确认消息
if (ack.getTransactionId() == null) {
try {
asyncSendPacket(ack);
} catch (Throwable e) {
connection.onClientInternalException(e);
}
}
}
//触发投递事件监听函数
if (deliveryListener != null) {
try {
deliveryListener.afterDelivery(this, message);
} catch (Throwable t) {
LOG.debug("Unable to call after delivery", t);
afterDeliveryError.set(true);
throw new RuntimeException(t);
}
}
}
/*
* this can be outside the try/catch as if an exception is thrown then this session will be marked as stale anyway.
* It also needs to be outside the redelivery guard.
* */
try {
executor.waitForQueueRestart();
} catch (InterruptedException ex) {
connection.onClientInternalException(ex);
}
}
}
总结
消息确认机制
消费者和broker通讯最终实现消息确认,消息确认机制一共有5种,4种jms的和1种activemq补充的,AUTO_ACKNOWLEDGE(自动确认)、CLIENT_ACKNOWLEDGE(客户确认)、DUPS_OK_ACKNOWLEDGE(批量确认)、SESSION_TRANSACTED(事务确认)、INDIVIDUAL_ACKNOWLEDGE(单条确认),consumer在不同的模式下会发不同的命令到broker,broker再根据不同的命令进行操作,如果consumer正常发送ack命令给broker,broker会从topic移除消息并销毁,如果未从消费者接受到确认命令,broker会将消息转移到dlq队列(dead letter queue),并根据delivery mode进行重试或报异常。
消息事务
以上是关于ActiveMQ源码解析:聊聊消息的可靠传输机制和事务控制的主要内容,如果未能解决你的问题,请参考以下文章