强一致性分布式事务XA 浅析
Posted java晴天过后
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了强一致性分布式事务XA 浅析相关的知识,希望对你有一定的参考价值。
一、前言
分布式事务:分布式条件下,多个节点操作的整体事务一致性。
特别是在微服务场景下,业务 A 和业务 B 关联,事务 A 成功,事务 B 失败,由于跨系统, 就会导致不被感知。 此时从整体来看,数据是不一致的。
分布式事务中的两大基本理论:CAP
理论 与 Base
理论。
分布式事务解决方案可以分为:
- 强一致性分布式事务解决方案:基于
CAP
理论 - 最终一致性分布式事务解决方案:基于
Base
理论
强一致性分布式解决方案
在强一致性事务解决方案中,典型的方案包括:
DTP
模型(全局事务模型):基于DTP
模型,典型的解决方案是分布式通信协议XA
规范2PC
模型(二阶段提交模型)3PC
模型(三阶段提交模型)
(1)DTP
模型
DTP
模型是X/Open
组织定义的一套分布式事务标准,这套标准主要定义了实现分布式事务的规范和API
。
DTP
模型的重要概念:
- 事务:一个事务就是一个完整的工作单元,具备
ACID
特性。 - 全局事务:由事务管理器管理的事务,能够一次性操作多个资源管理器。
- 分支事务:由事务管理器管理的全局事务中,每个资源管理器中独立执行的事务。
- 控制线程:执行全局事务的线程,这个线程用来关联应用程序、事务管理器和资源管理器三者之间的关系,
在 DTP
模型中,定义了 3个核心组件:
- 应用程序(
AP
):参与DTP
分布式事务模型的应用程序。 - 事务管理器(
TM
):负责协调和管理DTP
模型中的事务,为应用程序提供编程接口,同时管理资源管理器。 - 资源管理器(
RM
):数据库管理系统或消息服务管理器。
(2)2PC
模型
两阶段提交(Two-phase Commit
, 2PC
)算法,经常用来实现分布式事务。
2PC
模型两阶段执行流程:
Prepare
准备阶段:在本地执行相应的事务,但事务并没有提交。Commit
提交阶段:发送 回滚消息 或者 提交消息。
2PC
模型存在的问题:
- 同步阻塞问题:事务的执行过程中,所有参与事务的节点都会对其占用的公共资源加锁,导致其他访问公共资源的进程或者线程阻塞。
- 单点故障问题:如果事务管理器发生故障,则资源管理器会一直阻塞。
- 数据不一致问题:如果在
Commit
阶段,由于网络或者部分资源管理器发生故障,导致部分资源管理器没有接收到事务管理器发送过来的Commit
消息,会引起数据不一致的问题。 - 无法解决的问题:如果在
Commit
阶段,事务管理器发出Commit
消息后宕机,并且唯一接收到这条Commit
消息的资源管理器也宕机了,则无法确认事务是否已经提交。
(3)3PC
模型
3PC
模型是指三阶段提交模型,是在2PC
模型的基础上改进的版本。
3PC
模型把 2PC
模型中的 Prepare
阶段一分为二,形成 3个阶段:
CanCommit
阶段:询问是否能够执行事务。PreCommit
阶段:执行事务操作。doCommit
/doRollback
阶段:正式提交事务。
3PC
模型主要解决了 单点故障问题,并减少了事务执行过程中产生的阻塞现象。
二、XA
强一致性分布式事务原理
XA
规范:
-
xa_start
: 负责开启或者恢复一个事务分支,并且管理XID
到调用线程。 -
xa_end
: 负责取消当前线程与事务分支的关联。 -
xa_prepare
: 询问RM
是否准备好提交事务分支。 -
—————— 第一阶段提交 —————————
如果是单机,可以直接跳过
prepare
和第二阶段,输入one phase commit 事务id
直接进行提交即可。 -
xa_commit
: 通知RM
提交事务分支。 -
xa_rollback
: 通知RM
回滚事务分支。 -
xa_recover
: 需要恢复的XA
事务。 -
—————— 第二阶段提交 —————————
XA
二阶段提交:
- 一阶段:执行
XA PREPARE
语句。 - 二阶段:执行
XA COMMIT/ROLLBACK
语句。
XA
协议存在的问题
-
同步阻塞问题:一般情况下,不需要调高隔离级别,
XA
默认不会改变隔离级别全局事务内部包含了多个独立的事务分支,这一组事务分支要不都成功,要不都失败。各个事务分支的 ACID 特性共同构成了全局事务的 ACID 特性。也就是将单个事务分支的支持的 ACID 特性提升一个层次(up a level)到分布式事务的范畴。即使在非分布事务中(即本地事务),如果对操作读很敏感,我们也需要将事务隔离级别设置为 SERIALIZABLE,而对于分布式事务来说,更是如此,可重复读隔离级别不足以保证分布式事务一致性。也就是说,如果我们使用 mysql 来支持 XA 分布式事务的话,那么最好将事务隔离级别设置为 SERIALIZABLE,地球人都知道 SERIALIZABLE(串行化)是四个事务隔离级别中最高的一个级别,也是执行效率最低的一个级别
-
单点故障:成熟的
XA
框架需要考虑TM
的高可用性由于协调者的重要性,一旦协调者 TM 发生故障,参与者 RM 会一直阻塞下去。尤其在第二阶段,协调者发生故障,那么所有的参与者还都处于锁定事务资源的状态中,而无法继续完成事务操作。(如果是协调者挂掉,可以重新选举一个协调者,但是无法解决因为协调者宕机导致的参与者处于阻塞状态的问题)
-
数据不一致:极端情况下,一定有事务失败问题,需要监控和人工处理
在二阶段提交的阶段二中,当协调者向参与者发送 commit 请求之后,发生了局部网络异常或者在发送 commit 请求过程中协调者发生了故障,这回导致只有一部分参与者接受到了 commit 请求。而在这部分参与者接到 commit 请求之后就会执行 commit 操作。但是其他部分未接到 commit 请求的机器则无法执行事务提交。于是整个分布式系统便出现了数据不一致性的现象。
解决 XA
存在的问题
解决 XA
数据不一致方案:
- 日志存储:记录
XA
事务在每个流程中的执行状态。 - 自定义事务恢复:通过
XA recovery
命令从资源管理器中获取需要被恢复的事务记录,然后根据XID
匹配应用程序中存储的日志,根据事务状态进行提交或回滚。
解决事务管理器的单点故障方案:
- 去中心化部署:事务管理器嵌套在应用程序里面,不再单独部署。
- 中心化部署:事务管理器单独部署,然后与应用程序进行远程通信。
(1)MySQL
对 XA
规范的支持
MySQL
从5.0.3
开始支持InnoDB
引擎的XA
分布式事务,MySQL Connector/J
从5.0.0
版本开始支持XA
。
MySQL XA
事务状态是正确执行 XA
事务的关键:
每次执行
MySQL
的XA
事务语句都会修改XA
事务的状态,进而执行不同的XA
语句。
完整的 XA
事务处理过程:
单个 MySQL
的内部操作:
MySQL XA
的问题
MySQL <5.7
版本会出现的问题:
-
已经
prepare
(预提交)的事务,在客户端退出或者服务宕机的时候,二阶段提交 的事务会被回滚。 -
在服务器故障重启提交后,相应的
Binlog
被丢失
MySQL 5.6 版本在客户端退出的时候,自动把已经 prepare 的事务回滚了,那么 MySQL 为什么要这样做?
这主要取决于 MysQL 的内部实现,MySQL 5.7 以前的版本,对于 prepare 的事务,MySQL 是不会记录 binlog 的(官方说是减少 fsync,起到了优化的作用)。只有当分布式事务提交的时候才会把前面的操作写入 binloq 信息,所以对于 binloq 来说,分布式事务与普通的事务没有区别,而 prepare 以前的操作信息都保存在连接的 IO CACHE 中,如果这个时候客户端退出了,以前的 binloq 信息都会被丢失,再次重连后允许提交的话,会造成 Binloq 丢失,从而造成主从数据的不一致,所以官方在客户端退出的时候直接把已经 prepare 的事务都回滚了!
MySQL >5.7
版本的优化:
MySQL
对于分布式事务,在 prepare 的时候就完成了写 Binlog 的操作,通过新增一种叫 XA-preparelog-event 的 event 类型来实现,这是与以前版本的主要区别(以前版本 prepare 时不写 Binlog)
(2)手动通过 JDBC
操作 MySQL XA
事务
MySQL Connector/J
从 5.0.0 版本之后开始直接提供对XA
的支持,也就是提供了Java
版本XA
接口的实现。 意味着可以直接通过Java
代码来执行MySQL XA
事务。
模拟下订单减库存:
- 下订单:创建订单
- 扣库存:更新库存数量
-- 数据库如下
CREATE DATABASE tx_msg_order;
CREATE TABLE `order` (
`id` bigint(20) NOT NULL COMMENT '主键',
`create_time` datetime(0) NULL DEFAULT CURRENT_TIMESTAMP(0) COMMENT '创建时间',
`order_no` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT '' COMMENT '订单编号',
`product_id` bigint(20) NULL DEFAULT 0 COMMENT '商品id',
`pay_count` int(11) NULL DEFAULT NULL COMMENT '购买数量',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci COMMENT = '模拟订单' ROW_FORMAT = Dynamic;
CREATE DATABASE tx_msg_stock;
CREATE TABLE `stock` (
`id` bigint(11) NOT NULL COMMENT '主键id',
`product_id` bigint(20) NULL DEFAULT 0 COMMENT '商品id',
`total_count` int(11) NULL DEFAULT 0 COMMENT '商品总库存',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci COMMENT = '模拟库存' ROW_FORMAT = Dynamic;
代码如下:
public class Test
public static void main(String[] args) throws SQLException
// 创建订单库 RM实例
Connection orderConnection = DriverManager.getConnection(
"jdbc:mysql://127.0.0.1:3306/tx_msg_order?useUnicode=true&characterEncoding=UTF-8&useOldAliasMetadataBehavior=true&autoReconnect=true&failOverReadOnly=false&useSSL=false",
"test", "test");
// 这里的这个true参数,是说打印出来XA分布式事务的一些日志
XAConnection orderXAConnection = new MysqlXAConnection(
(com.mysql.jdbc.Connection)orderConnection, true);
// 这个XAResource其实你可以认为是RM(Resource Manager)的一个代码中的对象实例
XAResource orderResource = orderXAConnection.getXAResource();
// 创建库存库 的RM实例
Connection stockConnection = DriverManager.getConnection(
"jdbc:mysql://127.0.0.1:3306/tx_msg_stock?useUnicode=true&characterEncoding=UTF-8&useOldAliasMetadataBehavior=true&autoReconnect=true&failOverReadOnly=false&useSSL=false",
"test", "test");
XAConnection stockXAConnection = new MysqlXAConnection(
(com.mysql.jdbc.Connection)stockConnection, true);
XAResource stockResource = stockXAConnection.getXAResource();
// 下面俩东西是分布式事务id(txid)的构成部分
byte[] gtrid = "g12345".getBytes();
int formatId = 1;
try
// 这是说在分布式事务中的订单库的子事务的标识
// 我们在订单库要执行的操作隶属于分布式事务的一个子事务,子事务有自己的一个标识
byte[] bqual1 = "b00001".getBytes();
Xid xid1 = new MysqlXid(gtrid, bqual1, formatId); // 这个xid代表了订单库中的子事务
// 这就是说通过START和END两个操作,定义好了分布式事务中,订单库中要执行的SQL语句
// 但是这里的SQL绝对不会执行的,只是说先定义好我要在分布式事务中,这个数据库里要执行哪些SQL语句
orderResource.start(xid1, XAResource.TMNOFLAGS);
PreparedStatement orderPreparedStatement = orderConnection.prepareStatement(
"INSERT INTO `order` (id, create_time, order_no, product_id, pay_count) " +
"VALUES (1, NOW(), 1, 1, 1)");
orderPreparedStatement.execute();
orderResource.end(xid1, XAResource.TMSUCCESS);
// 这是说在分布式事务中的库存库的子事务的标识
// 大家看下,库存库的子事务的xid中的,gtrid和formatId是一样的,bqual是不一样的
// 在一个分布式事务中,涉及到多个数据库的子事务,每个子事务的txid,有一部分是一样的,一部分是不一样的
byte[] bqual2 = "b00002".getBytes();
Xid xid2 = new MysqlXid(gtrid, bqual2, formatId);
// 这就是说通过START和END两个操作,定义好了分布式事务中,库存库中要执行的SQL语句
stockResource.start(xid2, XAResource.TMNOFLAGS);
PreparedStatement stockPreparedStatement = stockConnection.prepareStatement(
"UPDATE stock SET total_count = total_count - 1 where id = 1");
stockPreparedStatement.execute();
stockResource.end(xid2, XAResource.TMSUCCESS);
// 到这里为止,其实还啥都没干呢,不过就是定义了分布式事务中的两个库要执行的SQL语句罢了
// 2PC的阶段一:向两个库都发送prepare消息,执行事务中的SQL语句,但是不提交
int orderPrepareResult = orderResource.prepare(xid1);
int stockPrepareResult = stockResource.prepare(xid2);
// 2PC的阶段二:两个库都发送commit消息,提交事务
// 如果两个库对prepare都返回ok,那么就全部commit,对每个库都发送commit消息,完成自己本地事务的提交
if (orderPrepareResult == XAResource.XA_OK
&& stockPrepareResult == XAResource.XA_OK)
orderResource.commit(xid1, false);
stockResource.commit(xid2, false);
else
// 如果如果不是所有库都对prepare返回ok,那么就全部rollback
orderResource.rollback(xid1);
stockResource.rollback(xid2);
catch (XAException e)
e.printStackTrace();
日志输出:
Thu Jul 07 14:41:56 CST 2022 DEBUG: Executing XA statement: XA START 0x673132333435,0x623030303031,0x1
Thu Jul 07 14:41:56 CST 2022 DEBUG: Executing XA statement: XA END 0x673132333435,0x623030303031,0x1
Thu Jul 07 14:41:56 CST 2022 DEBUG: Executing XA statement: XA START 0x673132333435,0x623030303032,0x1
Thu Jul 07 14:41:56 CST 2022 DEBUG: Executing XA statement: XA END 0x673132333435,0x623030303032,0x1
Thu Jul 07 14:41:56 CST 2022 DEBUG: Executing XA statement: XA PREPARE 0x673132333435,0x623030303031,0x1
Thu Jul 07 14:41:56 CST 2022 DEBUG: Executing XA statement: XA PREPARE 0x673132333435,0x623030303032,0x1
Thu Jul 07 14:41:56 CST 2022 DEBUG: Executing XA statement: XA COMMIT 0x673132333435,0x623030303031,0x1
Thu Jul 07 14:41:56 CST 2022 DEBUG: Executing XA statement: XA COMMIT 0x673132333435,0x623030303032,0x1
查看数据库:
mysql> use tx_msg_order;
mysql> select * from `order`;
+---------------+---------------------+---------------+------------+-----------+
| id | create_time | order_no | product_id | pay_count |
+---------------+---------------------+---------------+------------+-----------+
| 1 | 2022-07-07 06:41:56 | 1 | 1 | 1 |
+---------------+---------------------+---------------+------------+-----------+
mysql> use tx_msg_stock;
mysql> select * from stock;
+----+------------+-------------+
| id | product_id | total_count |
+----+------------+-------------+
| 1 | 1001 | 9999 |
| 2 | 1 | 9995 |
+----+------------+-------------+
2 rows in set (0.00 sec)
(3)JTA
事务
JTA
(Java Transaction API
):为 J2EE
平台提供了分布式事务服务的能力。
JTA
规范是XA
规范的Java
版,即把XA
规范中规定的DTP
模型交互接口抽象成Java
接口中的方法,并规定每个方法要实现什么样的功能。
JTA
定义的主要接口,位于 javax.transaction
包中:
Transaction
接口:让应用程序得以控制事务的开始、挂起、提交、回滚等。由Java
客户端程序或EJB
调用。TransactionManager
接口:用于应用服务器管理事务状态Transaction
接口:用于执行相关事务操作XAResource
接口:用于在分布式事务环境下,协调事务管理器和资源管理器的工作Xid
接口:为事务标识符的Java
映射
采用 JTA
+ Atomikos
分布式事务框架:底层思想也是 2PC
原理
JTA
:主要提供了事务管理器,即分布式事务流程管控的机制Atomikos
框架:提供了分布式事务的DataSource
数据源的支持
浅析 Atomikos
源码框架:
实战:模拟下订单减库存
同样以上文的模拟下订单减库存为例:
技术栈:SpringBoot
+ Atomikos
+ JDBCTemplate
- 添加对应依赖:
pom
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>
复制代码
application.properties
配置
spring.jta.enabled=true
spring.jta.atomikos.datasource.primary.xa-properties.url=jdbc:mysql://localhost:3306/tx_msg_order?useUnicode=true&characterEncoding=UTF-8&useOldAliasMetadataBehavior=true&autoReconnect=true&failOverReadOnly=false&useSSL=false
spring.jta.atomikos.datasource.primary.xa-properties.user=test
spring.jta.atomikos.datasource.primary.xa-properties.password=test
spring.jta.atomikos.datasource.primary.xa-data-source-class-name=com.mysql.cj.jdbc.MysqlXADataSource
spring.jta.atomikos.datasource.primary.unique-resource-name=tx_msg_order
spring.jta.atomikos.datasource.primary.max-pool-size=25
spring.jta.atomikos.datasource.primary.min-pool-size=3
spring.jta.atomikos.datasource.primary.max-lifetime=20000
spring.jta.atomikos.datasource.primary.borrow-connection-timeout=10000
spring.jta.atomikos.datasource.secondary.xa-properties.url=jdbc:mysql://localhost:3306/tx_msg_stock?useUnicode=true&characterEncoding=UTF-8&useOldAliasMetadataBehavior=true&autoReconnect=true&failOverReadOnly=false&useSSL=false
spring.jta.atomikos.datasource.secondary.xa-properties.user=test
spring.jta.atomikos.datasource.secondary.xa-properties.password=test
spring.jta.atomikos.datasource.secondary.xa-data-source-class-name=com.mysql.cj.jdbc.MysqlXADataSource
spring.jta.atomikos.datasource.secondary.unique-resource-name=tx_msg_stock
spring.jta.atomikos.datasource.secondary.max-pool-size=25
spring.jta.atomikos.datasource.secondary.min-pool-size=3
spring.jta.atomikos.datasource.secondary.max-lifetime=20000
spring.jta.atomikos.datasource.secondary.borrow-connection-timeout=10000
- 多数据源配置
@Configuration
public class DataSourceConfiguration
@Primary
@Bean
@ConfigurationProperties(prefix = "spring.jta.atomikos.datasource.primary")
public DataSource primaryDataSource()
return new AtomikosDataSourceBean();
@Bean
@ConfigurationProperties(prefix = "spring.jta.atomikos.datasource.secondary")
public DataSource secondaryDataSource()
return new AtomikosDataSourceBean();
@Bean
public JdbcTemplate orderJdbcTemplate(@Qualifier("primaryDataSource") DataSource primaryDataSource)
return new JdbcTemplate(primaryDataSource);
@Bean
public JdbcTemplate stockJdbcTemplate(@Qualifier("secondaryDataSource") DataSource secondaryDataSource)
return new JdbcTemplate(secondaryDataSource);
- 业务代码
@Service
@RequiredArgsConstructor
public class OrderService
private final JdbcTemplate orderJdbcTemplate;
private final JdbcTemplate stockJdbcTemplate;
@Transactional(rollbackFor = Exception.class)
public void createOrder()
orderJdbcTemplate.update("INSERT INTO `order` (id, create_time, order_no, product_id, pay_count) " + " VALUES (2, NOW(), 2, 1, 1)");
stockJdbcTemplate.update("UPDATE stock SET total_count = total_count - 1 where id = ?", 1);
- 测试:调用
createOrder()
# 程序日志输出:开启 logging.level.root=DEBUG
复制代码
生成日志 tx
:transaction-logs
目录下
"id":"127.0.0.1.tm165739505160000001","wasCommitted":true,"participants":["uri":"127.0.0.1.tm1","state":"COMMITTING","expires":1657395061620,"resourceName":"tx_msg_order","uri":"127.0.0.1.tm2","state":"COMMITTING","expires":1657395061620,"resourceName":"tx_msg_stock"]
"id":"127.0.0.1.tm165739505160000001","wasCommitted":true,"participants":["uri":"127.0.0.1.tm1","state":"TERMINATED","expires":1657395061641,"resourceName":"tx_msg_order","uri":"127.0.0.1.tm2","state":"TERMINATED","expires":1657395061641,"resourceName":"tx_msg_stock"]
查看数据库:
mysql> select * from `order`;
+---------------+---------------------+---------------+------------+-----------+
| id | create_time | order_no | product_id | pay_count |
+---------------+---------------------+---------------+------------+-----------+
| 1 | 2022-07-07 06:41:56 | 1 | 1 | 1 |
| 2 | 2022-07-09 19:30:51 | 2 | 1 | 1 |
+---------------+---------------------+---------------+------------+-----------+
浅析innodb_support_xa与innodb_flush_log_at_trx_commit
浅析innodb_support_xa与innodb_flush_log_at_trx_commit
以上是关于强一致性分布式事务XA 浅析的主要内容,如果未能解决你的问题,请参考以下文章