Seata-从源码了解它

Posted DayFight_DayUp

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Seata-从源码了解它相关的知识,希望对你有一定的参考价值。

Seata-从源码了解它

相关知识介绍

XA协议

X/Open 组织(即现在的 Open Group )定义了分布式事务处理模型。

X/Open DTP 模型包括四个部分:(DTP: Distribution Transaction Process 分布式事务处理)

  • 应用程序( AP )
  • 事务管理器( TM ): 一般,常见的事务管理器( TM )是事务中间件
  • 资源管理器( RM ):一般RM就是数据库。
  • 通信资源管理器( CRM ):常见的CRM是消息中间件。

本地事务

通常把一个数据库内部的事务处理,如对多个表的操作,作为本地事务看待。

全局事务

数据库的事务处理对象是本地事务,而分布式事务处理的对象是全局事务。 所谓全局事务,是指分布式事务处理环境中,多个数据库可能需要共同完成一个工作,这个工作即是一个全局事务。

例:

​ 一个事务中可能更新几个不同的数据库。对数据库的操作发生在系统的各处但必须全部被提交或回滚。

​ 此时一个数据库对自己内部所做操作的提交不仅依赖本身操作是否成功,还要依赖与全局事务相关的其它数据库的操作是否成功,如果任一数据库的任一操作失败,则参与此事务的所有数据库所做的所有操作都必须回滚。

TM的必要性

一般情况下,某一数据库无法知道其它数据库在做什么,因此,在一个 DTP 环境中,事务中间件是必需的,由它通知和协调相关数据库的提交或回滚。

而一个数据库只将其自己所做的操作(可恢复)影射到全局事务中。

2PC

二阶段提交(Two-phaseCommit)是指,在计算机网络以及数据库领域内,为了使基于分布式系统架构下的所有节点在进行事务提交时保持一致性而设计的一种算法(Algorithm)。

通常,二阶段提交也被称为是一种协议(Protocol))。

在分布式系统中,每个节点虽然可以知晓自己的操作时成功或者失败,却无法知道其他节点的操作的成功或失败。当一个事务跨越多个节点时,为了保持事务的ACID特性,需要引入一个作为协调者的组件来统一掌控所有节点(称作参与者)的操作结果并最终指示这些节点是否要把操作结果进行真正的提交(比如将更新后的数据写入磁盘等等)。因此,二阶段提交的算法思路可以概括为:参与者将操作成败通知协调者,再由协调者根据所有参与者的反馈情报决定各参与者是否要提交操作还是中止操作。

准备阶段

事务协调者(事务管理器)给每个参与者(资源管理器)发送Prepare消息,每个参与者要么直接返回失败(如权限验证失败),要么在本地执行事务,写本地的redo和undo日志,但不提交,到达一种“万事俱备,只欠东风”的状态。

可以进一步将准备阶段分为以下三个步骤:

1.协调者节点向所有参与者节点询问是否可以执行提交操作(vote),并开始等待各参与者节点的响应。

2.参与者节点执行询问发起为止的所有事务操作,并将Undo信息和Redo信息写入日志。(注意:若成功这里其实每个参与者已经执行了事务操作)

3.各参与者节点响应协调者节点发起的询问。如果参与者节点的事务操作实际执行成功,则它返回一个”同意”消息;如果参与者节点的事务操作实际执行失败,则它返回一个”中止”消息。

提交阶段

如果协调者收到了参与者的失败消息或者超时,直接给每个参与者发送回滚(Rollback)消息;否则,发送提交(Commit)消息;参与者根据协调者的指令执行提交或者回滚操作,释放所有事务处理过程中使用的锁资源。(注意:必须在最后阶段释放锁资源)

接下来分两种情况分别讨论提交阶段的过程。

当协调者节点从所有参与者节点获得的相应消息都为”同意”时:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-EkMONXmj-1632448359321)(C:\\Users\\Administrator\\AppData\\Roaming\\Typora\\typora-user-images\\image-20210906153621152.png)]

1.协调者节点向所有参与者节点发出”正式提交(commit)”的请求。

2.参与者节点正式完成操作,并释放在整个事务期间内占用的资源。

3.参与者节点向协调者节点发送”完成”消息。

4.协调者节点受到所有参与者节点反馈的”完成”消息后,完成事务。

如果任一参与者节点在第一阶段返回的响应消息为”中止”,或者 协调者节点在第一阶段的询问超时之前无法获取所有参与者节点的响应消息时:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-zSgRSg3W-1632448359324)(C:\\Users\\Administrator\\AppData\\Roaming\\Typora\\typora-user-images\\image-20210906154546960.png)]

1.协调者节点向所有参与者节点发出”回滚操作(rollback)”的请求。

2.参与者节点利用之前写入的Undo信息执行回滚,并释放在整个事务期间内占用的资源。

3.参与者节点向协调者节点发送”回滚完成”消息。

4.协调者节点受到所有参与者节点反馈的”回滚完成”消息后,取消事务。

不管最后的结果如何,第二阶段都会结束当前事务。

两阶段提交的缺陷

1.同步阻塞问题: 两个阶段分开。

执行过程中,所有参与节点都是事务阻塞型的。当参与者占有公共资源时,其他第三方节点访问公共资源不得不处于阻塞状态。

2.单点故障: HA 高可用
由于协调者的重要性,一旦协调者发生故障。参与者会一直阻塞下去。尤其在第二阶段,协调者发生故障,那么所有的参与者还都处于锁定事务资源的状态中,而无法继续完成事务操作。(如果是协调者挂掉,可以重新选举一个协调者,但是无法解决因为协调者宕机导致的参与者处于阻塞状态的问题)

3.数据不一致:
在二阶段提交的阶段二中,当协调者向参与者发送commit请求之后,发生了局部网络异常或者在发送commit请求过程中协调者发生了故障,这回导致只有一部分参与者接受到了commit请求。而在这部分参与者接到commit请求之后就会执行commit操作。但是其他部分未接到commit请求的机器则无法执行事务提交。于是整个分布式系统便出现了数据不一致性的现象。

4.二阶段无法解决的问题:
协调者再发出commit消息之后宕机,而唯一接收到这条消息的参与者同时也宕机了。那么即使协调者通过选举协议产生了新的协调者,这条事务的状态也是不确定的,没人知道事务是否被已经提交。

3PC

三阶段提交(Three-phase commit),也叫三阶段提交协议(Three-phase commit protocol),是二阶段提交(2PC)的改进版本。

与两阶段不同的是,三阶段引入了以下两个改动点:

1.引入超时机制: 同时在协调者和参与者中都引入超时机制。

**2.在第一阶段和第二阶段中插入一个准备阶段:**保证了在最后提交阶段之前各参与节点的状态是一致的。

CanCommit阶段

3PC的CanCommit阶段其实和2PC的准备阶段很像。协调者向参与者发送commit请求,参与者如果可以提交就返回Yes响应,否则返回No响应。

1.事务询问 协调者向参与者发送CanCommit请求。询问是否可以执行事务提交操作。然后开始等待参与者的响应。

2.响应反馈 参与者接到CanCommit请求之后,正常情况下,如果其自身认为可以顺利执行事务,则返回Yes响应,并进入预备状态。否则反馈No

PreCommit阶段

协调者根据参与者的反应情况来决定是否可以记性事务的PreCommit操作。根据响应情况,有以下两种可能。

假如协调者从所有的参与者获得的反馈都是Yes响应,那么就会执行事务的预执行。

1.发送预提交请求 协调者向参与者发送PreCommit请求,并进入Prepared阶段。

2.事务预提交 参与者接收到PreCommit请求后,会执行事务操作,并将undo和redo信息记录到事务日志中。

3.响应反馈 如果参与者成功的执行了事务操作,则返回ACK响应,同时开始等待最终指令。

假如有任何一个参与者向协调者发送了No响应,或者等待超时之后,协调者都没有接到参与者的响应,那么就执行事务的中断。

1.发送中断请求 协调者向所有参与者发送abort请求。

2.中断事务 参与者收到来自协调者的abort中止请求之后(或超时之后,仍未收到协调者的请求),执行事务的中断。

doCommit阶段

该阶段进行真正的事务提交,也可以分为以下两种情况。

执行提交

1.发送提交请求 协调接收到参与者发送的ACK响应,那么他将从预提交状态进入到提交状态。并向所有参与者发送doCommit请求。

2.事务提交 参与者接收到doCommit请求之后,执行正式的事务提交。并在完成事务提交之后释放所有事务资源。

3.响应反馈 事务提交完之后,向协调者发送Ack响应。

4.完成事务 协调者接收到所有参与者的ack响应之后,完成事务。

中断事务 协调者没有接收到参与者发送的ACK响应(可能是接受者发送的不是ACK响应,也可能响应超时),那么就会执行中断事务。

1.发送中断请求 协调者向所有参与者发送abort请求

2.事务回滚 参与者接收到abort请求之后,利用其在阶段二记录的undo信息来执行事务的回滚操作,并在完成回滚之后释放所有的事务资源。

3.反馈结果 参与者完成事务回滚之后,向协调者发送ACK消息

4.中断事务 协调者接收到参与者反馈的ACK消息之后,执行事务的中断。

在doCommit阶段,如果参与者无法及时接收到来自协调者的doCommit或者rebort请求时,会在等待超时之后,会继续进行事务的提交。(其实这个应该是基于概率来决定的,当进入第三阶段时,说明参与者在第二阶段已经收到了PreCommit请求,那么协调者产生PreCommit请求的前提条件是他在第二阶段开始之前,收到所有参与者的CanCommit响应都是Yes。(一旦参与者收到了PreCommit,意味他知道大家其实都同意修改了)所以,一句话概括就是,当进入第三阶段时,由于网络超时等原因,虽然参与者没有收到commit或者abort响应,但是他有理由相信:成功提交的几率很大。 )

2PC与3PC的区别

相对于2PC,3PC主要解决的单点故障问题,并减少阻塞,因为一旦参与者无法及时收到来自协调者的信息之后,他会默认执行commit。而不会一直持有事务资源并处于阻塞状态。但是这种机制也会导致数据一致性问题,因为,由于网络原因,协调者发送的abort响应没有及时被参与者接收到,那么参与者在等待超时之后执行了commit操作。这样就和其他接到abort命令并执行回滚的参与者之间存在数据不一致的情况。

paxos

Seata

一句话:阿里的高性能分布式解决方案。以高效并且对业务0侵入的方式。解决微服务场景下面临的分布式服务问题。

建议使用支持HA的1.0版本或以上稳定版本。

Seata支持的模式:

AT模式和TCC模式,本次只说AT模式。AT模式侵入少,TCC模式性能高。

Seata AT模式三个角色(AT:Automatic Transaction)

TC (Transaction Coordinator) - 事务协调者

维护全局和分支事务的状态,驱动全局事务提交或回滚。

TM (Transaction Manager) - 事务管理器

定义全局事务的范围:开始全局事务、提交或回滚全局事务。

RM (Resource Manager) - 资源管理器

管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-T6gCRvvE-1632448359327)(C:\\Users\\Administrator\\AppData\\Roaming\\Typora\\typora-user-images\\image-20210906170101352.png)]

整体机制

两阶段提交协议的演变:

  • 一阶段:业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源。
  • 二阶段:
    • 提交异步化,非常快速地完成。
    • 回滚通过一阶段的回滚日志进行反向补偿。

Seata 是什么 工作机制

配置简介

服务端配置启动

1.下载服务端包 地址Releases · seata/seata · GitHub

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-zF0MEvrF-1632448359330)(C:\\Users\\Administrator\\AppData\\Roaming\\Typora\\typora-user-images\\image-20210906174325141.png)]

2.建表

-- -------------------------------- The script used when storeMode is 'db' --------------------------------
-- the table to store GlobalSession data
CREATE TABLE IF NOT EXISTS `global_table`
(
    `xid`                       VARCHAR(128) NOT NULL,
    `transaction_id`            BIGINT,
    `status`                    TINYINT      NOT NULL,
    `application_id`            VARCHAR(32),
    `transaction_service_group` VARCHAR(32),
    `transaction_name`          VARCHAR(128),
    `timeout`                   INT,
    `begin_time`                BIGINT,
    `application_data`          VARCHAR(2000),
    `gmt_create`                DATETIME,
    `gmt_modified`              DATETIME,
    PRIMARY KEY (`xid`),
    KEY `idx_gmt_modified_status` (`gmt_modified`, `status`),
    KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;

-- the table to store BranchSession data
CREATE TABLE IF NOT EXISTS `branch_table`
(
    `branch_id`         BIGINT       NOT NULL,
    `xid`               VARCHAR(128) NOT NULL,
    `transaction_id`    BIGINT,
    `resource_group_id` VARCHAR(32),
    `resource_id`       VARCHAR(256),
    `branch_type`       VARCHAR(8),
    `status`            TINYINT,
    `client_id`         VARCHAR(64),
    `application_data`  VARCHAR(2000),
    `gmt_create`        DATETIME(6),
    `gmt_modified`      DATETIME(6),
    PRIMARY KEY (`branch_id`),
    KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;

-- the table to store lock data
CREATE TABLE IF NOT EXISTS `lock_table`
(
    `row_key`        VARCHAR(128) NOT NULL,
    `xid`            VARCHAR(128),
    `transaction_id` BIGINT,
    `branch_id`      BIGINT       NOT NULL,
    `resource_id`    VARCHAR(256),
    `table_name`     VARCHAR(32),
    `pk`             VARCHAR(36),
    `gmt_create`     DATETIME,
    `gmt_modified`   DATETIME,
    PRIMARY KEY (`row_key`),
    KEY `idx_branch_id` (`branch_id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;

服务端sql位置

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-COdu8eeS-1632448359331)(C:\\Users\\Administrator\\AppData\\Roaming\\Typora\\typora-user-images\\image-20210906174722828.png)]

3.修改配置

cd seata/conf
vim registry.conf


..... 
registry 
  type = "zk"
  zk 
    serverAddr = "xxxx:2181,xxxx:2182,xxxx:2183"
  

config 
  type = "zk"
  zk 
    serverAddr = "xxxx:2181,xxxx:2182,xxxx:2183"
  

4.启动配置

seata-server.sh

客户端配置启动

1.引入包

<dependency>
        <groupId>io.seata</groupId>
        <artifactId>seata-spring-boot-starter</artifactId>
        <version>1.4.2</version>
    </dependency>

2.配置文件

#seata
seata.tx-service-group=my_test_tx_group
seata.config.type=zk
seata.config.zk.server-addr=xxxx:2181,xxxx:2182,xxxx:2183
seata.registry.type=zk
seata.registry.zk.server-addr=xxxx:2181,xxxx:2182,xxxx:2183

3.代码

首先在Application类上加注解@EnableTransactionManagement
其次在需要分布式事务的方法上加注解@GlobalTransactional
最后,run起来(那么肯定会报错的 )

4.数据库

-- for AT mode you must to init this sql for you business database. the seata server not need it.
CREATE TABLE IF NOT EXISTS `undo_log`
(
    `branch_id`     BIGINT       NOT NULL COMMENT 'branch transaction id',
    `xid`           VARCHAR(128) NOT NULL COMMENT 'global transaction id',
    `context`       VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
    `rollback_info` LONGBLOB     NOT NULL COMMENT 'rollback info',
    `log_status`    INT(11)      NOT NULL COMMENT '0:normal status,1:defense status',
    `log_created`   DATETIME(6)  NOT NULL COMMENT 'create datetime',
    `log_modified`  DATETIME(6)  NOT NULL COMMENT 'modify datetime',
    UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
  AUTO_INCREMENT = 1
  DEFAULT CHARSET = utf8 COMMENT ='AT transaction mode undo table';

rollback_info 保存的信息


  "branchId": 2008522332,
  "sqlUndoLogs": [
    
      "afterImage": 
        "rows": [
          
            "fields": [
              
                "keyType": "PrimaryKey",
                "name": "id",
                "type": 4,
                "value": 3
              ,
              
                "keyType": "NULL",
                "name": "count",
                "type": 4,
                "value": 98
              
            ]
          
        ],
        "tableName": "storage_tbl"
      ,
      "beforeImage": 
        "rows": [
          
            "fields": [
              
                "keyType": "PrimaryKey",
                "name": "id",
                "type": 4,
                "value": 3
              ,
              
                "keyType": "NULL",
                "name": "count",
                "type": 4,
                "value": 100
              
            ]
          
        ],
        "tableName": "storage_tbl"
      ,
      "sqlType": "UPDATE",
      "tableName": "storage_tbl"
    
  ],
  "xid": "192.168.202.197:8091:2008522331"

sql位置

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-jpBCOax1-1632448359332)(C:\\Users\\Administrator\\AppData\\Roaming\\Typora\\typora-user-images\\image-20210906175139978.png)]

源码分析

第一阶段:

核心在于对业务sql进行解析,转换成undolog,并同时入库,这是怎么做的呢?先抛出一个概念DataSourceProxy代理数据源,通过名字大家大概也能基本猜到是什么个操作,后面做具体分析。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-usYo50dC-1632448359333)(C:\\Users\\Administrator\\AppData\\Roaming\\Typora\\typora-user-images\\image-20210906171210696.png)]

insert delete

delete insert

update update

第二阶段

分布式事务操作成功,则TC通知RM异步删除undolog

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-RXcYDNlE-1632448359333)(C:\\Users\\Administrator\\AppData\\Roaming\\Typora\\typora-user-images\\image-20210906171248863.png)]

分布式事务操作失败,TM向TC发送回滚请求,RM 收到协调器TC发来的回滚请求,通过 XID 和 Branch ID 找到相应的回滚日志记录,通过回滚记录生成反向的更新 SQL 并执行,以完成分支的回滚

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-BHcvgofh-1632448359334)(C:\\Users\\Administrator\\AppData\\Roaming\\Typora\\typora-user-images\\image-20210906171316815.png)]

源码

1.每一个TM客户端都会代理一下原来datasource,整合自动装配这个代理默认的值就是true

  #enable-auto-data-source-proxy: true

2.自动装配还会配置一个名字为GlobalTransactionScanner的bean

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-RwpdT7l3-1632448359335)(C:\\Users\\Administrator\\AppData\\Roaming\\Typora\\typora-user-images\\image-20210906175954207.png)]

可以看到分别实现了Spring的3个接口InitializingBeanApplicationContextAwareDisposableBean

看看实现InitializingBean的方法

@Override
public void afterPropertiesSet() 
    // 根据配置, 查看是否支持全局事务
    if (disableGlobalTransaction) 
        if (LOGGER.isInfoEnabled()) 
            LOGGER.info("Global transaction is disabled.");
        
        ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                                             (ConfigurationChangeListener)this);
        return;
    
    if (initialized.compareAndSet(false, true)) 
        // 初始化客户端
        initClient();
    

调用了初始化客户端initClient()

initClient()

private void initClient() 
    if (LOGGER.isInfoEnabled()) 
        LOGGER.info("Initializing Global Transaction Clients ... ");
    
    if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) 
        throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));
    
    //init TM 初始化事务管理器
    TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
    if (LOGGER.isInfoEnabled()) 
        LOGGER.info("Transaction Manager Client is initialized. applicationId[] txServiceGroup[]", applicationId, txServiceGroup);
    
    //init RM 初始化资源管理器
    RMClient.init(applicationId, txServiceGroup);
    if (LOGGER.isInfoEnabled()) 
        LOGGER.info("Resource Manager is initialized. applicationId[] txServiceGroup[]", applicationId, txServiceGroup);
    

    if (LOGGER.isInfoEnabled()) 
        LOGGER.info("Global Transaction Clients are initialized. ");
    
    registerSpringShutdownHook();

里面对TmClient,RmClient进行了初始化(参数就是配置文件bean里配置的applicationId和txServiceGroup),并注册了一个Spring的ShutdownHook

TmClient.init()

io.seata.tm.TMClient#init(java.lang.String, java.lang.String, java.lang.String, java.lang.String)

这里只是初始化了一个nettyTM客户端

public static void init(String applicationId, String transactionServiceGroup, String accessKey, String secretKey) 
    TmNettyRemotingClient tmNettyRemotingClient = TmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup, accessKey, secretKey);
    tmNettyRemotingClient.init();

tm客户端初始化

io.seata.core.rpc.netty.TmNettyRemotingClient#init

@Override
public void init() 
    // registry processor
    registerProcessor();
    if (initialized.compareAndSet(false, true)) 
        super.init();
    

.....

    // 1.registry TC response processor 注册里一系列的响应处理器
    ClientOnResponseProcessor onResponseProcessor =
    new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType以上是关于Seata-从源码了解它的主要内容,如果未能解决你的问题,请参考以下文章

Seata-从源码了解它

Seata-从源码了解它

Seata RPC 模块的重构之路

Seata 源码分析 - tmrm 中 xid 传递过程

参与 Seata 社区到 go 与 Seata 的邂逅

Seata服务搭建 —— nacos