Seata架构篇 - AT模式
Posted 等後那场雪
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Seata架构篇 - AT模式相关的知识,希望对你有一定的参考价值。
前言
Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。
X/Open DTP Model
X/Open DTP Model 是 X/Open 组织定义的一套分布式事务的标准,也就是定义了分布式事务处理的规范和 API 接口,但是具体实现由各厂商自行实现。
TM、RM 作为 Seata 的客户端与业务系统集成在一起,TC 作为 Seata 的服务端独立部署。
TC(Transaction Coordinator)- 事务协调者
维护全局和分支事务的状态,驱动全局事务提交或回滚。
TM(Transaction Manager)- 事务管理器
管理全局事务,包括开始全局事务、提交或回滚全局事务。
RM(Resource Manager)- 资源管理器
管理分支事务,包括向 TC 注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。
在 Seata 中,分布式事务的执行流程:
- TM 开启分布式事务(TM 向 TC 注册全局事务记录)
- 按业务场景,编排数据库、服务等事务资源(RM 向 TC 汇报资源准备状态)
- TM 结束分布式事务,事务阶段一结束(TM 通知 TC 提交/回滚分布式事务)
- TC 汇总事务信息,决定分布式事务提交还是回滚
- TC 通知所有 RM 提交/回滚,事务二阶段结束
AT 模式
概述
Seata AT 模式是一种非侵入式的分布式事务解决方案,Seata 在内部做了对数据库操作的代理层,我们使用 Seata AT 模式时,实际上用的是 Seata 自带的数据源代理 DataSourceProxy,Seata 在这层代理中加入了很多逻辑,比如插入回滚 undo_log 日志,检查全局锁等。
为什么要检查全局锁呢,这是由于 Seata AT 模式的事务隔离是建立在分支事务的本地隔离级别基础之上的,在数据库本地隔离级别读已提交或以上的前提下,Seata 设计了由事务协调器维护的全局写排他锁,来保证事务间的写隔离,同时,将全局事务默认定义在读未提交的隔离级别上。
Seata 的事务是一个全局事务,它包含了若干个分支本地事务,在全局事务执行过程中(全局事务还没执行完),某个本地事务提交了,如果 Seata 没有采取任何措施,则会导致已提交的本地事务被读取,造成脏读,如果数据在全局事务提交前已提交的本地事务被修改,则会造成脏写。
一阶段:业务数据和回滚日志在同一个本地事务提交,释放本地锁和连接资源。
二阶段:提交异步化,非常快速地完成。回滚通过一阶段的回滚日志反向补偿。
适用场景
- 基于支持本地 ACID 事务的关系型数据库。
- Java 应用,通过 JDBC 访问数据库。
原理
一阶段
1、解析 SQL。得到 SQL 的类型(UPDATE)、表(product)、条件(where name = ‘TXC’)等相关的信息。
2、查询前镜像:根据解析得到的条件信息,生成查询语句,定位数据。
select id, name, since from product where name = 'TXC';
得到前镜像:
id | name | since |
---|---|---|
1 | TXC | 2014 |
3、执行业务 SQL。更新这条记录的 name 为 ‘GTS’。
4、查询后镜像:根据前镜像的结果,通过 主键 定位数据。
得到后镜像:
id | name | since |
---|---|---|
1 | GTS | 2014 |
5、插入回滚日志:把前后镜像数据以及业务 SQL 相关的信息组成一条回滚日志记录,插入到 UNDO_LOG
表中。
"branchId": 641789253,
"undoItems": [
"afterImage":
"rows": [
"fields": [
"name": "id",
"type": 4,
"value": 1
,
"name": "name",
"type": 12,
"value": "GTS"
,
"name": "since",
"type": 12,
"value": "2014"
]
],
"tableName": "product"
,
"beforeImage":
"rows": [
"fields": [
"name": "id",
"type": 4,
"value": 1
,
"name": "name",
"type": 12,
"value": "TXC"
,
"name": "since",
"type": 12,
"value": "2014"
]
],
"tableName": "product"
,
"sqlType": "UPDATE"
],
"xid": "xid:xxx"
6、提交前,向 TC 注册分支:申请 product
表中,主键值等于 1 的记录的 全局锁 。
7、本地事务提交:业务数据的更新和前面步骤中生成的 UNDO LOG 一并提交。
8、将本地事务提交的结果上报给 TC。
二阶段
回滚
1、收到 TC 的分支回滚请求,开启一个本地事务,执行如下操作。
2、通过 XID 和 Branch ID 查找到相应的 UNDO LOG 记录。
3、数据校验:拿 UNDO LOG 中的后镜与当前数据进行比较,如果有不同,说明数据被当前全局事务之外的动作做了修改。这种情况,需要根据配置策略来做处理,详细的说明在另外的文档中介绍。
4、根据 UNDO LOG 中的前镜像和业务 SQL 的相关信息生成并执行回滚的语句:
update product set name = 'TXC' where id = 1;
5、提交本地事务。并把本地事务的执行结果(即分支事务回滚的结果)上报给 TC。
提交
1、收到 TC 的分支提交请求,把请求放入一个异步任务的队列中,马上返回提交成功的结果给 TC。
2、异步任务阶段的分支提交请求将异步和批量地删除相应 UNDO LOG 记录。
回滚日志表如下:
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
写隔离
一阶段本地事务提交之前,需要确保先拿到全局锁,这样才能提交本地事务。
尝试获取全局锁如果超时,则放弃尝试,然后回滚本地事务并释放本地锁。
两个全局事务 tx1 和 tx2,分别对 a 表的 m 字段进行更新操作,m 的初始值 1000。tx1 先开始,开启本地事务,拿到本地锁,更新操作 m = 1000 - 100 = 900。本地事务提交前,先拿到该记录的 全局锁 ,本地提交释放本地锁。 tx2 后开始,开启本地事务,拿到本地锁,更新操作 m = 900 - 100 = 800。本地事务提交前,尝试拿该记录的 全局锁 ,tx1 全局提交前,该记录的全局锁被 tx1 持有,tx2 需要重试等待 全局锁 。
tx1 二阶段全局提交,释放 全局锁 。tx2 拿到 全局锁 提交本地事务。
如果 tx1 的二阶段全局回滚,则 tx1 需要重新获取该数据的本地锁,进行反向补偿的更新操作,实现分支的回滚。此时,如果 tx2 仍在等待该数据的 全局锁,同时持有本地锁,则 tx1 的分支回滚会失败。分支的回滚会一直重试,直到 tx2 的 全局锁 等锁超时,放弃 全局锁 并回滚本地事务释放本地锁,tx1 的分支回滚最终成功。
因为整个过程 全局锁 在 tx1 结束前一直是被 tx1 持有的,所以不会发生 脏写 的问题。
读隔离
在数据库本地事务隔离级别 读已提交(Read Committed) 或以上的基础上,Seata(AT 模式)的默认全局隔离级别是 读未提交(Read Uncommitted) 。
SELECT FOR UPDATE 语句的执行会申请 全局锁 ,如果 全局锁 被其他事务持有,则释放本地锁(回滚 SELECT FOR UPDATE 语句的本地执行)并重试。这个过程中,查询是被 block 住的,直到 全局锁 拿到,即读取的相关数据是 已提交 的,才返回。
出于总体性能上的考虑,Seata 目前的方案并没有对所有 SELECT 语句都进行代理,仅针对 FOR UPDATE 的 SELECT 语句。
使用
假设 Bussiness 服务调用 Stock 服务和 Order 服务。
1、配置 Bussiness 服务。
-
配置 GlobalTransactionScanner 的applicationId 和 txServiceGroup(事务分组)。
-
使用 @GlobalTransactional 注解
-
可以使用 RootContext.getXID() 方法获取到 XID
-
按需修改 file.conf、registry.conf 文件的内容
@Configuration
public class SeataAutoConfig
@Bean
public GlobalTransactionScanner globalTransactionScanner()
// 指定 applicationId、txServiceGroup
return new GlobalTransactionScanner("dubbo-gts-seata-example", "my_test_tx_group");
@Service
public class BusinessServiceImpl implements BusinessService
@Reference(version = "1.0.0")
private StockDubboService stockDubboService;
@Reference(version = "1.0.0")
private OrderDubboService orderDubboService;
private boolean flag;
@Override
@GlobalTransactional(timeoutMills = 300000, name = "dubbo-gts-seata-example")
public ObjectResponse handleBusiness(BusinessDTO businessDTO)
System.out.println("开始全局事务,XID = " + RootContext.getXID());
ObjectResponse<Object> objectResponse = new ObjectResponse<>();
//1、扣减库存
CommodityDTO commodityDTO = new CommodityDTO();
commodityDTO.setCommodityCode(businessDTO.getCommodityCode());
commodityDTO.setCount(businessDTO.getCount());
ObjectResponse stockResponse = stockDubboService.decreaseStock(commodityDTO);
//2、创建订单
OrderDTO orderDTO = new OrderDTO();
orderDTO.setUserId(businessDTO.getUserId());
orderDTO.setCommodityCode(businessDTO.getCommodityCode());
orderDTO.setOrderCount(businessDTO.getCount());
orderDTO.setOrderAmount(businessDTO.getAmount());
ObjectResponse<OrderDTO> response = orderDubboService.createOrder(orderDTO);
//打开注释测试事务发生异常后,全局回滚功能
// if (!flag)
// throw new RuntimeException("测试抛异常后,分布式事务回滚!");
//
if (stockResponse.getStatus() != 200 || response.getStatus() != 200)
throw new DefaultException(RspStatusEnum.FAIL);
objectResponse.setStatus(RspStatusEnum.SUCCESS.getCode());
objectResponse.setMessage(RspStatusEnum.SUCCESS.getMessage());
objectResponse.setData(response.getData());
return objectResponse;
2、 配置 Stock 服务。
- 配置 DataSource、DataSourceProxy、SqlSessionFactory、GlobalTransactionScanner
- 可以使用 RootContext.getXID() 方法获取到 XID
- 按需修改 file.conf、registry.conf 文件的内容
- 对应数据库配置 UNDO_LOG 表
@Configuration
public class SeataAutoConfig
@Autowired
private DataSourceProperties dataSourceProperties;
@Bean
@Primary
public DruidDataSource druidDataSource()
DruidDataSource druidDataSource = new DruidDataSource();
druidDataSource.setUrl(dataSourceProperties.getUrl());
druidDataSource.setUsername(dataSourceProperties.getUsername());
druidDataSource.setPassword(dataSourceProperties.getPassword());
druidDataSource.setDriverClassName(dataSourceProperties.getDriverClassName());
druidDataSource.setInitialSize(0);
druidDataSource.setMaxActive(180);
druidDataSource.setMaxWait(60000);
druidDataSource.setMinIdle(0);
druidDataSource.setValidationQuery("Select 1 from DUAL");
druidDataSource.setTestOnBorrow(false);
druidDataSource.setTestOnReturn(false);
druidDataSource.setTestWhileIdle(true);
druidDataSource.setTimeBetweenEvictionRunsMillis(60000);
druidDataSource.setMinEvictableIdleTimeMillis(25200000);
druidDataSource.setRemoveAbandoned(true);
druidDataSource.setRemoveAbandonedTimeout(1800);
druidDataSource.setLogAbandoned(true);
return druidDataSource;
@Bean
public DataSourceProxy dataSourceProxy(DruidDataSource druidDataSource)
return new DataSourceProxy(druidDataSource);
@Bean
public SqlSessionFactory sqlSessionFactory(DataSourceProxy dataSourceProxy) throws Exception
SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
factoryBean.setDataSource(dataSourceProxy);
factoryBean.setMapperLocations(
new PathMatchingResourcePatternResolver().getResources("classpath*:/mapper/*.xml"));
factoryBean.setTransactionFactory(new JdbcTransactionFactory());
return factoryBean.getObject();
@Bean
public GlobalTransactionScanner globalTransactionScanner()
return new GlobalTransactionScanner("stock-gts-seata-example", "my_test_tx_group");
3、配置 Order 服务
- 配置 DataSource、DataSourceProxy、SqlSessionFactory、GlobalTransactionScanner
- 可以使用 RootContext.getXID() 方法获取到 XID
- 按需修改 file.conf、registry.conf 文件的内容
- 对应数据库配置 UNDO_LOG 表
@Configuration
public class SeataAutoConfig
@Autowired
private DataSourceProperties dataSourceProperties;
@Bean
@Primary
public DruidDataSource druidDataSource()
DruidDataSource druidDataSource = new DruidDataSource();
druidDataSource.setUrl(dataSourceProperties.getUrl());
druidDataSource.setUsername(dataSourceProperties.getUsername());
druidDataSource.setPassword(dataSourceProperties.getPassword());
druidDataSource.setDriverClassName(dataSourceProperties.getDriverClassName());
druidDataSource.setInitialSize(0);
druidDataSource.setMaxActive(180);
druidDataSource.setMaxWait(60000);
druidDataSource.setMinIdle(0);
druidDataSource.setValidationQuery("Select 1 from DUAL");
druidDataSource.setTestOnBorrow(false);
druidDataSource.setTestOnReturn(false);
druidDataSource.setTestWhileIdle(true);
druidDataSource.setTimeBetweenEvictionRunsMillis(60000);
druidDataSource.setMinEvictableIdleTimeMillis(25200000);
druidDataSource.setRemoveAbandoned(true);
druidDataSource.setRemoveAbandonedTimeout(1800);
druidDataSource.setLogAbandoned(true);
return druidDataSource;
@Bean
public DataSourceProxy dataSourceProxy(DruidDataSource druidDataSource)
return new DataSourceProxy(druidDataSource);
@Bean
public SqlSessionFactory sqlSessionFactory(DataSourceProxy dataSourceProxy) throws Exception
SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
factoryBean.setDataSource(dataSourceProxy);
factoryBean.setMapperLocations(
new PathMatchingResourcePatternResolver().getResources("classpath*:/mapper/*.xml"));
factoryBean.setTransactionFactory(new JdbcTransactionFactory());
return factoryBean.getObject();
@Bean
public GlobalTransactionScanner globalTransactionScanner()
return new GlobalTransactionScanner("order-gts-seata-example", "my_test_tx_group");
微服务架构 | 11.1 整合 Seata AT 模式实现分布式事务 #yyds干货盘点#
@[TOC](11.1 整合 Seata AT 模式实现分布式事务)
前言
参考资料:
《Spring Microservices in Action》
《Spring Cloud Alibaba 微服务原理与实战》
《B站 尚硅谷 SpringCloud 框架开发教程 周阳》
《Seata 中文官网》
《Seata GitHub 官网》
《Seata 官方示例》
Seata 是一款开源的分布式事务解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务服务;它提供了 AT、TCC、Saga 和 XA 事务模式,为开发者提供了一站式的分布式事务解决方案;
1. Seata 基础知识
1.1 Seata 的 AT 模式
- Seata 的 AT 模式基于 1 个全局 ID 和 3 个组件模型:
- Transaction ID XID:全局唯一的事务 ID;
- Transaction Coordinator TC:事务协调器,维护全局事务的运行状态,负责协调并驱动全局事务的提交或回滚;
- Transaction Manager TM:控制全局事务的边界,负责开启一个全局事务,并最终发起全局提交或全局回滚的决议;
- Resource Manager RM:控制分支事务,负责分支注册、状态汇报,并接收事务协调器的指令,驱动分支(本地)事务的提交和回滚;
- 为方便理解这里称 TC 为服务端;
- 使用 AT 模式时有一个前提,RM 必须是支持本地事务的关系型数据库;
1.2 Seata AT 模式的工作流程
- TM 向 TC 申请开启一个全局事务,全局事务创建成功并生成一个全局唯一的
XID
; - XID 在微服务调用链路的上下文中传播;
- RM 向 TC 注册分支事务,将其纳入
XID
对应全局事务的管辖; - TM 向 TC 发起针对
XID
的全局提交或回滚决议; - TC 调度
XID
下管辖的全部分支事务完成提交或回滚请求;
1.3 Seata 服务端的存储模式
- Seata 服务端的存储模式有三种:file、db 和 redis:
- file:默认,单机模式,全局事务会话信息持久化在本地文件
$SEATA_HOME\\bin\\sessionStore\\root.data
中,性能较高(file 类型不支持注册中心的动态发现和动态配置功能); - db:需要修改配置,高可用模式,Seata 全局事务会话信息由全局事务、分支事务、全局锁构成,对应表:
globaltable
、branchtable
、lock_table
; - redis:需要修改配置,高可用模式;
- file:默认,单机模式,全局事务会话信息持久化在本地文件
1.4 Seata 与 Spring Cloud 整合说明
- 由于 Spring Cloud 并没有提供分布式事务处理的标准,所以它不像配置中心那样插拔式地集成各种主流的解决方案;
- Spring Cloud Alibaba Seata 本质上还是基于 Spring Boot 自动装配来集成的,在没有提供标准化配置的情况下只能根据不同的分布式事务框架进行配置和整合;
1.5 关于事务分组的说明
- 在 Seata Clien 端的 file.conf 配置中有一个属性
vgroup_mapping
,它表示事务分组映射,是 Seata 的资源逻辑,类似于服务实例,它的主要作用是根据分组来获取 Seata Serve r的服务实例; - 服务分组的工作机制:
- 首先,在应用程序中需要配置事务分组,也就是使用 GlobalTransactionScanner 构造方法中的
txServiceGroup
参数,这个参数有如下几种赋值方式: - 默认情况下,为
$spring.application.name-seata-service-group
; - 在 Spring Cloud Alibaba Seata 中,可以使用
spring cloudalibaba.seata.tx-service-group
赋值; - 在 Seata-Spring-Boot-Starter 中,可以使用
seata.tx-service-group
赋值; - 然后,Seata 客户端会根据应用程序的
txServiceGroup
去指定位置(file.conf 或者远程配置中心)查找service.vgroup_mapping.$txServiceGroup
对应的配置值,该值代表TC集群(Seata Server)的名称; - 最后,程序会根据集群名称去配置中心或者 file.conf 中获得对应的服务列表,也就是
clusterName.grouplist
;
- 首先,在应用程序中需要配置事务分组,也就是使用 GlobalTransactionScanner 构造方法中的
- 在客户端获取服务器地址并没有直接采用服务名称,而是增加了一层事务分组映射到集群的配置。这样做的好处在于,事务分组可以作为资源的逻辑隔离单位,当某个集群出现故障时,可以把故障缩减到服务级别,实现快速故障转移,只需要切换对应的分组即可;
2. Seata 服务端的安装
2.1 安装包安装 Seata
2.1.1 下载 Seata
- 进入 Seata 官网下载 binary 二进制文件安装包(也可以在官方 GitHub 仓库里下):http://seata.io/zh-cn/blog/download.html;
2.1.2 修改存储模式为 db
- 修改存储模式:
- 修改
$SEATA_HOME\\conf\\file.conf
文件,store.mode="db"。如下图所示:
- 修改
- 修改 MySQL 连接信息:
- 修改
$SEATA_HOME\\conf\\file.conf
文件里的 db 模块为自己需要连接的 MySQL 地址;
- 修改
- 在 MySQL 上新建数据库和表;
- SQL 建表语句如下:
- 该 SQL 文件在源码包里的
$SEATA_HOME\\script/server/db/mysql.sql
文件;
-- 判断数据库存在,存在再删除
DROP DATABASE IF EXISTS seata;
-- 创建数据库,判断不存在,再创建
CREATE DATABASE IF NOT EXISTS seata;
-- 使用数据库
USE seata;
-- the table to store GlobalSession data
CREATE TABLE IF NOT EXISTS `global_table`
(
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`status` TINYINT NOT NULL,
`application_id` VARCHAR(32),
`transaction_service_group` VARCHAR(32),
`transaction_name` VARCHAR(128),
`timeout` INT,
`begin_time` BIGINT,
`application_data` VARCHAR(2000),
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`xid`),
KEY `idx_gmt_modified_status` (`gmt_modified`, `status`),
KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;
-- the table to store BranchSession data
CREATE TABLE IF NOT EXISTS `branch_table`
(
`branch_id` BIGINT NOT NULL,
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`resource_group_id` VARCHAR(32),
`resource_id` VARCHAR(256),
`branch_type` VARCHAR(8),
`status` TINYINT,
`client_id` VARCHAR(64),
`application_data` VARCHAR(2000),
`gmt_create` DATETIME(6),
`gmt_modified` DATETIME(6),
PRIMARY KEY (`branch_id`),
KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;
-- the table to store lock data
CREATE TABLE IF NOT EXISTS `lock_table`
(
`row_key` VARCHAR(128) NOT NULL,
`xid` VARCHAR(128),
`transaction_id` BIGINT,
`branch_id` BIGINT NOT NULL,
`resource_id` VARCHAR(256),
`table_name` VARCHAR(32),
`pk` VARCHAR(36),
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`row_key`),
KEY `idx_branch_id` (`branch_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;
2.1.3 指明注册中心与配置中心,上传 Seata 配置
- 注册中心:
- 修改
$SEATA_HOME\\conf\\registry.conf
文件里的 registry.type,以及下面的注册中心地址信息;
- 修改
- 配置中心:
- 也是在这个文件里,往下翻,如下图:
- 将 Seata 客户端和服务端的配置信息上传到 Nacos 服务器:
- Seata 客户端和服务端的配置信息保存在
$SEATA_HOME/script/config-center/config.txt
文件里,该文件只在源码包里有,笔者是源码安装 Seata 时做的这步; - 在
$SEATA_HOME\\script\\config-center\\nacos
目录下执行以下nacos-config.sh
脚本即可; - 上传完后可见下图:
- 也是在这个文件里,往下翻,如下图:
2.1.4 启动 Seata 服务器
-
先启动 Nacos,再执行
$SEATA_HOME\\bin\\seata-server.bat
文件; - 启动成功后能在 Nacos 服务器里能看见 Seata 服务;
2.2 源码安装 Seata
2.2.1 拉取代码
- 访问地址:https://github.com/seata/seata;
- 派生后拉取代码;
2.2.2 修改配置文件
- 源码的配置文件在 seata-server 模块下的 resource 资源文件里,有 file.conf 和 registry.conf 文件;
- 跟 2.1 安装包安装一样修改即可;
2.2.3 启动服务
- 先启动 Nacos 服务器;
- 执行
mvm install
将项目安装到本地; - 然后执行 seata-server 模块的
Server.run()
方法即可;
- 同样,在 Nacos 服务器里能看见 Seata 服务;
3. Spring Cloud 集成 Seata 实现分布式事务
- 配置示例参考官方提供的:https://github.com/seata/seata-samples/blob/master/doc/quick-integration-with-spring-cloud.md
3.1 引入 pom.xml 依赖文件
- 需要给四个服务都引入以下依赖:
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
3.2 修改 bootstrap.yml 配置文件
- Seata 在 1.0 后支持将
$SEATA_HOME/script/client/conf
目录下的两个配置文件 file.conf 和 registry.conf 写进 .yml 格式文件里了(1.0 版本前不支持); - .yml 格式的配置文件在
$SEATA_HOMEscript/client/spring
目录下; -
需要修改
seata.tx-service-group
和seata.service.vgroup-mapping
一致,配置中心、注册中心等; - 另一种配置方法:
- 除此之外,还可以将 file.conf 和 registry.conf 两个文件添加进 resource 目录下;
3.3 注入数据源
- 除此之外,还可以将 file.conf 和 registry.conf 两个文件添加进 resource 目录下;
-
Seata 通过代理数据源的方式实现分支事务;MyBatis 和 JPA 都需要注入
io.seata.rm.datasource.DataSourceProxy
, 不同的是,MyBatis 还需要额外注入org.apache.ibatis.session.SqlSessionFactory
; - MyBatis:
@Configuration
public class DataSourceProxyConfig
@Bean
@ConfigurationProperties(prefix = "spring.datasource")
public DataSource dataSource()
return new DruidDataSource();
@Bean
public DataSourceProxy dataSourceProxy(DataSource dataSource)
return new DataSourceProxy(dataSource);
@Bean
public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception
SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
sqlSessionFactoryBean.setDataSource(dataSourceProxy);
return sqlSessionFactoryBean.getObject();
3.4 添加 undo_log 表
- 在业务相关的数据库中添加 undo_log 表,用于保存需要回滚的数据;
CREATE TABLE `undo_log`
(
`id` BIGINT(20) NOT NULL AUTO_INCREMENT,
`branch_id` BIGINT(20) NOT NULL,
`xid` VARCHAR(100) NOT NULL,
`context` VARCHAR(128) NOT NULL,
`rollback_info` LONGBLOB NOT NULL,
`log_status` INT(11) NOT NULL,
`log_created` DATETIME NOT NULL,
`log_modified` DATETIME NOT NULL,
`ext` VARCHAR(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8
3.5 使用 @GlobalTransactional 开启事务
- 在业务的发起方的方法上使用
@GlobalTransactional
开启全局事务,Seata 会将事务的 xid 通过拦截器添加到调用其他服务的请求中,实现分布式事务;
4. Seata AT 模式的实现原理
4.1 两个阶段
- AT 模式是基于 XA 事务模型演进而来的,所以它的整体机制也是一个改进版的两阶段提交协议;
- 第一阶段:业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源;
- 第二阶段:提交异步化,非常快速地完成。回滚通过第一阶段的回滚日志进行反向补偿;
4.2 AT 模式第一阶段实现原理
- 在业务流程中执行库存扣减操作的数据库操作时,Seata 会基于数据源代理对原执行的 SQL 进行解析(Seata 在 0.9.0 版本之后支持自动代理);
-
然后将业务数据在更新前后保存到
undo_log
日志表中,利用本地事务的 ACID 特性,把业务数据的更新和回滚日志写入同一个本地事务中进行提交;- 提交前,向TC注册分支事务:申请
tbl_repo
表中主键值等于 1 的记录的全局锁; - 本地事务提交:业务数据的更新和前面步骤中生成的
UNDO_LOG
一并提交; - 将本地事务提交的结果上报给TC;
- 提交前,向TC注册分支事务:申请
- AT 模式和 XA 最大的不同点:分支的本地事务可以在第一阶段提交完成后马上释放本地事务锁定的资源;AT 模式降低了锁的范围,从而提升了分布式事务的处理效率;
4.3 AT 模式第二阶段实现原理
- TC 接收到所有事务分支的事务状态汇报之后,决定对全局事务进行提交或者回滚;
4.3.1 事务提交
- 如果决定是全局提交,说明此时所有分支事务已经完成了提交,只需要清理
UNDO_LOG
日志即可。这也是和 XA 最大的不同点;
- 分支事务收到 TC 的提交请求后把请求放入一个异步任务队列中,并马上返回提交成功的结果给 TC;
- 从异步队列中执行分支,提交请求,批量删除相应
UNDO_LOG
日志;
4.3.2 事务回滚
- 整个全局事务链中,任何一个事务分支执行失败,全局事务都会进入事务回滚流程;
- 也就是根据
UNDO_LOG
中记录的数据镜像进行补偿;
- 通过 XID 和 branch ID 查找到相应的
UNDO_LOG
记录; - 数据校验:拿
UNDO_LOG
中的 afterImage 镜像数据与当前业务表中的数据进行比较,如果不同,说明数据被当前全局事务之外的动作做了修改,那么事务将不会回滚; - 如果 afterImage 中的数据和当前业务表中对应的数据相同,则根据
UNDO_LOG
中的 beforelmage 镜像数据和业务 SQL 的相关信息生成回滚语句并执行; - 提交本地事务,并把本地事务的执行结果(即分支事务回滚的结果)上报给 TC;
- 通过 XID 和 branch ID 查找到相应的
4.4 关于事务的隔离性保证
- 在 AT 模式中,当多个全局事务操作同一张表时,它的事务隔离性保证是基于全局锁来实现的;
4.4.1 写隔离
- 一阶段本地事务提交前,需要确保先拿到全局锁;
- 拿不到全局锁 ,不能提交本地事务。
- 拿全局锁的尝试被限制在一定范围内,超出范围将放弃,并回滚本地事务,释放本地锁;
-
举例:
- tx1 一阶段拿到全局锁,tx2 等待;
- tx1 二阶段全局提交,释放全局锁,tx2 拿到全局锁提交本地事务;
- 如果 tx1 的二阶段全局回滚,则 tx1 需要重新获取该数据的本地锁,进行反向补偿的更新操作,实现分支的回滚;
- 此时,如果 tx2 仍在等待该数据的全局锁,同时持有本地锁,则 tx1 的分支回滚会失败;
- 分支的回滚会一直重试,直到 tx2 的全局锁等锁超时,放弃全局锁并回滚本地事务释放本地锁,tx1 的分支回滚最终成功;
- tx1 一阶段拿到全局锁,tx2 等待;
- 因为整个过程全局锁在 tx1 结束前一直是被 tx1 持有的,所以不会发生脏写的问题;
4.4.2 读隔离
- 在数据库本地事务隔离级别读已提交(Read Committed) 或以上的基础上,Seata(AT 模式)的默认全局隔离级别是读未提交(Read Uncommitted) ;
- 在该隔离级别,所有事务都可以看到其他未提交事务的执行结果,产生脏读。这在最终一致性事务模型中是允许存在的,并且在大部分分布式事务场景中都可以接受脏读;
- 如果应用在特定场景下,必需要求全局的读已提交 ,目前 Seata 的方式是通过
SELECT FOR UPDATE
语句的代理; SELECT FOR UPDATE
语句的执行会申请全局锁 ,如果全局锁被其他事务持有,则释放本地锁(回滚SELECT FOR UPDATE
语句的本地执行)并重试;- 这个过程中,查询是被 block 住的,直到全局锁拿到,即读取的相关数据是已提交的,才返回;
最后
::: hljs-center
新人制作,如有错误,欢迎指出,感激不尽!
:::
::: hljs-center
欢迎关注公众号,会分享一些更日常的东西!
:::
::: hljs-center
如需转载,请标注出处!
:::
::: hljs-center
:::
以上是关于Seata架构篇 - AT模式的主要内容,如果未能解决你的问题,请参考以下文章