seata-golang如何实现AT模式一致性

Posted 新华三云计算

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了seata-golang如何实现AT模式一致性相关的知识,希望对你有一定的参考价值。

作者 | 刘晓敏  


简介


2017 年的时候,当时的公司技术上开始向微服务架构转型。那时,提到微服务大家一头雾水,经过两年的实践,逐渐有了一些心得。但有个问题始终萦绕在微服务开发者的头上,分布式事务到底如何解决,有没有比较完美的方案?二阶段提交、柔性事务、最终一致性?


2019 年 2 月,阿里在社区开源他们的分布式事务解决方案,我很快关注到这个项目,起初还叫 fescar,后来更名 seata。seata 框架在众多社区小伙伴的努力下,迅速应用到了生产环境。随着对 seata 框架的了解,当看到社区里面也有关于 seata go client 的呼声,遂萌生了做一个 go 版本的分布式事务框架的想法。


我们知道 Seata Java Client 的 AT 模式,通过代理数据源,实现了对业务代码无侵入的分布式事务协调机制,将与 Transaction Coordinator (TC) 交互的逻辑、Commit 的逻辑、Rollback 的逻辑,隐藏在切面和代理数据源相应的代码中,使开发者无感知。那如果这个方法,要用 Golang 来实现一遍,应该如何操作呢?



seata-golang如何实现AT模式一致性

AT模式的实现


关于这个问题,我想了很久,最初的设想是,对database/sql mysql driver 进行增强,在对包github.com/go-sql-driver/mysql研究了一段时间后,还是没有头绪,不知如何下手,最后转而增强database/sql包。由于 AT 模式必须保证本地事务的正确处理,在具体业务开发时,首先要通过db.Begin()获得一个 Tx 对象,然后再tx.Exec()执行数据库操作,最后tx.Commit()提交或tx.Rollback()回滚。这种处理方式算是一个 Golang 数据库事务处理的基本操作。所以对database/sql的增强,我们重点关注这几个方法db.Begin()tx.Exec()tx.Commit()tx.Rollback

seata-golang如何实现AT模式一致性

1. 事务提交、回滚

通过 Seata Java Client 的相关代码,我们知道,在本地事务提交的时候,主要是将分支事务注册到 TC 上,并将数据库操作产生的 undoLog 一起写入到 undoLog 表;本地事务回滚的时候,需要将分支事务(即本地事务)的执行状态报告给 TC,使 TC 好知道是否通知参与全局事务的其他分支回滚。

func (tx *Tx) Commit() error { //注册分支事务 branchId,err := tx.register() if err != nil { return errors.WithStack(err) } tx.tx.Context.BranchId = branchId
if tx.tx.Context.HasUndoLog() { //将 undoLog 写入 undoLog 表 err = manager.GetUndoLogManager().FlushUndoLogs(tx.tx) if err != nil { err1 := tx.report(false) if err1 != nil { return errors.WithStack(err1) } return errors.WithStack(err) } // 提交本地事务 err = tx.tx.Commit() if err != nil { err1 := tx.report(false) if err1 != nil { return errors.WithStack(err1) } return errors.WithStack(err) } } else { return tx.tx.Commit() } if tx.reportSuccessEnable { tx.report(true) } tx.tx.Context.Reset() return nil}

db.Begin() 会产生一个 Tx 对象,tx.Exec() 执行数据库操作会产生 undoLog,tx.Commit() 将 undoLog 刷到数据库中。那么 undoLog 保存到哪里呢?答案是 TxContext 中。

type TxContext struct { *context.RootContext Xid string BranchId int64 IsGlobalLockRequire bool
LockKeysBuffer *model.Set SqlUndoItemsBuffer []*undo.SqlUndoLog}
Commit() 方法中的 tx.tx.Context,第一个 tx 是封装的 Tx 对象,第二个 tx 是 database/sql 的 Tx,tx.tx.Context 则是 TxContext。UndoLogManager 则是操作 undoLog 的核心对象,处理 undoLog 的插入、删除,并查询出 undoLog 用于回滚。
func (tx *Tx) Rollback() error { err := tx.proxyTx.Rollback() if tx.proxyTx.Context.InGlobalTransaction() { branchId, err := tx.register() if err != nil { return errors.WithStack(err) } tx.proxyTx.Context.BranchId = branchId tx.report(false) } tx.proxyTx.Context.Reset() return err}
通过上面的代码呢,我们知道增强型 Tx 对象需要向 TC 注册分支事务,并报告分支事务的执行状态,相应代码如下:
func (tx *Tx) register() (int64,error) { return dataSourceManager.BranchRegister(meta.BranchTypeAT,tx.tx.ResourceId,"",tx.tx.Context.Xid, nil,tx.tx.Context.BuildLockKeys())}
func (tx *Tx) report(commitDone bool) error { retry := tx.reportRetryCount for retry > 0 { var err error if commitDone { err = dataSourceManager.BranchReport(meta.BranchTypeAT, tx.tx.Context.Xid, tx.tx.Context.BranchId, meta.BranchStatusPhaseoneDone,nil) } else { err = dataSourceManager.BranchReport(meta.BranchTypeAT, tx.tx.Context.Xid, tx.tx.Context.BranchId, meta.BranchStatusPhaseoneFailed,nil) } if err != nil { logging.Logger.Errorf("Failed to report [%d/%s] commit done [%t] Retry Countdown: %d", tx.tx.Context.BranchId,tx.tx.Context.Xid,commitDone,retry) retry = retry -1 if retry == 0 { return errors.WithMessagef(err,"Failed to report branch status %t",commitDone) } } } return nil}
和 TC 进行通信的主要逻辑还是在 DataSourceManager 里面。AT 模式涉及的 两个关键对象 DataSourceManager、UndoLogManager 就浮出水面。一个用于远 程 TC 交互,一个用于本地数据库处理。

2. 事务执行

func (tx *Tx) Exec(query string, args ...interface{}) (sql.Result, error) { var parser = p.New() // 解析业务 sql act,_ := parser.ParseOneStmt(query,"","") deleteStmt,isDelete := act.(*ast.DeleteStmt) if isDelete { executor := &DeleteExecutor{ tx: tx.tx, sqlRecognizer: mysql.NewMysqlDeleteRecognizer(query,deleteStmt), values: args, } return executor.Execute() }
insertStmt,isInsert := act.(*ast.InsertStmt) if isInsert { executor := &InsertExecutor{ tx: tx.tx, sqlRecognizer: mysql.NewMysqlInsertRecognizer(query,insertStmt), values: args, } return executor.Execute() }
updateStmt,isUpdate := act.(*ast.UpdateStmt) if isUpdate { executor := &UpdateExecutor{ tx: tx.tx, sqlRecognizer: mysql.NewMysqlUpdateRecognizer(query,updateStmt), values: args, } return executor.Execute() }
return tx.tx.Tx.Exec(query,args)}

执行业务 sql,并生成 undoLog 的关键,在于识别业务 sql 执行了什么操作:插入?删除?修改?这里使用 tidb 的 sql parser 去解析业务 sql,再使用相应的执行器去执行业务 sql,生成 undoLog 保存在 Tx_Context 中。


3. 事务开启

db.Begin() 返回增强型的 Tx 对象。

func (db *DB) Begin(ctx *context.RootContext) (*Tx,error) { tx,err := db.DB.Begin() if err != nil { return nil,err } proxyTx := &tx2.ProxyTx{ Tx: tx, DSN: db.conf.DSN, ResourceId: db.GetResourceId(), Context: tx2.NewTxContext(ctx), } return &Tx{ tx: proxyTx, reportRetryCount: db.conf.ReportRetryCount, reportSuccessEnable: db.conf.ReportSuccessEnable, },nil}

至此,我们就介绍完了 AT 模式的基本实现原理。


AT模式的实现


seata-golang 从今年 4 月份开始开发,到 8 月份基本实现和 java 版 seata 1.2 协议的互通,对 mysql 数据库实现了 AT 模式(自动协调分布式事务的提交回滚),实现了 TCC 模式,TC 端使用 mysql 存储数据,使 TC 变成一个无状态应用支持高可用部署。


后续,还有许多工作可以做,比如:对注册中心的支持、对配置中心的支持、和 java 版 seata 1.4 的协议互通、其他数据库的支持、raft transaction coordinator 的实现等,希望对分布式事务问题感兴趣的开发者可以加入进来一起来打造一个完善的 golang 的分布式事务框架。


如果你有任何疑问,欢迎钉钉扫码加入交流群【钉钉群号 33069364】:


作者简介

刘晓敏 (GitHubID dk-lockdown),目前就职于新华三集团 云与智能产品线,从事云原生方面的工作。


参考资料

  • seata 官方:

    https://seata.io

  • java 版 seata

    https://github.com/seata/seata

  • https://github.com/opentrx/seata-golang

  • seata-golang go 夜读 b站分享

    https://www.bilibili.com/video/BV1oz411e72T


以上是关于seata-golang如何实现AT模式一致性的主要内容,如果未能解决你的问题,请参考以下文章

Seata-AT 如何保证分布式事务一致性

分布式事务框架 seata-golang 通信模型详解

Seata的四种模式介绍

全面分布式一致性解决方案

框架篇:分布式一致性解决方案

框架篇:分布式一致性解决方案