废话不多说丨分布式事务之seata使用,一文带你全盘掌握
Posted 马剑威老师
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了废话不多说丨分布式事务之seata使用,一文带你全盘掌握相关的知识,希望对你有一定的参考价值。
今天给大家带来一个技术点的使用分享,就是分布式事务之seata使用。分布式事务的解决方案,是大家在面试中不可避免会被问到的,而且分布式事务的解决方案也非常多。
今天威哥就以seata为例,把seata的使用在这里做了一个总结,希望能够帮助到大家。本案例在实施过程中,威哥力求简单明了,希望大家在学习过程中能够掌握其中的每个细节。好了,废话不多说,如果我们要学习seata,首先需要具备如下技术储备:
-
数据库事务的基本知识;
-
maven工具的使用;
-
熟悉SpringCloudAlibaba技术栈;
-
掌握SpringDataJPA简单使用;
一. Seata基本概念
1.seata是什么
Seata是阿里巴巴中间件团队发起了开源项目,其愿景是让分布式事务的使用像本地事务的使用一样,简单和高效,并逐步解决开发者们遇到的分布式事务方面的所有难题,后来更名为 Seata。
Seata的设计目标是对业务无侵入,因此从业务无侵入的2PC方案着手,在传统2PC的基础上演进。它把一个分布式事务理解成一个包含了若干分支事务的全局事务。
全局事务的职责是协调其下管辖的分支事务达成一致,要么一起成功提交,要么一起失败回滚。此外,通常分支事务本身就是一个关系数据库的本地事务。
2.seata基本架构
听到这里,是不是觉得很晦涩?那么威哥通过一幅图来帮助你们进一步理解seata的架构:
通过这幅图,我们看到了seata的三个重要的组件,分别是TC TM RM。那么他们到底是什么东西呢?
-
TC:Transaction Coordinator事务协调器,管理全局的分支事务的状态,用于全局性事务的提交和回滚。
-
TM:Transaction Manager 事务管理器,用于开启、提交或者回滚全局事务。
-
RM:Resource Manager资源管理器,用于分支事务上的资源管理,向TC注册分支事务,上报分支事务的状态,接受TC的命令来提交或者回滚分支事务。
3.seata执行流程
搞清楚了这几个组件的含义之后,那么seata的整个执行流程我们就可以梳理清楚了:
A服务的TM向TC申请开启一个全局事务,TC就会创建一个全局事务并返回一个唯一的XID。
服务的RM向TC注册分支事务,并及其纳入XID对应全局事务的管辖。
A服务执行分支事务,向数据库做操作。
A服务开始远程调用B服务,此时XID会在微服务的调用链上传播。
B服务的RM向TC注册分支事务,并将其纳入XID对应的全局事务的管辖。
B服务执行分支事务,向数据库做操作。
全局事务调用链处理完毕,TM根据有无异常向TC发起全局事务的提交或者回滚。
TC协调其管辖之下的所有分支事务,决定是否回滚。
二. 案例环境搭建
我们搞清楚Seata的相关概念之后,现在威哥带领大家实现一个需求:通过订单微服务实现下订单的操作,然后通知库存微服务进行库存的扣减。
1. 前期准备
我们需要先准备订单和商品实体类。
//商品
@Entity(name = "shop_product")
@Data
public class Product
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Integer pid;//主键
private String pname;//商品名称
private Double pprice;//商品价格
private Integer stock;//库存
//订单
@Entity(name = "shop_order")
@Data
public class Order
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long oid;//订单id
private Integer uid;//用户id
private String username;//用户名
private Integer pid;//商品id
private String pname;//商品名称
private Double pprice;//商品单价
private Integer number;//购买数量
我们还需要准备项目必备的pom依赖:
这是父工程的pom.xml文件:
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>$spring-cloud.version</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>$spring-cloud-alibaba.version</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
2. 搭建对应的微服务
现在我们分别搭建商品微服务和订单微服务
2.1 创建公共通用模块
我们创建shop-common模块,专门存放一些公共的实体类和工具类,便于其他模块进行共享。
2.1.1 在公共模块添加相关的依赖
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>$spring-cloud.version</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>$spring-cloud-alibaba.version</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
然后把之前准备的实体类都拷贝到这个shop-common中来。
2.2 搭建订单微服务模块
2.2.1 添加必要依赖
取名shop-order,在这个模块里面添加相关的依赖。
<dependencies>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.qf.common</groupId>
<artifactId>springcloudAlibaba-common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.qf.feign</groupId>
<artifactId>springcloudAlibaba-order-product-feign</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
</dependencies>
2.2.2 编写controller
在这里,我们编写一个下单操作的服务接口。
@RestController
@RequestMapping("order")
@Slf4j
public class OrderController
@Autowired
OrderService5 orderService5;
@RequestMapping("prod/pid")
public Order order(@PathVariable("pid") Integer pid)
return orderService5.createOrder(pid);
2.2.3 编写service
public interface OrderService
Order createOrder(Integer pid);
@Service
@Slf4j
public class OrderServiceImpl implements OrderService
@Autowired
OrderFeign orderFeign;
@Autowired
OrderDao orderDao;
@Override
public Order createOrder(Integer pid)
//查询指定的商品信息
Product product = orderFeign.findProductByPid(pid);
log.info("查询到的商品信息是:", JSON.toJSONString(product));
//执行下单的操作
Order order = new Order();
order.setUid(1003);
order.setUsername("测试Seata案例");
order.setPid(pid);
order.setPname(product.getPname());
order.setPprice(product.getPprice());
//设置订单中的商品数量
order.setNumber(1);
orderDao.save(order);
log.info("订单创建成功,订单信息是:",JSON.toJSONString(order));
//执行扣减库存的操作
orderFeign.reduceStock(pid,order.getNumber());
return order;
2.2.4 编写feign客户端
@FeignClient(name = "service-product")
public interface OrderFeign
@RequestMapping("product/pid")
public Product findProductByPid(@PathVariable("pid") Integer pid);
@RequestMapping("product/reduceStock")
void reduceStock(@RequestParam("pid") Integer pid,@RequestParam("number") Integer number);
2.2.5 编写dao
public interface OrderDao extends JpaRepository<Order,Integer>
2.3 搭建商品微服务模块
2.3.1 添加必要依赖
取名shop-product,在这个模块里面添加相关的依赖。
<dependencies>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.qf.common</groupId>
<artifactId>springcloudAlibaba-common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
</dependencies>
2.3.2 编写controller
@RestController
@Slf4j
@RefreshScope //配置信息的即时刷新
public class ProductController
@Autowired
ProductService productService;
//根据id查询对应的商品信息
@RequestMapping("product/pid")
public Product findProductByPid(@PathVariable("pid") Integer pid)
Product product = productService.findProductByPid(pid);
//JSON.toJSONString 把指定数据转换成json串
log.info("查询到的对应的商品是:" + JSON.toJSONString(product));
return product;
//扣减库存
@RequestMapping("product/reduceStock")
public void reduceStock(@RequestParam("pid") Integer pid,@RequestParam("number") Integer number)
productService.reduceStock(pid,number);
2.3.3 编写service
public interface ProductService
Product findProductByPid(Integer pid);
void reduceStock(Integer pid, Integer number);
@Service
public class ProductServiceImpl implements ProductService
@Autowired
ProductDao productDao;
@Override
public Product findProductByPid(Integer pid)
Optional<Product> optional = productDao.findById(pid);
return optional.get();
@Override
public void reduceStock(Integer pid, Integer number)
Product product = productDao.findById(pid).get();
product.setStock(product.getStock() - number);
productDao.save(product);
2.3.4 编写dao
public interface ProductDao extends JpaRepository<Product,Integer>
现在我们启动测试,目前代码是没有什么问题的。但是如果我手动模拟异常。具体操作如下:
此时我们再去测试,这个时候就出现了问题了。我们发现订单能够下单成功,但是库存没有扣减。这样就出现了数据不一致的事务问题。那么我们可以使用seata来帮我们解决问题。
三. 配置使用seata
1. 下载seata
同学们可以在如下资源链接上进行下载:
下载地址:https://github.com/seata/seata/releases/v0.9.0/
2. 配置Seata
我们下载下来之后,会是一个压缩包。我们把这个压缩包打开之后进行相关配置。
2.1 修改registry.conf,指定seata使用nacos注册中心
registry
# 支持的注册中心有:file 、nacos 、eureka、redis、zk、consul、etcd3、sofa 我们使用自己的注册中心即可 所以删除其他注册中心相关的配置
type = "nacos"
nacos
serverAddr = "localhost"
namespace = ""
cluster = "default"
config
# file、nacos 、apollo、zk、consul、etcd3
type = "nacos"
nacos
serverAddr = "localhost"
namespace = ""
2.2 修改nacos-config.txt,指定我们的服务名称
2.3 初始化seata在nacos中的配置
我们需要把seata相关的配置信息在nacos配置中心进行注册。
# 初始化seata 的nacos配置 # 注意: 这里要保证nacos是已经正常运行的
cd conf
nacos-config.sh 127.0.0.1
执行成功后可以打开Naco的控制台,在配置列表中,可以看到初始化了很多Group为SEATA_GROUP的配置。
2.4 启动seata服务
切换到bin目录执行以下命令:
cd bin
seata-server.bat -p 9000 -m file
启动后在 Nacos 的服务列表下面可以看到一个名为 serverAddr的服务。如果入下图所示,小伙伴们,seata服务启动成功!!!!!
3. 使用seata进行事务控制
3.1 初始化一张数据表,用来seata进行日志记录
CREATE TABLE `undo_log`
(
`id` BIGINT(20) NOT NULL AUTO_INCREMENT,
`branch_id` BIGINT(20) NOT NULL,
`xid` VARCHAR(100) NOT NULL,
`context` VARCHAR(128) NOT NULL,
`rollback_info` LONGBLOB NOT NULL,
`log_status` INT(11) NOT NULL,
`log_created` DATETIME NOT NULL,
`log_modified` DATETIME NOT NULL,
`ext` VARCHAR(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = INNODB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8;
3.2 在微服务中添加seata相关的依赖
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
3.3 配置DataSourceProxyConfig代理数据源
Seata 是通过代理数据源实现事务分支的,所以需要配置 io.seata.rm.datasource.DataSourceProxy 的 Bean,且是 @Primary默认的数据源,否则事务不会回滚,无法实现分布式事务.
在shop-product和shop-order里面都添加如下配置类:
@Configuration
public class DataSourceProxyConfig
@Bean
@ConfigurationProperties(prefix = "spring.datasource")
public DruidDataSource druidDataSource()
return new DruidDataSource();
@Primary
@Bean
public DataSourceProxy dataSource(DruidDataSource druidDataSource)
return new DataSourceProxy(druidDataSource);
-
在shop-product和shop-order的resources目录添加registry.conf(直接将seata里面的配置复制过来即可)。
-
在shop-product和shop-order的resources目录添加bootstrap.yml里面,然后添加配置。
3.4 在shop-order微服务中开启全局事务
4. 结果测试
我们发生请求:http://localhost:8091/order5/prod/1
此时查看数据库,我们发现事务问题得到了控制。就是当发生异常的时候,下单的记录被回滚了,而且库存也没有出现扣减。
到现在,我们的分布式事务就得到了控制,小伙伴们,你们有没有学会呢?可以留言或者java学习+资料获取 加q群:691533824 扫码学习啦!
五分钟带你体验一把分布式事务!so easy!
网上关于分布式事务讲理论的多,讲实战的少,今天我想通过一个案例,来让小伙伴们感受一把分布式事务,咱们今天尽量少谈点理论。咱们今天的主角是 Seata!
分布式事务涉及到很多理论,如 CAP,BASE 等,很多小伙伴刚看到这些理论就被劝退了,所以我们今天不讲理论,咱们就看个 Demo,通过代码快速体验一把什么是分布式事务。
1. 什么是 Seata?
Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。
Seata 支持的事务模式有四种分别是:
- Seata AT 模式
- Seata TCC 模式
- Seata Saga 模式
- Seata XA 模式
Seata 中有三个核心概念:
- TC (Transaction Coordinator) - 事务协调者:维护全局和分支事务的状态,驱动全局事务提交或回滚。
- TM (Transaction Manager) - 事务管理器:定义全局事务的范围,开始全局事务、提交或回滚全局事务。
- RM ( Resource Manager ) - 资源管理器:管理分支事务处理的资源( Resource ),与 TC 交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。
其中,TC 为单独部署的 Server 服务端,TM 和 RM 为嵌入到应用中的 Client 客户端。
这些概念小伙伴们作为一个了解即可,不了解也能用 Seata,了解了更能理解 Seata 的工作原理。
2. 搭建 Seata 服务端
我们先来把 Seata 服务端搭建起来。
Seata 下载地址:
目前最新版本是 1.4.2,我们就使用最新版本来做。
这个工具在 Windows 或者 Linux 上部署差别不大,所以我这里就直接部署在 Windows 上了,方便一些。
我们首先下载 1.4.2 版本的 zip 压缩包,下载之后解压,然后在 conf 目录中配置两个地方:
- 首先配置 file.conf 文件
file.conf 中配置 TC 的存储模式,TC 的存储模式有三种:
- file:适合单机模式,全局事务会话信息在内存中读写,并持久化本地文件 root.data,性能较高。
- db:适合集群模式,全局事务会话信息通过 db 共享,相对性能差点。
- redis:适合集群模式,全局事务会话信息通过 redis 共享,相对性能好点,但是要注意,redis 模式在 Seata-Server 1.3 及以上版本支持,性能较高,不过存在事务信息丢失的风险,所以需要开发者提前配置适合当前场景的 redis 持久化配置。
这里我们为了省事,配置为 file 模式,这样事务会话信息读写在内存中完成,持久化则写到本地 file,如下图:
如果配置 db 或者 redis 模式,大家记得填一下下面的相关信息。具体如下图:
题外话
注意,如果使用 db 模式,需要提前准备好数据库脚本,如下(小伙伴们可以直接在公众号江南一点雨后台回复 seata-db 下载这个数据库脚本):
CREATE DATABASE /*!32312 IF NOT EXISTS*/`seata2` /*!40100 DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci */ /*!80016 DEFAULT ENCRYPTION='N' */;
USE `seata2`;
/*Table structure for table `branch_table` */
DROP TABLE IF EXISTS `branch_table`;
CREATE TABLE `branch_table` (
`branch_id` bigint(20) NOT NULL,
`xid` varchar(128) NOT NULL,
`transaction_id` bigint(20) DEFAULT NULL,
`resource_group_id` varchar(32) DEFAULT NULL,
`resource_id` varchar(256) DEFAULT NULL,
`branch_type` varchar(8) DEFAULT NULL,
`status` tinyint(4) DEFAULT NULL,
`client_id` varchar(64) DEFAULT NULL,
`application_data` varchar(2000) DEFAULT NULL,
`gmt_create` datetime(6) DEFAULT NULL,
`gmt_modified` datetime(6) DEFAULT NULL,
PRIMARY KEY (`branch_id`),
KEY `idx_xid` (`xid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
/*Data for the table `branch_table` */
/*Table structure for table `global_table` */
DROP TABLE IF EXISTS `global_table`;
CREATE TABLE `global_table` (
`xid` varchar(128) NOT NULL,
`transaction_id` bigint(20) DEFAULT NULL,
`status` tinyint(4) NOT NULL,
`application_id` varchar(32) DEFAULT NULL,
`transaction_service_group` varchar(32) DEFAULT NULL,
`transaction_name` varchar(128) DEFAULT NULL,
`timeout` int(11) DEFAULT NULL,
`begin_time` bigint(20) DEFAULT NULL,
`application_data` varchar(2000) DEFAULT NULL,
`gmt_create` datetime DEFAULT NULL,
`gmt_modified` datetime DEFAULT NULL,
PRIMARY KEY (`xid`),
KEY `idx_gmt_modified_status` (`gmt_modified`,`status`),
KEY `idx_transaction_id` (`transaction_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
/*Data for the table `global_table` */
/*Table structure for table `lock_table` */
DROP TABLE IF EXISTS `lock_table`;
CREATE TABLE `lock_table` (
`row_key` varchar(128) NOT NULL,
`xid` varchar(128) DEFAULT NULL,
`transaction_id` bigint(20) DEFAULT NULL,
`branch_id` bigint(20) NOT NULL,
`resource_id` varchar(256) DEFAULT NULL,
`table_name` varchar(32) DEFAULT NULL,
`pk` varchar(36) DEFAULT NULL,
`gmt_create` datetime DEFAULT NULL,
`gmt_modified` datetime DEFAULT NULL,
PRIMARY KEY (`row_key`),
KEY `idx_branch_id` (`branch_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
另外还需要注意的是自己的数据库版本信息,改数据库连接的时候按照实际情况修改,Seata 针对 MySQL5.x 和 MySQL8.x 都提供了对应的数据库驱动(在 lib 目录下),我们只需要把驱动改好就行了。
- 再配置 registry.conf 文件
registry.conf 主要配置 Seata 的注册中心,我们这里采用大家比较熟悉的 Eureka,配置如下:
可以看到,支持的配置中心比较多,我们选择 Eureka,选好配置中心之后,记得修改配置中心相关的信息。
OK,现在就配置完成了,但是先别启动,还差一个 Eureka 注册中心。
3. 项目配置
接下来我们配置项目。
Seata 官方提供了一个非常经典的 Demo,我们直接来看这个 Demo。
官方案例下载地址:https://github.com/seata/seata-samples
不过这里是很多案例混在一起的,可能看起来会比较乱,而且由于要下载的依赖比较多,所以极有可能依赖下载失败,因此大家也可以在公众号后台回复 seata-demo
获取松哥整理好的案例,直接导入即可,如下图:
这是一个商品下单的案例,我来和大家稍微解释下:
- eureka:这是服务注册中心。
- account:这是账户服务,可以查询/修改用户的账户信息(主要是账户余额)。
- order:这是订单服务,可以下订单。
- storage:这是一个仓储服务,可以查询/修改商品的库存数量。
- bussiness:这是业务,用户下单操作将在这里完成。
这个案例讲了一个什么事呢?
当用户想要下单的时候,调用了 bussiness 中的接口,bussiness 中的接口又调用了它自己的 service,在 service 中,首先开启了全局分布式事务,然后通过 feign 调用 storage 中的接口去扣库存,然后再通过 feign 调用 order 中的接口去创建订单(order 在创建订单的时候,不仅会创建订单,还会扣除用户账户的余额),在扣除库存并完成订单创建之后,接下来会去检查用户的余额和库存数量是否正确,如果用户余额为负数或者库存数量为负数,则会进行事务回滚,否则提交事务。
本案例具体架构如下图:
这个案例就是一个典型的分布式事务问题,storage 和 order 中的事务分属于不同的微服务,但是我们希望他们同时成功或者同时失败。
现在大家明白了这个案例是干嘛的,我们就来把它跑起来。
首先创建一个名为 seata 的数据库,然后执行上面代码中的 all.sql 数据脚本。
接下来用 idea 打开上面这个项目,在每一个项目的 application.properties 文件中(Eureka 不用改),修改数据的连接信息,如下图:
除了 Eureka 之外,另外四个都要改哦。
OK,配置结束。
4. 启动测试
首先启动 Eureka。
接下来先别记着启动其他服务,先启动 Seata Server,也就是我们第二小节配置的那个服务,在它的 bin 目录下,Windows 下双击/Linux 下执行启动脚本。
最后再分别启动剩下的四个服务,启动完成后,我们可以在 Eureka 中查看相关信息:
可以看到,各个服务都注册上来了。
接下来我们访问 bussiness 中提供的两个测试接口。
第一个测试接口是:
http://127.0.0.1:8084/purchase/commit
这个接口对应的代码是:io.seata.sample.controller.BusinessController#purchaseCommit
,这个地方是模拟 U100000
用户购买了 30
个 C100000
商品,每个商品的价格是 100
,商品库存是 200
,用户账户余额是 10000
,所以购买之后,商品库存变为 170
,用户账户余额变为 7000
。这是正常购买的情况。
@RequestMapping(value = "/purchase/commit", produces = "application/json")
public String purchaseCommit() {
try {
businessService.purchase("U100000", "C100000", 30);
} catch (Exception exx) {
return exx.getMessage();
}
return "全局事务提交";
}
当我们调完这个接口之后,就可以去数据库查看相应的数据。
第二个测试的接口是:
http://127.0.0.1:8084/purchase/rollback
这个接口对应的代码是:io.seata.sample.controller.BusinessController#purchaseRollback
,这次是模拟用户购买 99999
个商品,无论是用户账户余额还是商品库存数量,都无法支撑这次购买行为,因此这个接口的调用最终会回滚,数据库中的数据会保持原样。
@RequestMapping("/purchase/rollback")
public String purchaseRollback() {
try {
businessService.purchase("U100000", "C100000", 99999);
} catch (Exception exx) {
return exx.getMessage();
}
return "全局事务提交";
}
这就是一个分布式事务案例。
小伙伴们感兴趣也可以研究一下官方这个案例,我们会发现这里的东西非常简单,单纯是如下方法上多了一个注解而已(io.seata.sample.service.BusinessService#purchase
):
@GlobalTransactional
public void purchase(String userId, String commodityCode, int orderCount) {
storageFeignClient.deduct(commodityCode, orderCount);
orderFeignClient.create(userId, commodityCode, orderCount);
if (!validData()) {
throw new RuntimeException("账户或库存不足,执行回滚");
}
}
purchase 方法用 @GlobalTransactional
注解标记了下,就开启了全局事务了,里边的两个调用都是 feign 的调用,对应了不同的服务,最后再做一个数据校验,校验失败就抛出异常,一旦该方法抛出异常,上面已经执行的代码就会回滚。
这个项目其余的代码都是微服务中的常规代码,就不赘述了。
5. 实现原理
我们稍微来说下 Seata 中这个分布式事务的原理,先来看一张图:
这张图非常清晰的描述了上面的案例,大致流程如下:
- 有三个概念:TM、RM、TC,这些我们在第一小节已经介绍过了,这里就不再赘述。
- 首先由 Business 开启全局事务。
- 接下来 Business 在调用 Storage 和 Order 的时候,这两个在数据库操作之前都会向 TC 注册一个分支事务并提交。
- 分支事务在操作时,都会向 undo_log 表中提交一条记录,当全局事务提交的时候会清空 undo_log 表中的记录,否则将以该表中的记录为依据进行反向补偿(将数据恢复原样)。
具体到上面的案例,事务提交分两个阶段,过程如下:
一阶段:
- 首先 Business 开启全局事务,这个过程中会向 TC 注册,然后会拿到一个 xid,这是一个全局事务 id。
- 接下来在 Business 中调用 Storage 微服务。
- 来解析 SQL:得到 SQL 的类型(UPDATE),表(storage_tbl),条件(where commodity_code = ‘C100000’)等相关的信息。
- 查询前镜像:根据解析得到的条件信息,生成查询语句,定位数据。
- 执行业务 SQL,也就是做真正的数据更新操作。
- 查询后镜像:根据前镜像的结果,通过主键定位数据。
- 插入回滚日志:把前后镜像数据以及业务 SQL 相关的信息组成一条回滚日志记录,插入到 UNDO_LOG 表中。
branch_id 和 xid 分别表示分支事务(即 Storage 自己的事务)和全局事务的 id,rollback_info 中保存着前后镜像的内容,这个将作为反向补偿(回滚)的依据,这个字段的值是一个 JSON,松哥挑出来这个 JSON 中比较重要的一部分来和大家分享:
- beforeImage:这个是修改前数据库中的数据,可以看到每个字段的值,id 为 4,count 的值为 200。
- afterImage:这个是修改后数据库中的数据,可以看到,此时 id 为 4,count 的值为 170。
- Storage 在提交前,会向 TC 注册分支:申请 storage_tbl 表中,主键值等于 4 的记录的全局锁。
- 本地事务提交:业务数据的更新和前面步骤中生成的 UNDO LOG 一并提交。
- 同理,Order 和 Account 也按照上面的步骤提交数据。
以上 1-10 步就是一阶段的数据提交。
再来看二阶段:
二阶段有两种可能,提交或者回滚。
还是以上面的案例为例:
@GlobalTransactional
public void purchase(String userId, String commodityCode, int orderCount) {
storageFeignClient.deduct(commodityCode, orderCount);
orderFeignClient.create(userId, commodityCode, orderCount);
if (!validData()) {
throw new RuntimeException("账户或库存不足,执行回滚");
}
}
下单时候,扣除了库存,并且创建了订单,最后一检查,发现库存为负数或者用户账户余额为负数,说明这个订单有问题,此时就该抛异常回滚,否则就提交数据。
具体操作如下:
回滚:
- 收到 TC 的分支回滚请求,开启一个本地事务,执行如下操作。
- 通过 xid 和 branch_id 去 undo_log 表中查找对应的记录。
- 数据校验:拿第二步查找到的后镜与当前数据进行比较,如果有不同,说明数据被当前全局事务之外的动作做了修改。这种情况,需要根据配置策略来做处理。
- 第三步的比较如果相同,则根据 undo_log 中的前镜像和业务 SQL 的相关信息生成并执行回滚的语句。
- 提交本地事务。并把本地事务的执行结果(即分支事务回滚的结果)上报给 TC。
提交:
- 收到 TC 的分支提交请求,把请求放入一个异步任务的队列中,马上返回提交成功的结果给 TC。
- 异步任务阶段的分支提交请求将异步和批量地删除相应 UNDO LOG 记录。
换句话说,事务如果正常提交了,undo_log 表中是没有记录的,如果大家想看该表中的记录,可以在事务提交之前通过 DEBUG 的方式查看。
6. 小结
讲了这么多,是不是就把 Seata 讲完了呢?NONONO!这只是 AT 模式而已!还有三种模式,松哥下篇文章再和小伙伴们分享。
好啦,这就是一个简单的分布式事务,小伙伴们先来感受一把!标题是五分钟感受一把分布式事务,因为文章里边我还和大家分享了原理,如果大家只是跑一下案例感受,五分钟应该够了,不信试试!
以上是关于废话不多说丨分布式事务之seata使用,一文带你全盘掌握的主要内容,如果未能解决你的问题,请参考以下文章
深入浅出Seata原理及实战「入门基础专题」带你透析认识Seata分布式事务服务的原理和流程