分布式技术专题「架构实践于案例分析」总结和盘点目前常用分布式事务特别及问题分析(Seata-终)

Posted 洛神灬殇

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了分布式技术专题「架构实践于案例分析」总结和盘点目前常用分布式事务特别及问题分析(Seata-终)相关的知识,希望对你有一定的参考价值。

分布式事务中间件对⽐与选择

  • tx-lcn
  • EasyTransaction
  • ByteTCC
  • Seata

Seata实现分布式事务

我们主要以Seata的分布式事务框架进行介绍分析,相关的并且针对于其三种模式进行分别说明介绍。

搭建Seata Server

前往https://github.com/seata/seata/releases 下载Seata安装包,本书使⽤Seata 1.0.0。将⽬录切换⾄Seata根⽬录,根据操作系统,执⾏对应命令,即可启动Seata Server。

Linux/Unix/Mac

sh ./bin/seata-server.sh

Windows

bin\\seata-server.bat

启动时,也可指定参数

$ sh ./bin/seata-server.sh -p 8091 -h 127.0.0.1 -m file
⽀持的参数如下表所示

Seata AT模式

  • ⼀阶段业务数据和回滚⽇志记录在同⼀个本地事务中提交,释放本地锁和连接资源。
  • ⼆阶段提交异步化,⾮常快速地完成。回滚通过⼀阶段的回滚⽇志进⾏反向补偿。
官⽅⽂档

http://seata.io/zh-cn/docs/dev/mode/at-mode.html

代码演示
Maven依赖
<dependency>
   <groupId>com.alibaba.cloud</groupId>
   <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
配置
seata:
   tx-service-group: content-center-seata-service-group
   service:
    vgroup-mapping:
        content-center-seata-service-group: seata-cluster
        grouplist:
           seata-cluster: 127.0.0.1:8091
           disable-global-transaction: false
配置说明
  • tx-service-group:事务分组,默认是 $spring.application.name-seata-service-group ,唯⼀即可。
  • vgroup-mapping:事务分组映射,表示 tx-service-group 对应到哪个Seata Server集群。
  • key是tx-service-group的值,value是集群名称,唯⼀即可
  • grouplist:集群中所包含的Seata Server的地址列表,key是vgroup-mapping中value的值,
  • value是Seata Server的地址列表
  • disable-global-transaction:是否开启全局事务开关,默认false。

在Seata1.0.0中,该配置⽆法正常读取,这是⼀个Bug,详⻅ https://github.com/seata/seata/issues/2114 ,好在,该配置的默认值就是false,所以不影响使⽤。

创建Seata事务记录表
-- auto-generated definition
create table undo_log (
id bigint auto_increment comment 'increment id' primary key,
branch_id bigint not null comment 'branch transaction id',
xid varchar(100) not null comment 'global transaction id',
context varchar(128) not null comment 'undo_log context,such as serialization',
rollback_info longblob not null comment 'rollback info',
log_status int not null comment '0:normal status,1:defense status',
log_created datetime not null comment 'create datetime',
log_modified datetime not null comment 'modify datetime',
constraint ux_undo_log
unique (xid, branch_id)
) comment 'AT transaction mode undo table' charset = utf8;
Controller代码:
private final ShareSeataService shareSeataService;
@PutMapping("/audit/seata1/id")
public Share auditByIdSeata1(@PathVariable Integer id, @RequestBody ShareAuditDTO auditDTO) 
     return this.shareSeataService.auditById(id, auditDTO);

Service代码:
  • 审核中心服务代码(含调用用户中心代码接口)
@Slf4j
@Service
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class ShareSeataService 
    private final ShareMapper shareMapper;
    private final UserCenterFeignClient userCenterFeignClient;

   @GlobalTransactional(rollbackFor = Exception.class)
    public Share auditById(Integer id, ShareAuditDTO auditDTO) 
        if (AuditStatusEnum.PASS.equals(auditDTO.getAuditStatusEnum()))  
            userCenterFeignClient.addBonus(id, 50);
           // 故意抛异常,如果⽤户中⼼侧也能回滚,说明实现了分布式事务
          // throw new IllegalArgumentException("发⽣异常...");
        
        this.auditByIdInDB(id, auditDTO);
        return this.shareMapper.selectByPrimaryKey(id);
   
  • 审核中心服务代码(执行操作更新数据库机制)
    public void auditByIdInDB(Integer id, ShareAuditDTO auditDTO) 
        Share share = Share.builder().id(id).auditStatus(auditDTO.getAuditStatusEnum().toString()).reason(auditDTO.getReason())
       .build();
        this.shareMapper.updateByPrimaryKeySelective(share);
    

@GlobalTransactional 注解⽤来创建分布式事务。

被调⽤⽅代码
Maven依赖
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
配置
seata:
    tx-service-group: user-center-seata-service-group
      service:
      vgroup-mapping:
         user-center-seata-service-group: seata-cluster
         grouplist:
            seata-cluster: 127.0.0.1:8091
            disable-global-transaction: false

可以看出来,差距主要体现在tx-service-group的值。

Controller代码:
@GetMapping("/add-bonus/id/bonus")
public User addBonus(@PathVariable Integer id, @PathVariable Integer bonus) 
    this.userService.addBonus(id, bonus);
    return this.userService.findById(id);

Service代码:
@Slf4j
@Service
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class UserService 
    private final UserMapper userMapper;
    private final BonusEventLogMapper bonusEventLogMapper;
    public User findById(Integer id) 
    // select * from user where id = #id
        return this.userMapper.selectByPrimaryKey(id);
    

public void addBonus(Integer userId, Integer bonus) 
    // 1. 为⽤户加积分
    User user = this.userMapper.selectByPrimaryKey(userId);
    user.setBonus(user.getBonus() + bonus);
    this.userMapper.updateByPrimaryKeySelective(user);
    // 2. 记录⽇志到bonus_event_log表⾥⾯
    this.bonusEventLogMapper.insert(BonusEventLog.builder().userId(userId).value(bonus).event("CONTRIBUTE").createTime(new Date()).description("投稿加积分..").build());
    log.info("积分添加完毕...");
 

Seata TCC模式

  • ⼀阶段 prepare ⾏为
  • ⼆阶段 commit 或 rollback ⾏为
需要实现的3个⽅法:
  • ⼀阶段:
    • ⽤于业务预处理的⽅法,即 Try 阶段、的⽅法,⽐如冻结⽤户的部分余额等等;
  • ⼆阶段:
    • ⽤于提交业务的⽅法,即 Commit ⽅法,⽐如扣除⽤户之前冻结的部分余额;
    • ⽤于回滚业务的⽅法,即 Rollback ⽅法,⽐如返还之前冻结的⽤户余额;
官⽅⽂档

http://seata.io/zh-cn/docs/dev/mode/tcc-mode.html

代码演示
行为操作接口
@LocalTCC
public interface TccActionOne 
   @TwoPhaseBusinessAction(name = "TccActionOne", commitMethod = "commit", rollbackMethod = "rollback")
    boolean prepare(BusinessActionContext actionContext, int a);
    boolean commit(BusinessActionContext actionContext);
    boolean rollback(BusinessActionContext actionContext);

接口实现类
  • 实现类1
@Component
public class TccActionOneImpl implements TccActionOne 
    @Override
    public boolean prepare(BusinessActionContext actionContext, int a) 
        // 这⾥是本地玩的,也可以调⽤其他微服务的接⼝
        String xid = actionContext.getXid();
        System.out.println("TccActionOne prepare, xid:" + xid);
        return true;
    
    @Override
    public boolean commit(BusinessActionContext actionContext) 
        // 这⾥是本地玩的,也可以调⽤其他微服务的接⼝
        String xid = actionContext.getXid();
        System.out.println("TccActionOne commit, xid:" + xid);
        ResultHolder.setActionOneResult(xid, "T");
        return true;
    
    @Override
    public boolean rollback(BusinessActionContext actionContext) 
        // 这⾥是本地玩的,也可以调⽤其他微服务的接⼝
        String xid = actionContext.getXid();
        System.out.println("TccActionOne rollback, xid:" + xid);
        ResultHolder.setActionOneResult(xid, "R");
        return true;
    

@LocalTCC
public interface TccActionTwo 
    @TwoPhaseBusinessAction(name = "TccActionTwo", commitMethod = "commit", rollbackMethod = "rollback")
    boolean prepare(BusinessActionContext actionContext, int a);
    boolean commit(BusinessActionContext actionContext);
    boolean rollback(BusinessActionContext actionContext);

  • 实现类2
@Component
public class TccActionTwoImpl implements TccActionTwo 
       @Override
       public boolean prepare(BusinessActionContext actionContext, String b) 
              // 这⾥是本地玩的,也可以调⽤其他微服务的接⼝
              String xid = actionContext.getXid();
              System.out.println("TccActionTwo prepare, xid:" + xid);
              return true;
       
       @Override
       public boolean commit(BusinessActionContext actionContext) 
              // 这⾥是本地玩的,也可以调⽤其他微服务的接⼝
              String xid = actionContext.getXid();
              System.out.println("TccActionTwo commit, xid:" + xid);
              ResultHolder.setActionTwoResult(xid, "T");
              return true;
       
       @Override
       public boolean rollback(BusinessActionContext actionContext) 
              // 这⾥是本地玩的,也可以调⽤其他微服务的接⼝
              String xid = actionContext.getXid();
              System.out.println("TccActionTwo rollback, xid:" + xid);
              ResultHolder.setActionTwoResult(xid, "R");
              return true;
       

  • 聚合实现服务业务实现类执行

@Service
public class ShareSeataService

    @Autowired
    TccActionOne tccActionOne;

    @Autowired
    TccActionTwo tccActionTwo;

    @GlobalTransactional
    public void tccTransactionCommit(Map<String, String> paramMap) 
        //第一个TCC 事务参与者
        boolean result = tccActionOne.prepare(null, "one");
        if (!result) 
			paramMap.put("xid",RootContext.getXID());
            throw new RuntimeException("TccActionOne failed.");
        
        List list = new ArrayList();
        list.add("c1");
        list.add("c2");
        result = tccActionTwo.prepare(null, "two");
        if (!result) 
            paramMap.put("xid",RootContext.getXID());
            throw new RuntimeException("TccActionTwo failed.");
        
       paramMap.put("xid",RootContext.getXID());
        return ;
    


// 回滚的代码相似,就不写了
  • 执行调用点
@Slf4j
@RestController
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class ShareAdminController 
    private final ShareSeataService shareSeataService;
    @GetMapping("tcc-commit")
    public String tccTransactionCommit() 
        Map<String, String> map = new HashMap<>();
        this.shareSeataService.tccTransactionCommit(map);
        String xid = map.get("xid");
        // 结果T
        return ResultHolder.getActionOneResult(xid);
    
    @GetMapping("/tcc-rollback")
    public String tccTransactionRollback() 
       Map<String, String> map = new HashMap<>();
       try 
          this.shareSeataService.tccTransactionRollback(map);
        catch (Throwable t) 
          log.warn("事务回滚..", t);
       
       String xid = map.get("xid");
       // 结果R
       return ResultHolder.getActionOneResult(xid);
   

定义状态机⽂件:

    "Name": "reduceInventoryAndBalance",
    "Comment": "reduce inventory then reduce balance in a transaction",
    "StartState": "ReduceInventory",
    "Version": "0.0.1",
    "States": 
        "ReduceInventory": 
            "Type": "ServiceTask",
            "ServiceName": "inventoryAction",
            "ServiceMethod": "reduce",
            "CompensateState": "CompensateReduceInventory",
            "Next": "ChoiceState",
            "Input": [
                "$.[businessKey]",
                "$.[count]"
            ],
            "Output": 
                "reduceInventoryResult": "$.#root"
            ,
            "Status": 
                "#root == true": "SU",
                "#root == false": "FA",
                "$Exceptionjava.lang.Throwable": "UN"
            
        ,
        "ChoiceState": 
            "Type": "Choice",
            "Choices": [
                
                    "Expression": "[reduceInventoryResult] == true",
                    "Next": "ReduceBalance"
                
            ],
            "Default": "Fail"
        ,
        "ReduceBalance": 
            "Type": "ServiceTask",
            "ServiceName": "balanceAction",
            "ServiceMethod": "reduce",
            "CompensateState": "CompensateReduceBalance",
            "Input": [
                "$.[businessKey]",
                "$.[amount]",
                
                    "throwException": "$.[mockReduceBalanceFail]"分布式事务27
                
            ],
            "Output": 
                "compensateReduceBalanceResult": "$.#root"
            ,
            "Status": 
                "#root == true": "SU",
                "#root == false": "FA",
                "$Exceptionjava.lang.Throwable": "UN"
            ,
            "Catch": [
                
                    "Exceptions": [
                        "java.lang.Throwable"
                    ],
                    "Next": "CompensationTrigger"
                
            ],
            "Next": "Succeed"
        ,
        "CompensateReduceInventory": 
            "Type": "ServiceTask",
            "ServiceName": "inventoryAction",
            "ServiceMethod": "compensateReduce",
            "Input": [
                "$.[businessKey]"
            ]
        ,
        "CompensateReduceBalance": 
            "Type": "ServiceTask",
            "ServiceName": "balanceAction",
            "ServiceMethod": "compensateReduce",
            "Input": [
                "$.[businessKey]"
            ]
        ,
        "CompensationTrigger": 
            "Type": "CompensationTrigger",
            "Next": "Fail"
        ,
        "Succeed": 
            "Type": "Succeed"
        ,
        "Fail": 
            "Type": "Fail",
            "ErrorCode": "PURCHASE_FAILED",
            "Message": "purchase failed"
        
    

测试代码:


 public class LocalSagaTransactionStarter 
        public static void main(String[] args) 
            AbstractApplicationContext applicationContext = new ClassPathXmlApplication
            Context(new String[] "spring/seata-saga.xml");
            StateMachineEngine stateMachineEngine = (StateMachineEngine) applicationCon
            text.getBean("stateMachineEngine");
            transactionCommittedDemo(stateMachineEngine);
            transactionCompensatedDemo(stateMachineEngine);
            new ApplicationKeeper(applicationContext).keep();
        
        private static void transactionCommittedDemo(StateMachineEngine stateMachineEng
ine) 
            Map<String, Object> startParams = new HashMap<>(3);
            String businessKey = String.valueOf(System.currentTimeMillis());
            startParams.put("businessKey", businessKey);
            startParams.put("count", 10);
            startParams.put("amount", new BigDecimal("100"));
//sync test
            StateMachineInstance inst = stateMachineEngine.startWithBusinessKey("reduce
                    InventoryAndBalance", null, businessKey, startParams);
                    Assert.isTrue(ExecutionStatus.SU.equals(inst.getStatus()), "saga transactio
                            n execute failed. XID: " + inst.getId());
            System.out.

以上是关于分布式技术专题「架构实践于案例分析」总结和盘点目前常用分布式事务特别及问题分析(Seata-终)的主要内容,如果未能解决你的问题,请参考以下文章

分布式技术专题「架构实践于案例分析」总结和盘点目前常用分布式事务特别及问题分析(Seata-终)

分布式技术专题「架构实践于案例分析」总结和盘点目前常用分布式事务特别及问题分析(下)

分布式技术专题「架构实践于案例分析」总结和盘点目前常用分布式事务特别及问题分析(上)

分布式技术专题「架构实践于案例分析」总结和盘点目前常用分布式事务特别及问题分析(中)

分布式技术专题「架构实践于案例分析」总结和盘点目前常用分布式事务特别及问题分析(中)

分布式技术专题「架构实践于案例分析」总结和盘点目前常用分布式事务特别及问题分析(上)