13. sjdbc之最大努力型分布式事务
Posted 阿飞的博客
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了13. sjdbc之最大努力型分布式事务相关的知识,希望对你有一定的参考价值。
关于sharding-jdbc分布式事务:
Best efforts delivery transaction (已经实现).
Try confirm cancel transaction (待定).
Sharding-JDBC由于性能方面的考量,决定不支持强一致性分布式事务。
最大努力送达型事务说明
Best efforts delivery transaction就是最大努力送达型事务。在分布式数据库的场景下,相信对于该数据库的操作最终一定可以成功,所以通过最大努力送达,反复尝试。
很少有公司分布式场景下用强一致性事务。就笔者和陆金所朋友沟通,他们保证数据一致性的核心是T+1对账。再比如带有NFC功能的手机可以给公交卡充值的场景,笔者做过试验,在微信支付扣费后,将公交卡挪开,这时候会导致充值失败,但是资金已经扣除。微信支付的做法是T+1对账后将资金返回给用户,微信支付并不会在充值失败后一段时间内马上将资金返回给用户。
最大努力送达型事务架构图
摘自sharding-jdbc使用指南☞事务支持:http://shardingjdbc.io/1.x/docs/02-guide/transaction/
解读这张架构图,对几个重要的执行过程进行更详细的说明:
执行前。执行前事件->记录事务日志。sharding-jdbc对于任何执行,都会先记录事务日志。
执行成功。执行结果事件->监听执行事件->执行成功->清理事务日志。如果执行成功,就会清理事务日志。
执行失败,同步重试成功。执行结果事件->监听执行事件->执行失败->重试执行->执行成功->清理事务日志。
执行失败,同步重试失败,异步重试成功。执行结果事件->监听执行事件->执行失败->重试执行->执行失败->"异步送达作业"重试执行->执行成功->清理事务日志
执行失败,同步重试失败,异步重试失败,事务日志保留----如图所示,执行结果事件->监听执行事件->执行失败->重试执行->执行失败->"异步送达作业"重试执行->执行失败->… …
说明:sharding-jdbc在执行前都会通过执行前事件来记录事务日志;执行事件类型包括3种:
BEFORE_EXECUTE;
EXECUTE_FAILURE;
EXECUTE_SUCCESS;
同步
另外,这里的同步不是绝对的同步执行,而是通过google-guava的EventBus订阅执行事件,在监听端判断是EXECUTE_FAILURE事件,然后最多重试syncMaxDeliveryTryTimes
次。后面对BestEffortsDeliveryListener
的源码分析有介绍;
异步
这里的异步通过外挂程序实现,本质就是一个基于elastic-job的分布式JOB任务,在下一篇文章会有分析;
使用限制
使用最大努力送达型柔性事务的SQL需要满足幂等性。
INSERT语句要求必须包含主键,且不能是自增主键。
UPDATE语句要求幂等,不能是UPDATE table SET x=x+1。
DELETE语句无要求。
开发示例
// 1. 配置SoftTransactionConfiguration
SoftTransactionConfiguration transactionConfig = new SoftTransactionConfiguration(dataSource);
// 配置相关请看后面的备注
transactionConfig.setXXX();
// 2. 初始化SoftTransactionManager
SoftTransactionManager transactionManager = new SoftTransactionManager(transactionConfig);
transactionManager.init();
// 3. 获取BEDSoftTransaction
BEDSoftTransaction transaction = (BEDSoftTransaction) transactionManager.getTransaction(SoftTransactionType.BestEffortsDelivery);
// 4. 开启事务
transaction.begin(connection);
// 5. 执行JDBC
// code here
// 6.关闭事务
transaction.end();
备注:SoftTransactionConfiguration支持的配置以及含义请参考官方文档sharding-jdbc使用指南☞事务支持:http://shardingjdbc.io/docs/02-guide/transaction/,这段开发示例的代码也摘自官方文档;也可参考sharding-jdbc-transaction
模块中io.shardingjdbc.transaction.integrate.SoftTransactionTest
如何使用柔性事务,但是这里的代码需要稍作修改,否则只是普通的执行逻辑,不是sharding-jdbc的执行逻辑:
@Test
public void bedSoftTransactionTest() throws SQLException {
SoftTransactionManager transactionManagerFactory = new SoftTransactionManager(getSoftTransactionConfiguration(getShardingDataSource()));
// 初始化柔性事务管理器
transactionManagerFactory.init();
BEDSoftTransaction transactionManager = (BEDSoftTransaction) transactionManagerFactory.getTransaction(SoftTransactionType.BestEffortsDelivery);
transactionManager.begin(getShardingDataSource().getConnection());
// 执行INSERT SQL(DML类型),如果执行过程中异常,会在`BestEffortsDeliveryListener`中重试
insert();
transactionManager.end();
}
private void insert() {
String dbSchema = "insert into transaction_test(id, remark) values (2, ?)";
try (
// 将.getConnection("db_trans", SQLType.DML)移除,这样的话,得到的才是ShardingConnection
Connection conn = getShardingDataSource().getConnection();
PreparedStatement preparedStatement = conn.prepareStatement(dbSchema)) {
preparedStatement.setString(1, "JUST TEST IT .");
preparedStatement.executeUpdate();
} catch (final SQLException e) {
e.printStackTrace();
}
}
核心源码分析
通过 sjdbc源码之路由&执行 中对ExecutorEngine的分析可知,sharding-jdbc在执行SQL前后,都会调用EventBus.post()
发布执行事件。那么调用EventBusInstance.register()
的地方,就是柔性事务处理的地方。而sharding-jdbc在SoftTransactionManager.init()
中调用了EventBus.register()
初始化注册事件,所以柔性事务实现的核心在SoftTransactionManager这里。
柔性事务管理器
柔性事务实现在SoftTransactionManager
中,核心源码如下:
public final class SoftTransactionManager {
// 柔性事务配置对象
@Getter
private final SoftTransactionConfiguration transactionConfig;
/**
* Initialize B.A.S.E transaction manager.
* @throws SQLException SQL exception
*/
public void init() throws SQLException {
// 初始化注册最大努力送达型柔性事务监听器;
EventBusInstance.getInstance().register(new BestEffortsDeliveryListener());
if (TransactionLogDataSourceType.RDB == transactionConfig.getStorageType()) {
// 如果事务日志数据源类型是关系型数据库,则创建事务日志表transaction_log
createTable();
}
// 内嵌的最大努力送达型异步JOB任务,依赖当当开源的elastic-job
if (transactionConfig.getBestEffortsDeliveryJobConfiguration().isPresent()) {
new NestedBestEffortsDeliveryJobFactory(transactionConfig).init();
}
}
// 从这里可知创建的事务日志表表名是transaction_log(所以需要保证每个库中用户没有自定义创建transaction_log表)
private void createTable() throws SQLException {
String dbSchema = "CREATE TABLE IF NOT EXISTS `transaction_log` ("
+ "`id` VARCHAR(40) NOT NULL, "
+ "`transaction_type` VARCHAR(30) NOT NULL, "
+ "`data_source` VARCHAR(255) NOT NULL, "
+ "`sql` TEXT NOT NULL, "
+ "`parameters` TEXT NOT NULL, "
+ "`creation_time` LONG NOT NULL, "
+ "`async_delivery_try_times` INT NOT NULL DEFAULT 0, "
+ "PRIMARY KEY (`id`));";
try (
Connection conn = transactionConfig.getTransactionLogDataSource().getConnection();
PreparedStatement preparedStatement = conn.prepareStatement(dbSchema)) {
preparedStatement.executeUpdate();
}
}
从这段源码可知,柔性事务的几个重点如下,接下来一一根据源码进行分析;
事务日志存储器;
最大努力送达型事务监听器;
异步送达JOB任务;
1.事务日志存储器
柔性事务日志接口类为TransactionLogStorage.java
,有两个实现类:
RdbTransactionLogStorage:关系型数据库存储柔性事务日志;
MemoryTransactionLogStorage:内存存储柔性事务日志;
1.1事务日志核心接口
TransactionLogStorage中几个重要接口在两个实现类中的实现:
void add(TransactionLog):Rdb实现就是把事务日志TransactionLog 插入到
transaction_log
表中,Memory实现就是把事务日志保存到ConcurrentHashMap
中;void remove(String id):Rdb实现就是从
transaction_log
表中删除事务日志,Memory实现从ConcurrentHashMap
中删除事务日志;void increaseAsyncDeliveryTryTimes(String id):异步增加送达重试次数,即TransactionLog中的asyncDeliveryTryTimes+1;Rdb实现就是update
transaction_log
表中async_delivery_try_times
字段加1;Memory实现就是TransactionLog中重新给asyncDeliveryTryTimes赋值new AtomicInteger(transactionLog.getAsyncDeliveryTryTimes()).incrementAndGet()
;findEligibleTransactionLogs(): 查询需要处理的事务日志,条件是:①
异步处理次数async_delivery_try_times小于参数最大处里次数maxDeliveryTryTimes
,②transaction_type是BestEffortsDelivery
,③系统当前时间与事务日志的创建时间差要超过参数maxDeliveryTryDelayMillis
,每次最多查询参数size条;Rdb实现通过sql从transaction_log表中查询,Memory实现遍历ConcurrentHashMap匹配符合条件的TransactionLog;boolean processData():Rdb实现执行TransactionLog中的sql,如果执行过程中抛出异常,那么调用increaseAsyncDeliveryTryTimes()增加送达重试次数并抛出异常,如果执行成功,删除事务日志,并返回true;Memory实现直接返回false(因为processData()的目的是执行TransactionLog中的sql,而Memory类型无法触及数据库,所以返回false)
1.2事务日志RDB存储核心源码
事务日志RDB存储核心源码在RdbTransactionLogStorage.java
中,主要提供了对事务日志表transaction_log
的CRUD接口。
1.3事务日志存储样例
transaction_log中存储的事务日志样例:
1.2最大努力送达型事务监听器
核心源码如下:
@Slf4j
public final class BestEffortsDeliveryListener {
@Subscribe
@AllowConcurrentEvents
// 从方法可知,只监听DML执行事件(DML即数据维护语言,包括INSERT, UPDATE, DELETE)
public void listen(final DMLExecutionEvent event) {
// 判断是否需要继续,判断逻辑为:事务存在,并且是BestEffortsDelivery类型事务
if (!isProcessContinuously()) {
return;
}
// 从柔性事务管理器中得到柔性事务配置
SoftTransactionConfiguration transactionConfig = SoftTransactionManager.getCurrentTransactionConfiguration().get();
// 得到配置的柔性事务存储器
TransactionLogStorage transactionLogStorage = TransactionLogStorageFactory.createTransactionLogStorage(transactionConfig.buildTransactionLogDataSource());
// 这里肯定是最大努力送达型事务(如果不是BEDSoftTransaction,isProcessContinuously()就是false)
BEDSoftTransaction bedSoftTransaction = (BEDSoftTransaction) SoftTransactionManager.getCurrentTransaction().get();
// 根据事件类型做不同处理
switch (event.getEventExecutionType()) {
case BEFORE_EXECUTE:
// 如果执行前事件,那么先保存事务日志;
//TODO for batch SQL need split to 2-level records
transactionLogStorage.add(new TransactionLog(event.getId(), bedSoftTransaction.getTransactionId(), bedSoftTransaction.getTransactionType(),
event.getDataSource(), event.getSql(), event.getParameters(), System.currentTimeMillis(), 0));
return;
case EXECUTE_SUCCESS:
// 如果执行成功事件,那么删除事务日志;
transactionLogStorage.remove(event.getId());
return;
case EXECUTE_FAILURE:
boolean deliverySuccess = false;
// 如果执行成功事件,最大努力送达型最多尝试3次(可配置,SoftTransactionConfiguration中的参数syncMaxDeliveryTryTimes);
for (int i = 0; i < transactionConfig.getSyncMaxDeliveryTryTimes(); i++) {
// 如果在该Listener中执行成功,那么返回,不需要再尝试
if (deliverySuccess) {
return;
}
boolean isNewConnection = false;
Connection conn = null;
PreparedStatement preparedStatement = null;
try {
conn = bedSoftTransaction.getConnection().getConnection(event.getDataSource(), SQLType.DML);
// 通过执行"select 1"判断conn是否是有效的数据库连接;如果不是有效的数据库连接,释放掉并重新获取一个数据库连接;
if (!isValidConnection(conn)) {
bedSoftTransaction.getConnection().release(conn);
conn = bedSoftTransaction.getConnection().getConnection(event.getDataSource(), SQLType.DML);
isNewConnection = true;
}
preparedStatement = conn.prepareStatement(event.getSql());
//TODO for batch event need split to 2-level records
for (int parameterIndex = 0; parameterIndex < event.getParameters().size(); parameterIndex++) {
preparedStatement.setObject(parameterIndex + 1, event.getParameters().get(parameterIndex));
}
// 因为只监控DML,所以调用executeUpdate()
preparedStatement.executeUpdate();
// executeUpdate()后能执行到这里,说明执行成功;根据id删除事务日志;
deliverySuccess = true;
transactionLogStorage.remove(event.getId());
} catch (final SQLException ex) {
// 如果sql执行有异常,那么输出error日志
log.error(String.format("Delivery times %s error, max try times is %s", i + 1, transactionConfig.getSyncMaxDeliveryTryTimes()), ex);
} finally {
close(isNewConnection, conn, preparedStatement);
}
}
return;
default:
// 值支持三种事件类型,对于其他值,抛出异常
throw new UnsupportedOperationException(event.getEventExecutionType().toString());
}
}
}
BestEffortsDeliveryListener源码总结:
执行前,插入事务日志;
执行成功,则删除事务日志;
执行失败,则最大努力尝试
syncMaxDeliveryTryTimes
次;
1.3 异步送达JOB任务
同步重试若干次后,如果依然没有执行成功,可以通过部署的异步送达JOB任务继续重试,该特性将在下一篇文章详细讲解;
以上是关于13. sjdbc之最大努力型分布式事务的主要内容,如果未能解决你的问题,请参考以下文章