分布式事务的一种实现方式:状态流转

Posted 数据分析与开发

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了分布式事务的一种实现方式:状态流转相关的知识,希望对你有一定的参考价值。


来源:lianyu 自荐

blog.csdn.net/qq_23589553/article/details/78468154


关于分布式事务,参考了网上提到的一些办法,比如利用消息队列实现分布式事务,补偿事务,TCC,最大努力送达,等等。这里给出自己的一些理解和实现。可以称之为状态流转的实现。


一些要点


  • 大事务拆分成多个小事务,每个小事务都是单机上的事务

  • 要支持幂等,即每个小事务多次执行时结果要相同。

    • 如何达到幂等,一般是加一个状态,如一个带有状态字段的行,在写入业务数据时同时修改状态为“已做”,“已撤销”之类的。下次再做时,先检查状态的值,如果存在且为“已做”,则不用再做了;如果不存在,说明是“未做”,这时需要写入业务数据。

  • 对应一个大事务的多个小事务如何保证全部成功或者全部失败。

    • 先分析如何保证全部成功,就是多做几次,直到全部成功。由于有幂等的保证,过程中某些小事务多做了几次,是不影响数据的正确性的。

    • 同理可以分析全部失败即需要回滚的情况,也是在幂等保证的前提下多做几次,直到全部回滚完成。

    • 如何保证多做几次,办法应该很多,一种方法是在第一个小事务执行前先发一个延时消息,这个消息中包含预先生成的对应整个大事务流程的id。这样就算当前事务流程中在某个步骤失败,以后也有再次检查执行的机会。

    • 其他的一些技巧。使用唯一索引保证操作的唯一性以及防并发操作。使用CAS(compare and set)来防并发修改操作,如专设一个rowver字段。


分析一个简单的例子,转账业务。


  • 相关的表如下

    • 注意AccountChangeState有2份记录,只是idForPart不同(注意其他相同,如acsId等相同),分别为fromUserId和toUserId。这样与Account是一一对应的,并且分区方式相同,这样方便事务处理。

    • Account(userId,amount,rowver)

    • AccountTransferState(acsId,idForPart,state,fromUserId,toUserId,amount,rowver)


先分析流程中的正常情况。


  • 0,预先生成acsId,并发送消息到队列,消息内容包括AccountTransferState的几乎所有字段 以及 msgType=checkTransfer(在别的情况下还可以msgType=newTransfer,相当于异步进行转账操作)等。

    • 利用消息队列是自动处理,还可以人工干预。比如,用户(这里指fromUser)可以查询一个AccountTransferState的列表,按时间从晚到早排序。如果哪条记录未完成,可以点按钮触发操作。

    • 综上2种情况,在预先生成acsId后,都有办法拿到这个acsId以及其他相关数据(如fromUserId等等)。

  • 1,在单机事务中,先检查fromAccount的amount,

    • 如果不够,则直接返回,整个大事务以fail结束。

    • 如果够,减少fromAccount 并 新建AccountTransferState(acsId为预先生成,idForPart=fromUserId,state=didOut)。

  • 2,在单机事务中 增加toAccount 并 新建AccountChangeState(idForPart=toUserId,state=finish)

  • 3,在单机事务中 修改idForPart=fromUserId的AccountTransferState,使state=finish。


再分析流程中的异常情况。


在1,2,3中每一步都可能出异常。(第0步出异常,处理很简单,参考下面的1B步骤即可)


由于第0步存在,总有办法拿到具体的id和必要数据。从而可以分析检查重做的过程,如下。


  • 1B,在单机事务中,使用acsId和idForPart=fromUserId查询AccountTransferState记录,

    • 如果state=finish,则说明是在第3步事务完成后出了异常,此时整个流程已经成功完成,不用在这里做什么。

    • 如果state=didOut,说明第1步已经完成,由下一步来处理,见2B.

    • 如果state=其他,按业务逻辑分析不应该出现,给开发人员报bug。

    • 如果不存在,说明第1步的事务由于各种原因失败,从而是整个流程失败,不用管了。(另外也可以考虑再次转账,不过在这里暂不分析,以避免引入一些不必要的复杂性)

    • 如果存在,看AccountTransferState记录内容,

  • 2B, 在单机事务中,使用acsId和idForPart=toUserId查询AccountTransferState记录,

    • 如果state=finish,说明第2步已经成功完成,只需简单做前面的第3步中的内容

    • 如果state=其他,按业务逻辑分析不应该出现,给开发人员报bug。

    • 如果不存在,说明第2步尚未开始或未能成功开始,只需简单做前面的2、3步中的内容

    • 如果存在,看AccountTransferState记录内容,

  • 3B,同前面的第3步

    • 这步比较简单。根据上面的分析,如果能够到这步,只有当前面2步都成功时才到。所以无需判断什么条件,只需简单执行第3步的内容即可。


分析一个比较复杂的例子,使用积分和余额付款,积分和余额在不同的库中。


由于在不同的库中,还可以把一个小事务抽象成一个远程调用。


相关的表如下  


这里AccountTransferRequest4Account一般是5片。  


AccountTransferRequest(atrId,fromUserId,toUserId,state,crtTime,detailJson,shuoming,rowver)//detailJson 包括 多条记录,每条记录有 AccountType, amount  

AccountYuE(userId,amount,rowver)  

AccountJiFen(userId,amount,rowver)  

AccountYuEChangeState  (userIdForPart,atrId,state,rowver, amount)  

AccountJiFenChangeState(userIdForPart,atrId,state,rowver, amount)  

    

注意AccountXyzChangeState2份记录,分别与2AccountXyz对应,且分区方式相同,这样保证单机事务

 

初次执行所有事务时(第一次也称创建阶段)


1,建立转账请求 AccountTransferRequest(state=Init)

在事务前预先生成atrId,(还考虑先保存到导致这个动作的根动作上,这样便于追查状态)。


第一次执行暂且不用分布式锁,因为atrId是新生成的,没有别的地方争用。除非是锁根动作的id。


发送延时消息到队列(1minute,msgType=checkTransfer)。消息内容包括AccountTransferRequest的几乎所有字段,重点的有atrId,fromUserId,msgType=checkTransfer(当异步进行转账操作,还可以msgType=newTransfer)。


另一方面,用户(这里是fromUser),可以by fromUserId查询一个AccountTransferRequest的列表,按时间从晚到早排序。如果哪条记录未完成,可以点按钮触发操作。(或者从某个根动作得到atrId,并进一步取到数据)
综上2种情况,都可以拿到atrId和fromUserId。


另外,toUser暂时认为没有查询AccountTransferRequest的列表的必要。


  • 一方面,没转账成功,toUser不必知道这个请求状态的中间过程(毕竟现在的电商网站也没有提供这种查询)。

  • 另一方面,还可以查AccountChangeState作为列表,至于join问题,可以冗余几个字段或一个json字符串字段,或者等这个数据流转到异构数据库来解决查询问题。


2,处理 AccountJiFen 的转出部分逻辑

先检查fromAccountJiFen的amount够不够:


够 ,则 减少fromAccountJiFen 并 新建AccountJiFenChangeState(fromUserId,atrId,state=didOut,atrDetailJson~)


不够,则 不写fromAccountJiFen和AccountJiFenChangeState的数据。


  • 此时需要回滚,先发送延时消息到队列,消息内容包括AccountTransferRequest的几乎所有字段,重点的有atrId,msgType=toRollback。

  • 如果或就算在发送消息前出异常停掉。下次通过消息进入检查处理流程,会重试步骤2,可以走够与不够2个分支,但不会导致数据不一致等错误。

  • 然后修改AccountTransferRequest的state=toRollback,并修改shuoming字段为失败原因,这里是积分不足。然后返回失败原因提示用户或者发消息通知用户。

  • 然后执行B处的回滚逻辑,而不是执行下面的逻辑。考虑到通用性,B处的回滚逻辑包括多处数据的处理。

  • 注意这里由于有分支路径,需要避免并发。因为不排除在某些情况下,相同的两个消息由2个线程并发处理,而且2个线程走的分支不一样,比如先执行的发现amount不够需要回滚,而后执行的发现amount够了(比如之前amount被另一个线程增加了)而继续执行正常分支。这需要一把分布式锁作用在大事务上来避免并发。


3,处理 AccountYuE 的转出部分逻辑

先检查fromAccountYuE的amount够不够:


够 ,则 减少fromAccountYuE 并 新建AccountYuEChangeState(fromUserId,atrId,state=didOut,atrDetailJson~)


不够,则 不写fromAccountYuE和AccountYuEChangeState的数据。


  • 而是修改AccountTransferRequest的state=toRollback,并修改shuoming字段为失败原因,这里是余额不足。然后返回失败原因提示用户或者发消息通知用户。

  • 然后执行B处的回滚逻辑,而不是执行下面的逻辑。考虑到通用性,B处的回滚逻辑包括多处数据的处理。

  • 注意这里由于有分支路径,需要避免并发。


4,处理 AccountJiFen 的转入部分逻辑


在事务中 增加toAccountJiFen 并 新建AccountJiFenChangeState(toUserId,atrId,state=didIn,atrDetailJsonStr~)


此处不会出现业务逻辑问题而需要回滚,可以设想对应系统停机维护,此时出网络错不应该回滚。所以只需一直重试直到成功即可,当然重试机制需要细化考虑。


5,处理 AccountYuE 的转入部分逻辑


在事务中 增加toAccountYuE 并 新建AccountYuEChangeState(toUserId,atrId,state=didIn,atrDetailJsonStr~)


此处不会出现业务逻辑问题而需要回滚,可以设想对应系统停机维护,此时出网络错不应该回滚。所以只需一直重试直到成功即可,当然重试机制需要细化考虑。


6,修改转账请求AccountTransferRequest的 state=finish


7,有无必要修改4个AccountChangeState的state=finish,待定,暂不考虑


—-


B 回滚逻辑


B0 先加一个分布式锁,利用Redisson,以atrId为key。这样可以防止意外的并发执行。


B1 先发送延时消息到队列
,消息内容包括AccountTransferRequest的几乎所有字段,重点的有atrId,msgType=toRollback。


以保证下面步骤出异常时,还能继续做回滚操作直到完成。


  • 如果AccountTransferRequest的state!=toRollback,而是state=Init,则修改AccountTransferRequest的state=toRollback。

  • 如果state=finish OR fail ,本来是不做任何事情,但是为何能有这条路线是很奇怪的,需要让开发调查是否存在bug。


B2 from方的 AccountJiFen的回滚

根据 atrId 和 fromUserId 查询 AccountJiFenChangeState 记录


如果没查到记录,说明没做转出动作,不用回滚,即不做任何操作。


如果查到1条记录,(不可能多条记录,否则给开发报bug)


  • 如果state=didOut,则需要回滚。增加fromAccountJiFen 并 修改AccountJiFenChangeState的state=didRollback 。

  • 如果state=didRollback,说明已经回滚了,不用再做任何操作了。


B3 from方的 AccountYuE的回滚

根据 atrId 和 fromUserId 查询 AccountYuEChangeState 记录


如果没查到记录,说明没做转出动作,不用回滚,即不做任何操作。


如果查到1条记录,(不可能多条记录,否则给开发报bug)


  • 如果state=didOut,则需要回滚。增加fromAccountYuE 并 修改AccountYuEChangeState的state=didRollback 。

  • 如果state=didRollback,说明已经回滚了,不用再做任何操作了。


B4 to方的 AccountJiFen的回滚

根据 atrId 和 toUserId 查询 AccountJiFenChangeState 记录


如果没查到记录,说明没做转入动作,不用回滚,即不做任何操作。


如果查到1条记录,(不可能多条记录,否则给开发报bug)


  • 如果state=didIn,则需要回滚。减少toAccountJiFen 并 修改AccountJiFenChangeState的state=didRollback 。

  • 如果state=didRollback,说明已经回滚了,不用再做任何操作了。


B5 to方的 AccountYuE的回滚

根据 atrId 和 toUserId 查询 AccountYuEChangeState 记录


如果没查到记录,说明没做转入动作,不用回滚,即不做任何操作。


如果查到1条记录,(不可能多条记录,否则给开发报bug)


  • 如果state=didIn,则需要回滚。减少toAccountYuE 并 修改AccountYuEChangeState的state=didRollback 。

  • 如果state=didRollback,说明已经回滚了,不用再做任何操作了。


B6 最后修改AccountTransferRequest的state=fail


B7 释放分布式锁

—-


由于在1–6中每一步都可能失败或异常,研究处理失败情况。


由于可以拿到具体的id,所以先分2种情况,一种是完全新增,此时对应上面的步骤,简单。


一种是某个步骤失败后,通过某种方式重启这个转账流程,在下面讨论。


0C,先加一个分布式锁,利用Redisson,以atrId为key。这样可以防止意外的并发执行。至于为何要避免并发在前面有地方分析过。


1C,使用atrId查询AccountTransferRequest记录

如果不存在,说明第1步就没成功,可以以全新方式重做上面的1–6步。暂不考虑已经返回错误信息给用户,由用户决定,而不用做任何动作的情况。


如果存在,看AccountTransferRequest记录内容:


  • 如果state=finish,此时整个流程已经成功完成,不用做什么,只需消费掉这条消息即可。

  • 如果state=fail,整个流程已经失败,也不用管了。

  • 如果state=toRollback,转入 B 回滚逻辑 的处理流程

  • 如果state=Init,说明至少第1步已经完成,由第2步来检查处理,见2C.

  • 如果state=其他,按业务逻辑分析不应该出现,给开发人员报错提醒是bug。


2C, 使用atrId和idForPart=fromUserId查询AccountJiFenChangeState记录

如果不存在,说明第2步尚未开始或未能成功开始,简单做2–6步。这里有可能之前是第2步检查失败,但是未能修改AccountTransferRequest记录状态。注意可能出现以前检查失败,现在检查可以成功的情况。虽然单独做没有问题,但是并发做则有隐患,需要防止并发。


如果存在,看AccountJiFenChangeState记录内容:


  • 如果state=didOut,说明第2步已经成功完成,由第3步来检查处理,见3C.

  • 如果state=didRollback,说明应该回滚了,转入 B 回滚逻辑 的处理流程

  • 如果state=其他,按业务逻辑分析不应该出现,给开发人员报错提醒是bug。


3C, 使用atrId和idForPart=fromUserId查询AccountYuEChangeState记录

如果不存在,说明第3步尚未开始或未能成功开始,简单做3–6步。这里有可能之前是第3步检查失败,但是未能修改AccountTransferRequest记录状态。注意可能出现以前检查失败,现在检查可以成功的情况。虽然单独做没有问题,但是并发做则有隐患,需要防止并发。


如果存在,看AccountYuEChangeState记录内容:


  • 如果state=didOut,说明第3步已经成功完成,由第4步来检查处理,见4C.

  • 如果state=didRollback,说明应该回滚了,转入 B 回滚逻辑 的处理流程

  • 如果state=其他,按业务逻辑分析不应该出现,给开发人员报错提醒是bug。


4C, 使用atrId和idForPart=toUserId查询AccountJiFenChangeState记录

如果不存在,说明第4步尚未开始或未能成功开始,简单做4–6步


如果存在,看AccountJiFenChangeState记录内容:


  • 如果state=didOut,说明第4步已经成功完成,由第5步来检查处理,见5C.

  • 如果state=didRollback,说明应该回滚了,转入 B 回滚逻辑 的处理流程

  • 如果state=其他,按业务逻辑分析不应该出现,给开发人员报错提醒是bug。


5C, 使用atrId和idForPart=toUserId查询AccountYuEChangeState记录

如果不存在,说明第5步尚未开始或未能成功开始,简单做5–6步


如果存在,看AccountYuEChangeState记录内容:


  • 如果state=didOut,说明第5步已经成功完成,由第6步来检查处理,见6C.

  • 如果state=didRollback,说明应该回滚了,转入 B 回滚逻辑 的处理流程

  • 如果state=其他,按业务逻辑分析不应该出现,给开发人员报错提醒是bug。


6C,同第6步

7C,释放分布式锁


小结


这种状态流转的分布式事务的实现的好处,我认为最大的好处就是直观。


所有的数据处理,都可以合在一个大的函数里面,可以看到整个处理过程的全貌。


这里也给出本人对前面的2个例子的具体代码实现,在 https://github.com/zlywq/tmycat1 。



看完本文有收获?请转发分享给更多人

关注「数据库开发」,提升 DB 技能

以上是关于分布式事务的一种实现方式:状态流转的主要内容,如果未能解决你的问题,请参考以下文章

MassTransit | 基于StateMachine实现Saga编排式分布式事务

一文看懂分布式事务

一文看懂分布式事务

一阶段提交分布式事务管理方式

再问你一遍,你真的了解分布式事务吗?

etcd分布式锁及事务