分布式技术专题「架构实践于案例分析」总结和盘点目前常用分布式事务特别及问题分析(Seata-终)
Posted 洛神灬殇
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了分布式技术专题「架构实践于案例分析」总结和盘点目前常用分布式事务特别及问题分析(Seata-终)相关的知识,希望对你有一定的参考价值。
分布式事务中间件对⽐与选择
- tx-lcn
- EasyTransaction
- ByteTCC
- Seata
Seata实现分布式事务
我们主要以Seata的分布式事务框架进行介绍分析,相关的并且针对于其三种模式进行分别说明介绍。
搭建Seata Server
前往https://github.com/seata/seata/releases 下载Seata安装包,本书使⽤Seata 1.0.0。将⽬录切换⾄Seata根⽬录,根据操作系统,执⾏对应命令,即可启动Seata Server。
Linux/Unix/Mac
sh ./bin/seata-server.sh
Windows
bin\\seata-server.bat
启动时,也可指定参数
$ sh ./bin/seata-server.sh -p 8091 -h 127.0.0.1 -m file
⽀持的参数如下表所示
Seata AT模式
- ⼀阶段:业务数据和回滚⽇志记录在同⼀个本地事务中提交,释放本地锁和连接资源。
- ⼆阶段:提交异步化,⾮常快速地完成。回滚通过⼀阶段的回滚⽇志进⾏反向补偿。
官⽅⽂档
http://seata.io/zh-cn/docs/dev/mode/at-mode.html
代码演示
Maven依赖
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
配置
seata:
tx-service-group: content-center-seata-service-group
service:
vgroup-mapping:
content-center-seata-service-group: seata-cluster
grouplist:
seata-cluster: 127.0.0.1:8091
disable-global-transaction: false
配置说明
- tx-service-group:事务分组,默认是 $spring.application.name-seata-service-group ,唯⼀即可。
- vgroup-mapping:事务分组映射,表示 tx-service-group 对应到哪个Seata Server集群。
- key是tx-service-group的值,value是集群名称,唯⼀即可
- grouplist:集群中所包含的Seata Server的地址列表,key是vgroup-mapping中value的值,
- value是Seata Server的地址列表
- disable-global-transaction:是否开启全局事务开关,默认false。
在Seata1.0.0中,该配置⽆法正常读取,这是⼀个Bug,详⻅ https://github.com/seata/seata/issues/2114 ,好在,该配置的默认值就是false,所以不影响使⽤。
创建Seata事务记录表
-- auto-generated definition
create table undo_log (
id bigint auto_increment comment 'increment id' primary key,
branch_id bigint not null comment 'branch transaction id',
xid varchar(100) not null comment 'global transaction id',
context varchar(128) not null comment 'undo_log context,such as serialization',
rollback_info longblob not null comment 'rollback info',
log_status int not null comment '0:normal status,1:defense status',
log_created datetime not null comment 'create datetime',
log_modified datetime not null comment 'modify datetime',
constraint ux_undo_log
unique (xid, branch_id)
) comment 'AT transaction mode undo table' charset = utf8;
Controller代码:
private final ShareSeataService shareSeataService;
@PutMapping("/audit/seata1/id")
public Share auditByIdSeata1(@PathVariable Integer id, @RequestBody ShareAuditDTO auditDTO)
return this.shareSeataService.auditById(id, auditDTO);
Service代码:
- 审核中心服务代码(含调用用户中心代码接口)
@Slf4j
@Service
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class ShareSeataService
private final ShareMapper shareMapper;
private final UserCenterFeignClient userCenterFeignClient;
@GlobalTransactional(rollbackFor = Exception.class)
public Share auditById(Integer id, ShareAuditDTO auditDTO)
if (AuditStatusEnum.PASS.equals(auditDTO.getAuditStatusEnum()))
userCenterFeignClient.addBonus(id, 50);
// 故意抛异常,如果⽤户中⼼侧也能回滚,说明实现了分布式事务
// throw new IllegalArgumentException("发⽣异常...");
this.auditByIdInDB(id, auditDTO);
return this.shareMapper.selectByPrimaryKey(id);
- 审核中心服务代码(执行操作更新数据库机制)
public void auditByIdInDB(Integer id, ShareAuditDTO auditDTO)
Share share = Share.builder().id(id).auditStatus(auditDTO.getAuditStatusEnum().toString()).reason(auditDTO.getReason())
.build();
this.shareMapper.updateByPrimaryKeySelective(share);
@GlobalTransactional 注解⽤来创建分布式事务。
被调⽤⽅代码
Maven依赖
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
配置
seata:
tx-service-group: user-center-seata-service-group
service:
vgroup-mapping:
user-center-seata-service-group: seata-cluster
grouplist:
seata-cluster: 127.0.0.1:8091
disable-global-transaction: false
可以看出来,差距主要体现在tx-service-group的值。
Controller代码:
@GetMapping("/add-bonus/id/bonus")
public User addBonus(@PathVariable Integer id, @PathVariable Integer bonus)
this.userService.addBonus(id, bonus);
return this.userService.findById(id);
Service代码:
@Slf4j
@Service
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class UserService
private final UserMapper userMapper;
private final BonusEventLogMapper bonusEventLogMapper;
public User findById(Integer id)
// select * from user where id = #id
return this.userMapper.selectByPrimaryKey(id);
public void addBonus(Integer userId, Integer bonus)
// 1. 为⽤户加积分
User user = this.userMapper.selectByPrimaryKey(userId);
user.setBonus(user.getBonus() + bonus);
this.userMapper.updateByPrimaryKeySelective(user);
// 2. 记录⽇志到bonus_event_log表⾥⾯
this.bonusEventLogMapper.insert(BonusEventLog.builder().userId(userId).value(bonus).event("CONTRIBUTE").createTime(new Date()).description("投稿加积分..").build());
log.info("积分添加完毕...");
Seata TCC模式
- ⼀阶段 prepare ⾏为
- ⼆阶段 commit 或 rollback ⾏为
需要实现的3个⽅法:
- ⼀阶段:
- ⽤于业务预处理的⽅法,即 Try 阶段、的⽅法,⽐如冻结⽤户的部分余额等等;
- ⼆阶段:
- ⽤于提交业务的⽅法,即 Commit ⽅法,⽐如扣除⽤户之前冻结的部分余额;
- ⽤于回滚业务的⽅法,即 Rollback ⽅法,⽐如返还之前冻结的⽤户余额;
官⽅⽂档
http://seata.io/zh-cn/docs/dev/mode/tcc-mode.html
代码演示
行为操作接口
@LocalTCC
public interface TccActionOne
@TwoPhaseBusinessAction(name = "TccActionOne", commitMethod = "commit", rollbackMethod = "rollback")
boolean prepare(BusinessActionContext actionContext, int a);
boolean commit(BusinessActionContext actionContext);
boolean rollback(BusinessActionContext actionContext);
接口实现类
- 实现类1
@Component
public class TccActionOneImpl implements TccActionOne
@Override
public boolean prepare(BusinessActionContext actionContext, int a)
// 这⾥是本地玩的,也可以调⽤其他微服务的接⼝
String xid = actionContext.getXid();
System.out.println("TccActionOne prepare, xid:" + xid);
return true;
@Override
public boolean commit(BusinessActionContext actionContext)
// 这⾥是本地玩的,也可以调⽤其他微服务的接⼝
String xid = actionContext.getXid();
System.out.println("TccActionOne commit, xid:" + xid);
ResultHolder.setActionOneResult(xid, "T");
return true;
@Override
public boolean rollback(BusinessActionContext actionContext)
// 这⾥是本地玩的,也可以调⽤其他微服务的接⼝
String xid = actionContext.getXid();
System.out.println("TccActionOne rollback, xid:" + xid);
ResultHolder.setActionOneResult(xid, "R");
return true;
@LocalTCC
public interface TccActionTwo
@TwoPhaseBusinessAction(name = "TccActionTwo", commitMethod = "commit", rollbackMethod = "rollback")
boolean prepare(BusinessActionContext actionContext, int a);
boolean commit(BusinessActionContext actionContext);
boolean rollback(BusinessActionContext actionContext);
- 实现类2
@Component
public class TccActionTwoImpl implements TccActionTwo
@Override
public boolean prepare(BusinessActionContext actionContext, String b)
// 这⾥是本地玩的,也可以调⽤其他微服务的接⼝
String xid = actionContext.getXid();
System.out.println("TccActionTwo prepare, xid:" + xid);
return true;
@Override
public boolean commit(BusinessActionContext actionContext)
// 这⾥是本地玩的,也可以调⽤其他微服务的接⼝
String xid = actionContext.getXid();
System.out.println("TccActionTwo commit, xid:" + xid);
ResultHolder.setActionTwoResult(xid, "T");
return true;
@Override
public boolean rollback(BusinessActionContext actionContext)
// 这⾥是本地玩的,也可以调⽤其他微服务的接⼝
String xid = actionContext.getXid();
System.out.println("TccActionTwo rollback, xid:" + xid);
ResultHolder.setActionTwoResult(xid, "R");
return true;
- 聚合实现服务业务实现类执行
@Service
public class ShareSeataService
@Autowired
TccActionOne tccActionOne;
@Autowired
TccActionTwo tccActionTwo;
@GlobalTransactional
public void tccTransactionCommit(Map<String, String> paramMap)
//第一个TCC 事务参与者
boolean result = tccActionOne.prepare(null, "one");
if (!result)
paramMap.put("xid",RootContext.getXID());
throw new RuntimeException("TccActionOne failed.");
List list = new ArrayList();
list.add("c1");
list.add("c2");
result = tccActionTwo.prepare(null, "two");
if (!result)
paramMap.put("xid",RootContext.getXID());
throw new RuntimeException("TccActionTwo failed.");
paramMap.put("xid",RootContext.getXID());
return ;
// 回滚的代码相似,就不写了
- 执行调用点
@Slf4j
@RestController
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class ShareAdminController
private final ShareSeataService shareSeataService;
@GetMapping("tcc-commit")
public String tccTransactionCommit()
Map<String, String> map = new HashMap<>();
this.shareSeataService.tccTransactionCommit(map);
String xid = map.get("xid");
// 结果T
return ResultHolder.getActionOneResult(xid);
@GetMapping("/tcc-rollback")
public String tccTransactionRollback()
Map<String, String> map = new HashMap<>();
try
this.shareSeataService.tccTransactionRollback(map);
catch (Throwable t)
log.warn("事务回滚..", t);
String xid = map.get("xid");
// 结果R
return ResultHolder.getActionOneResult(xid);
定义状态机⽂件:
"Name": "reduceInventoryAndBalance",
"Comment": "reduce inventory then reduce balance in a transaction",
"StartState": "ReduceInventory",
"Version": "0.0.1",
"States":
"ReduceInventory":
"Type": "ServiceTask",
"ServiceName": "inventoryAction",
"ServiceMethod": "reduce",
"CompensateState": "CompensateReduceInventory",
"Next": "ChoiceState",
"Input": [
"$.[businessKey]",
"$.[count]"
],
"Output":
"reduceInventoryResult": "$.#root"
,
"Status":
"#root == true": "SU",
"#root == false": "FA",
"$Exceptionjava.lang.Throwable": "UN"
,
"ChoiceState":
"Type": "Choice",
"Choices": [
"Expression": "[reduceInventoryResult] == true",
"Next": "ReduceBalance"
],
"Default": "Fail"
,
"ReduceBalance":
"Type": "ServiceTask",
"ServiceName": "balanceAction",
"ServiceMethod": "reduce",
"CompensateState": "CompensateReduceBalance",
"Input": [
"$.[businessKey]",
"$.[amount]",
"throwException": "$.[mockReduceBalanceFail]"分布式事务27
],
"Output":
"compensateReduceBalanceResult": "$.#root"
,
"Status":
"#root == true": "SU",
"#root == false": "FA",
"$Exceptionjava.lang.Throwable": "UN"
,
"Catch": [
"Exceptions": [
"java.lang.Throwable"
],
"Next": "CompensationTrigger"
],
"Next": "Succeed"
,
"CompensateReduceInventory":
"Type": "ServiceTask",
"ServiceName": "inventoryAction",
"ServiceMethod": "compensateReduce",
"Input": [
"$.[businessKey]"
]
,
"CompensateReduceBalance":
"Type": "ServiceTask",
"ServiceName": "balanceAction",
"ServiceMethod": "compensateReduce",
"Input": [
"$.[businessKey]"
]
,
"CompensationTrigger":
"Type": "CompensationTrigger",
"Next": "Fail"
,
"Succeed":
"Type": "Succeed"
,
"Fail":
"Type": "Fail",
"ErrorCode": "PURCHASE_FAILED",
"Message": "purchase failed"
测试代码:
public class LocalSagaTransactionStarter
public static void main(String[] args)
AbstractApplicationContext applicationContext = new ClassPathXmlApplication
Context(new String[] "spring/seata-saga.xml");
StateMachineEngine stateMachineEngine = (StateMachineEngine) applicationCon
text.getBean("stateMachineEngine");
transactionCommittedDemo(stateMachineEngine);
transactionCompensatedDemo(stateMachineEngine);
new ApplicationKeeper(applicationContext).keep();
private static void transactionCommittedDemo(StateMachineEngine stateMachineEng
ine)
Map<String, Object> startParams = new HashMap<>(3);
String businessKey = String.valueOf(System.currentTimeMillis());
startParams.put("businessKey", businessKey);
startParams.put("count", 10);
startParams.put("amount", new BigDecimal("100"));
//sync test
StateMachineInstance inst = stateMachineEngine.startWithBusinessKey("reduce
InventoryAndBalance", null, businessKey, startParams);
Assert.isTrue(ExecutionStatus.SU.equals(inst.getStatus()), "saga transactio
n execute failed. XID: " + inst.getId());
System.out.以上是关于分布式技术专题「架构实践于案例分析」总结和盘点目前常用分布式事务特别及问题分析(Seata-终)的主要内容,如果未能解决你的问题,请参考以下文章
分布式技术专题「架构实践于案例分析」总结和盘点目前常用分布式事务特别及问题分析(Seata-终)
分布式技术专题「架构实践于案例分析」总结和盘点目前常用分布式事务特别及问题分析(下)
分布式技术专题「架构实践于案例分析」总结和盘点目前常用分布式事务特别及问题分析(上)
分布式技术专题「架构实践于案例分析」总结和盘点目前常用分布式事务特别及问题分析(中)