Nacos服务心跳和健康检查源码介绍,面试官:问你一个

Posted 程序员超时空

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Nacos服务心跳和健康检查源码介绍,面试官:问你一个相关的知识,希望对你有一定的参考价值。

		txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
		con = txObject.getConnectionHolder().getConnection();

		//从数据库连接中获取隔离级别
		Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
		txObject.setPreviousIsolationLevel(previousIsolationLevel);

		// Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
		// so we don't want to do it unnecessarily (for example if we've explicitly
		// configured the connection pool to set it already).
		if (con.getAutoCommit()) {
			txObject.setMustRestoreAutoCommit(true);
			if (logger.isDebugEnabled()) {
				logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
			}
			//关闭连接的自动提交,其实这步就是开启了事务
			con.setAutoCommit(false);
		}

		//设置只读事务 从这一点设置的时间点开始(时间点a)到这个事务结束的过程中,其他事务所提交的数据,该事务将看不见!
		//设置只读事务就是告诉数据库,我这个事务内没有新增,修改,删除操作只有查询操作,不需要数据库锁等操作,减少数据库压力
		prepareTransactionalConnection(con, definition);

		//自动提交关闭了,就说明已经开启事务了,事务是活动的
		txObject.getConnectionHolder().setTransactionActive(true);

		int timeout = determineTimeout(definition);
		if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
			txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
		}

		// Bind the connection holder to the thread.
		if (txObject.isNewConnectionHolder()) {
			//如果是新创建的事务,则建立当前线程和数据库连接的关系
			TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
		}
	}

	catch (Throwable ex) {
		if (txObject.isNewConnectionHolder()) {
			DataSourceUtils.releaseConnection(con, obtainDataSource());
			txObject.setConnectionHolder(null, false);
		}
		throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
	}
}

这个方法里面主要做了六件事:

*   首先从连接池获取连接并保存到DataSourceTransactionObject对象中。

*   关闭数据库的自动提交,也就是开启事务。

*   获取数据库的隔离级别。

*   根据属性设置该事务是否为只读事务。

*   将该事务标识为活动事务(transactionActive=true)。

*   将ConnectionHolder对象与当前线程绑定。

完成之后通过prepareSynchronization将事务的属性和状态设置到TransactionSynchronizationManager对象中进行管理。最后返回到createTransactionIfNecessary方法中创建TransactionInfo对象与当前线程绑定并返回。 通过以上的步骤就开启了事务,接下来就是通过proceedWithInvocation调用其它切面,这里我们先假设没有其它切面了,那么就是直接调用到A类的addA方法,在这个方法中又调用了B类的addB方法,那么肯定也是调用到代理类的方法,因此又会进入到createTransactionIfNecessary方法中。但这次进来通过isExistingTransaction判断是存在事务的,因此会进入到handleExistingTransaction方法:

private TransactionStatus handleExistingTransaction(
TransactionDefinition definition, Object transaction, boolean debugEnabled)
throws TransactionException {

	//不允许有事务,直接异常
	if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
		throw new IllegalTransactionStateException(
				"Existing transaction found for transaction marked with propagation 'never'");
	}

	//以非事务方式执行操作,如果当前存在事务,就把当前事务挂起
	if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
		if (debugEnabled) {
			logger.debug("Suspending current transaction");
		}
		//挂起当前事务
		Object suspendedResources = suspend(transaction);
		boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
		//修改事务状态信息,把事务的一些信息存储到当前线程中,ThreadLocal中
		return prepareTransactionStatus(
				definition, null, false, newSynchronization, debugEnabled, suspendedResources);
	}

	if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
		if (debugEnabled) {
			logger.debug("Suspending current transaction, creating new transaction with name [" +
					definition.getName() + "]");
		}
		//挂起
		SuspendedResourcesHolder suspendedResources = suspend(transaction);
		try {
			boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
			DefaultTransactionStatus status = newTransactionStatus(
					definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
			doBegin(transaction, definition);
			prepareSynchronization(status, definition);
			return status;
		}
		catch (RuntimeException | Error beginEx) {
			resumeAfterBeginException(transaction, suspendedResources, beginEx);
			throw beginEx;
		}
	}

	if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
		if (!isNestedTransactionAllowed()) {
			throw new NestedTransactionNotSupportedException(
					"Transaction manager does not allow nested transactions by default - " +
					"specify 'nestedTransactionAllowed' property with value 'true'");
		}
		if (debugEnabled) {
			logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
		}
		//默认是可以嵌套事务的
		if (useSavepointForNestedTransaction()) {
			// Create savepoint within existing Spring-managed transaction,
			// through the SavepointManager API implemented by TransactionStatus.
			// Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
			DefaultTransactionStatus status =
					prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
			//创建回滚点
			status.createAndHoldSavepoint();
			return status;
		}
		else {
			// Nested transaction through nested begin and commit/rollback calls.
			// Usually only for JTA: Spring synchronization might get activated here
			// in case of a pre-existing JTA transaction.
			boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
			DefaultTransactionStatus status = newTransactionStatus(
					definition, transaction, true, newSynchronization, debugEnabled, null);
			doBegin(transaction, definition);
			prepareSynchronization(status, definition);
			return status;
		}
	}

	// 省略

	boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
	return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}

这里面也是对每个传播属性的判断,先看PROPAGATION_REQUIRES_NEW的处理,因为该属性要求每次调用都开启一个新的事务,所以首先会将当前事务挂起,怎么挂起呢?

protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
if (TransactionSynchronizationManager.isSynchronizationActive()) {
List suspendedSynchronizations = doSuspendSynchronization();
try {
Object suspendedResources = null;
//第一次进来,肯定为null的
if (transaction != null) {
//吧connectionHolder设置为空
suspendedResources = doSuspend(transaction);
}

			//做数据还原操作
			String name = TransactionSynchronizationManager.getCurrentTransactionName();
			TransactionSynchronizationManager.setCurrentTransactionName(null);
			boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
			TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
			Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
			TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
			boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
			TransactionSynchronizationManager.setActualTransactionActive(false);
			return new SuspendedResourcesHolder(
					suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
		}
		catch (RuntimeException | Error ex) {
			// doSuspend failed - original transaction is still active...
			doResumeSynchronization(suspendedSynchronizations);
			throw ex;
		}
	}
	else if (transaction != null) {
		// Transaction active but no synchronization active.
		Object suspendedResources = doSuspend(transaction);
		return new SuspendedResourcesHolder(suspendedResources);
	}
	else {
		// Neither transaction nor synchronization active.
		return null;
	}
}

protected Object doSuspend(Object transaction) {
	DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
	txObject.setConnectionHolder(null);
	//解除绑定关系,
	return TransactionSynchronizationManager.unbindResource(obtainDataSource());
}

这里明显是进入第一个if并且会调用到doSuspend方法,整体来说挂起事务很简单:首先将DataSourceTransactionObject的ConnectionHolder设置为空并解除与当前线程的绑定,之后将解除绑定的ConnectionHolder和其它属性(事务名称、隔离级别、只读属性)通通封装到SuspendedResourcesHolder对象,并将当前事务的活动状态设置为false。挂起事务之后又通过newTransactionStatus创建了一个新的事务状态并调用doBegin开启事务,这里不再重复分析。 接着来看PROPAGATION_NESTED:

if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
if (!isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException(
"Transaction manager does not allow nested transactions by default - " +
“specify ‘nestedTransactionAllowed’ property with value ‘true’”);
}
if (debugEnabled) {
logger.debug(“Creating nested transaction with name [” + definition.getName() + “]”);
}
//默认是可以嵌套事务的
if (useSavepointForNestedTransaction()) {
// Create savepoint within existing Spring-managed transaction,
// through the SavepointManager API implemented by TransactionStatus.
// Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
DefaultTransactionStatus status =
prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
//创建回滚点
status.createAndHoldSavepoint();
return status;
}
else {
// Nested transaction through nested begin and commit/rollback calls.
// Usually only for JTA: Spring synchronization might get activated here
// in case of a pre-existing JTA transaction.
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, null);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
}


这里面可以看到如果允许嵌套事务,就会创建一个DefaultTransactionStatus对象(注意newTransaction是false,表明不是一个新事务)和回滚点;如果不允许嵌套,就会创建新事务并开启。 当上面的判断都不满足时,也就是传播属性为默认PROPAGATION_REQUIRED时,则只是创建了一个newTransaction为false的DefaultTransactionStatus返回。 完成之后又是调用proceedWithInvocation,那么就是执行B类的addB方法,假如没有发生异常,那么就会回到切面调用commitTransactionAfterReturning提交addB的事务:

protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
if (txInfo != null && txInfo.getTransactionStatus() != null) {
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
}

public final void commit(TransactionStatus status) throws TransactionException {

	processCommit(defStatus);
}

private void processCommit(DefaultTransactionStatus status) throws TransactionException {
	try {
		boolean beforeCompletionInvoked = false;

		try {
			boolean unexpectedRollback = false;
			prepareForCommit(status);
			triggerBeforeCommit(status);
			triggerBeforeCompletion(status);
			beforeCompletionInvoked = true;

			if (status.hasSavepoint()) {
				if (status.isDebug()) {
					logger.debug("Releasing transaction savepoint");
				}
				// 如果是nested,没有提交,只是将savepoint清除掉了
				unexpectedRollback = status.isGlobalRollbackOnly();
				status.releaseHeldSavepoint();
			}
			//如果都是PROPAGATION_REQUIRED,最外层的才会走进来统一提交,如果是PROPAGATION_REQUIRES_NEW,每一个事务都会进来
			else if (status.isNewTransaction()) {
				if (status.isDebug()) {
					logger.debug("Initiating transaction commit");
				}
				unexpectedRollback = status.isGlobalRollbackOnly();
				doCommit(status);
			}
			else if (isFailEarlyOnGlobalRollbackOnly()) {
				unexpectedRollback = status.isGlobalRollbackOnly();
			}
		}
	}
	finally {
		cleanupAfterCompletion(status);
	}
}

主要逻辑在processCommit方法中。如果存在回滚点,可以看到并没有提交事务,只是将当前事务的回滚点清除了;而如果是新事务,就会调用doCommit提交事务,也就是只有PROPAGATION_REQUIRED属性下的最外层事务和PROPAGATION_REQUIRES_NEW属性下的事务能提交。事务提交完成后会调用cleanupAfterCompletion清除当前事务的状态,如果有挂起的事务还会通过resume恢复挂起的事务(将解绑的连接和当前线程绑定以及将之前保存的事务状态重新设置回去)。当前事务正常提交后,那么就会轮到addA方法提交,处理逻辑同上,不再赘述。 如果调用addB发生异常,就会通过completeTransactionAfterThrowing进行回滚:

protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
if (txInfo != null && txInfo.getTransactionStatus() != null) {
if (logger.isTraceEnabled()) {
logger.trace(“Completing transaction for [” + txInfo.getJoinpointIdentification() +
"] after exception: " + ex);
}
if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
try {
txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
}
}
}
}

public final void rollback(TransactionStatus status) throws TransactionException {
	DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
	processRollback(defStatus, false);
}

private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
	try {
		boolean unexpectedRollback = unexpected;

		try {
			triggerBeforeCompletion(status);

			//按照嵌套事务按照回滚点回滚
			if (status.hasSavepoint()) {
				if (status.isDebug()) {
					logger.debug("Rolling back transaction to savepoint");
				}
				status.rollbackToHeldSavepoint();
			}
			//都为PROPAGATION_REQUIRED最外层事务统一回滚
			else if (status.isNewTransaction()) {
				if (status.isDebug()) {
					logger.debug("Initiating transaction rollback");
				}
				doRollback(status);
			}
			else {
				// Participating in larger transaction
				if (status.hasTransaction()) {
					if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
						if (status.isDebug()) {
							logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
						}
						doSetRollbackOnly(status);
					}
					else {
						if (status.isDebug()) {
							logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
						}
					}
				}
				else {
					logger.debug("Should roll back transaction but cannot - no transaction available");
				}
				// Unexpected rollback only matters here if we're asked to fail early
				if (!isFailEarlyOnGlobalRollbackOnly()) {
					unexpectedRollback = false;
				}
			}
		}
		catch (RuntimeException | Error ex) {
			triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
			throw ex;
		}

		triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);

		// Raise UnexpectedRollbackException if we had a global rollback-only marker
		if (unexpectedRollback) {
			throw new UnexpectedRollbackException(
					"Transaction rolled back because it has been marked as rollback-only");
		}
	}
	finally {
		cleanupAfterCompletion(status);
	}
}

流程和提交是一样的,先是判断有没有回滚点,如果有就回到到回滚点并清除该回滚点;如果没有则判断是不是新事务(PROPAGATION_REQUIRED属性下的最外层事务和PROPAGATION_REQUIRES_NEW属性下的事务),满足则直接回滚当前事务。回滚完成后同样需要清除掉当前的事务状态并恢复挂起的连接。另外需要特别注意的是在catch里面调用完回滚逻辑后,还通过throw抛出了异常,这意味着什么?意味着即使是嵌套事务,内层事务的回滚也会导致外层事务的回滚,也就是addA的事务也会跟着回滚。 至此,事务的传播原理分析完毕,深入看每个方法的实现是很复杂的,但如果仅仅是分析各个传播属性对事务的影响,则有一个简单的方法。我们可以将内层事务切面等效替换掉invocation.proceedWithInvocation方法,比如上面两个类的调用可以看作是下面这样:

// addA的事务
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal = null;
try {
// addB的事务
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal = null;
try {
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// target invocation exception
//事务回滚
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
cleanupTransactionInfo(txInfo);
}
//事务提交
commitTransactionAfterReturning(txInfo);
}
catch (Throwable ex) {
//事务回滚
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
//事务提交
commitTransactionAfterReturning(txInfo);


这样看是不是很容易就能分析出事务之间的影响以及是提交还是回滚了?下面来看几个实例分析。

## 实例分析

我再添加一个C类,和addC的方法,然后在addA里面调用这个方法。

TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal = null;
try {
// addB的事务
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal = null;
try {
b.addB();
}
catch (Throwable ex) {
// target invocation exception
//事务回滚
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
//事务提交
commitTransactionAfterReturning(txInfo);

// addC的事务
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal = null;
try {
	c.addC();
}
catch (Throwable ex) {
	// target invocation exception
	//事务回滚
	completeTransactionAfterThrowing(txInfo, ex);
	throw ex;
}
//事务提交

最后

如何获取免费架构学习资料?

资料获取方式:点击下方蓝色传送门

Java学习、面试;文档、视频资源免费获取

由于篇幅原因,就不多做展示了
//事务回滚
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
//事务提交

最后

如何获取免费架构学习资料?

资料获取方式:点击下方蓝色传送门

Java学习、面试;文档、视频资源免费获取

[外链图片转存中…(img-tpRYDl8b-1628224786798)]

[外链图片转存中…(img-mwmxt6z3-1628224786800)]

[外链图片转存中…(img-5BbcQ95x-1628224786802)]

[外链图片转存中…(img-fmfyqsvP-1628224786804)]

[外链图片转存中…(img-mslD6ntX-1628224786806)]

[外链图片转存中…(img-cc8BVOTF-1628224786807)]

由于篇幅原因,就不多做展示了

以上是关于Nacos服务心跳和健康检查源码介绍,面试官:问你一个的主要内容,如果未能解决你的问题,请参考以下文章

Nacos源码之服务端AP架构集群节点的心跳检测

Nacos源码分析专题-服务心跳

Nacos源码分析专题-服务心跳

Nacos源码分析专题-服务心跳

Nacos源码系列——第二章(Nacos核心源码主线剖析下)

Nacos源码-小结