谈一谈如何使用etcd中的事务以及自己的理解

Posted huageyiyangdewo

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了谈一谈如何使用etcd中的事务以及自己的理解相关的知识,希望对你有一定的参考价值。

01、谈一谈etcd事务的如何使用以及自己的理解

本文内容来源于自己学习时所做的记录,主要来源于文章最后的参考链接,如有侵权,请联系删除,谢谢!

etcd 是一个 key/value 类型的数据库。既然我们需要存储数据,必然会面临这样一个需求,即希望无论什么样的场景下,一组操作要么同时完成,要么都失败,哪怕数据库出现了故障,甚至了机器发生了宕机。幸运的是 从 etcd 3.4 版本开始,开始支持事务。

etcd 中的事务,实现了原子地执行冲突检查、更新多个 keys 的值。除此之外,etcd 将底层 MVCC 机制的版本信息暴露出来,基于版本信息封装出了一套基于乐观锁的事务框架 STM,并实现了不同的隔离级别。接下来我们就一起学习学习。

1、什么是事务?

事务(Transaction)是指数据库管理系统中的一个基本操作单位,通常由一组对数据库的读/写操作组成一个不可分割的操作序列。

在关系型数据库中,事务应该具备 ACID 特性,即原子性、一致性、隔离性、持久性。具体解释如下:

  • 原子性(Atomicity):事务作为一个不可分割的操作序列,要么全部执行成功,要么全部回滚。如果事务在执行过程中发生错误,需要将已经修改的数据撤销到事务开始前的状态。
  • 一致性(Consistency):指事务执行前和执行后的数据状态保持一致。在事务执行过程中,数据库会处于一种中间状态,需要通过在事务结束时进行一些额外的工作来确保数据的一致性。这里理解起来还是有点抽象,我们借助一个例子来说明:
比如,用户 A 和用户 B 在银行分别有 1000 元和 500 元,总共 1500 元,用户 A 给用户 B 转账 200 元,分为两个步骤,从 A 的账户扣除 200 元和对 B 的账户增加 200 元。一致性就是要求上述步骤操作后,最后的结果是用户 A 还有 800 元,用户 B 有 700 元,总共 1500 元,而不会出现用户 A 扣除了 200 元,但用户 B 未增加的情况(该情况,用户 A 为 800 元,用户 B 为500 元,总共 1300 元)。
  • 隔离性(Isolation):多个事务并发执行时,每个事务是彼此隔离的,互相不干扰。隔离级别越高,就越能防止出现脏读、不可重复读、幻读等问题。
  • 持久性(Durability):指一旦事务提交,则所有修改的数据都将永久保存在数据库中。即使发生系统故障,也不会丢失任何数据。

常见的关系型数据库如 MySQL,在存储引擎为 InnoDB时是支持事务的,那InnoDB 引擎通过什么技术来保证事务的这四个特性的呢?

  • 持久性是通过 redo log (重做日志)来保证的;
  • 原子性是通过 undo log(回滚日志) 来保证的;
  • 隔离性是通过 MVCC(多版本并发控制) 或锁机制来保证的;
  • 一致性则是通过持久性+原子性+隔离性来保证;

了解了 MySQL 中的事务的简单实现方式后,接下来讲一讲 etcd 中的事务。

etcd 中的事务则是基于 CAS(Compare and Swap,即比较再交换) 方式

etcd 使用了不到四百行的代码实现了迷你事务,其对应的语法为 If-Then-Else。etcd 允许用户在一次修改中批量执行多个操作,即这一组操作被绑定成一个原子操作,并共享同一个修订号。写法类似 CAS,如下所示:

Txn().If(cond1, cond2, ...).Then(op1, op2, ...,).Else(op1, op2)

上面语句的意思是,如果 If 冲突判断语句为真,对应返回值为 true,Then 中的语句将会被执行,否则执行 else 中的逻辑。

在 etcd 事务执行过程中,客户端与 etcd 服务端之间没有维护事务会话。冲突判断(If)和执行过程 Then/Else作为一个原子过程来执行 If-Then-Else,因此 etcd 事务不会发生阻塞,无论成功还是失败都会返回,当发生冲突导致执行失败时,需要应用进行重试。业务代码需要考虑这部分的重试逻辑。

讲了这么多理论的东西,接下来就一起看一看 etcd 中事务的使用案例。

2、etcd中事务的使用案例

这里使用一个比较容易理解的例子来进行演示,用户A 向 用户B 转账的场景。etcd 的事务基于乐观锁思想,当检测到冲突会发起重试,检测冲突时使用了ModRevision进行校验,该字段表示某个 key 上一次被更改时,全局的版本是多少。因此,我们实现转账业务的流程如下所示:

代码如下:

package main

import (
	"context"
	"fmt"
	clientv3 "go.etcd.io/etcd/client/v3"
	"strconv"
	"time"
)

func main() 
	config := clientv3.Config
		Endpoints:   []string"192.168.91.66:12379",
		DialTimeout: 5 * time.Second,
	
	// 建立连接
	client, err := clientv3.New(config)
	if err != nil 
		fmt.Println(err)
		return
	

	sender, receiver := "/sender_amount", "/receiver_amount"
	_, err = client.Put(context.Background(), sender, "1000")
	if err != nil 
		fmt.Printf("etcd put /sender_amount failed, err:%v\\n", err)
		return
	
	_, err = client.Put(context.Background(), receiver, "500")
	if err != nil 
		fmt.Printf("etcd put /receiver_amount failed, err:%v\\n", err)
		return
	

	err = txnTransfer(client, sender, receiver, 200)
	if err != nil 
		fmt.Printf("etcd txnTransfer failed, err:%v\\n", err)
		return
	


func txnTransfer(etcd *clientv3.Client, sender, receiver string, amount uint64) error 
	// 失败重试
	for 
		if ok, err := doTxn(etcd, sender, receiver, amount); err != nil 
			return err
		 else if ok 
			return nil
		
	


func doTxn(etcd *clientv3.Client, sender, receiver string, amount uint64) (bool, error) 
	// 第一个事务,利用事务的原子性,同时获取发送和接收者的余额以及 ModRevision
	getResp, err := etcd.Txn(context.TODO()).Then(clientv3.OpGet(sender), clientv3.OpGet(receiver)).Commit()
	if err != nil 
		return false, err
	
	senderKV := getResp.Responses[0].GetResponseRange().Kvs[0]
	receiverKV := getResp.Responses[1].GetResponseRange().Kvs[0]
	senderNum, receiverNum := toUInt64(senderKV.Value), toUInt64(receiverKV.Value)
	// 验证账户余额是否充足
	if senderNum < amount 
		return false, fmt.Errorf("资金不足")
	
	// 发起转账事务,冲突判断 ModRevision 是否发生变化
	txn := etcd.Txn(context.TODO()).If(
		clientv3.Compare(clientv3.ModRevision(sender), "=", senderKV.ModRevision),
		clientv3.Compare(clientv3.ModRevision(receiver), "=", receiverKV.ModRevision))
	// ModRevision 未发生变化,即 If 判断条件成功
	txn = txn.Then(
		clientv3.OpPut(sender, fromUint64(senderNum-amount)),     // 更新发送者账户余额
		clientv3.OpPut(receiver, fromUint64(receiverNum+amount))) // 更新接收者账户余额
	resp, err := txn.Commit() // 提交事务
	if err != nil 
		return false, err
	
	return resp.Succeeded, nil


func toUInt64(value []byte) uint64 
	ret, err := strconv.Atoi(string(value))
	if err != nil 
		fmt.Printf("value to uint64 failed, err: %v\\n", err)
		return 0
	
	return uint64(ret)


func fromUint64(number uint64) string 
	return strconv.Itoa(int(number))


转账业务开始前,sender、reciver的余额如下:

转账业务完成后,sender、reciver的余额如下:

如上 etcd 事务的实现基于乐观锁思想,涉及到两次事务操作,第一次事务利用原子性来同时获取发送方和接收方的当前账户金额;第二次事务发起转账操作,冲突检测 ModRevision 是否发生变化,如果没有变化则正常提交事务。若发生了冲突,则需要进行重试。

如上过程的实现较为繁琐,除了业务逻辑,还有大量的代码用来判断冲突以及重试。因此,etcd 社区基于事务特性,实现了一个简单的事务框架 STM,构建了各个事务隔离级别类,下面我们看看基于 STM 框架如何实现 etcd 事务。

3、使用 STM 实现转账

那如何简化 etcd 事务实现的过程呢,etcd clientv3 提供了 STM,即软件事务内存,帮我们自动处理这些繁琐的过程。使用 STM 的转账业务代码如下:

package main

import (
	"context"
	"fmt"
	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/client/v3/concurrency"
	"strconv"
	"time"
)

func main() 
	config := clientv3.Config
		Endpoints:   []string"192.168.91.66:12379",
		DialTimeout: 5 * time.Second,
	
	// 建立连接
	client, err := clientv3.New(config)
	if err != nil 
		fmt.Println(err)
		return
	

	sender, receiver := "/sender_amount", "/receiver_amount"
	_, err = client.Put(context.Background(), sender, "1000")
	if err != nil 
		fmt.Printf("etcd put /sender_amount failed, err:%v\\n", err)
		return
	
	_, err = client.Put(context.Background(), receiver, "500")
	if err != nil 
		fmt.Printf("etcd put /receiver_amount failed, err:%v\\n", err)
		return
	

	err = txnStmTransfer(client, sender, receiver, 200)
	if err != nil 
		fmt.Printf("etcd txnTransfer failed, err:%v\\n", err)
		return
	



func txnStmTransfer(cli *clientv3.Client, from, to string, amount uint64) error 
	// NewSTM 创建了一个原子事务的上下文,并把我们的业务代码作为一个函数传进去
	_, err := concurrency.NewSTM(cli, func(stm concurrency.STM) error 
		// stm.Get 封装好了事务的读操作
		senderNum := toUint64(stm.Get(from))
		receiverNum := toUint64(stm.Get(to))
		if senderNum < amount 
			return fmt.Errorf("余额不足")
		
		// stm.Put封装好了事务的写操作
		stm.Put(to, fromUint64(receiverNum+amount))
		stm.Put(from, fromUint64(senderNum-amount))
		return nil
	)
	return err


func toUint64(value string) uint64 
	ret, err := strconv.Atoi(value)
	if err != nil 
		fmt.Printf("value to toUint64 failed, err: %v\\n", err)
		return 0
	
	return uint64(ret)


func fromUint64(number uint64) string 
	return strconv.Itoa(int(number))


上述基于 STM 实现的转账业务流程,我们只要关注转账逻辑的实现即可,事务相关的其他操作由 STM 完成。

STM 的使用特别简单,只需把业务相关的代码封装成可重入的函数传给 stm,然后 STM 会处理好其余所有的细节。STM 对象在内部构造 txn 事务,把我们编写的业务函数翻译成 If-Then,自动提交事务,处理失败重试等工作,直到事务执行成功,或者出现异常,重试亦不能解决。

接下来我们一起初略的看看 NewSTM 的内部做了些什么

// 位于 clientv3/concurrency/stm.go:89
func NewSTM(c *v3.Client, apply func(STM) error, so ...stmOption) (*v3.TxnResponse, error) 
   opts := &stmOptionsctx: c.Ctx()
   for _, f := range so 
      f(opts)
   
   if len(opts.prefetch) != 0 
      f := apply
      apply = func(s STM) error 
         s.Get(opts.prefetch...)
         return f(s)
      
   
   return runSTM(mkSTM(c, opts), apply)

根据源码可以知道,NewSTM 首先创建一个 stm,然后执行 stm,代码如下所示:

func runSTM(s STM, apply func(STM) error) (*v3.TxnResponse, error) 
	outc := make(chan stmResponse, 1)
	go func() 
		defer func() 
			if r := recover(); r != nil 
				e, ok := r.(stmError)
				if !ok 
					// client apply panicked
					panic(r)
				
				outc <- stmResponsenil, e.err
			
		()
		var out stmResponse
		for 
			s.reset()
			if out.err = apply(s); out.err != nil 
				break
			
			if out.resp = s.commit(); out.resp != nil 
				break
			
		
		outc <- out
	()
	r := <-outc
	return r.resp, r.err

runstm 主要是循环执行以下三个步骤:

  • 重置 stm,清空 STM 的读写缓存
  • 执行事务操作,apply 函数
  • 提交事务

etcd client 最终执行提交事务的操作:

txnresp, err := s.client.Txn(s.ctx).If(s.conflicts()...).Then(s.wset.puts()...).Commit()

4、STM 实现细节

下面我们来看 STM 的实现原理。通过上面转账的例子,我们可以看到 STM 的使用特别简单,只需把业务相关的代码封装成可重入的函数传给 stm,而 STM 可自行处理事务相关的细节

// 位于 clientv3/concurrency/stm.go:25
type STM interface 
	// Get 返回键的值,并将该键插入 txn 的 read set 中。如果 Get 失败,它将以错误中止事务,没有返回
	Get(key ...string) string
	// Put 在 write set 中增加键值对
	Put(key, val string, opts ...v3.OpOption)
	// Rev 返回 read set 中某个键指定的版本号
	Rev(key string) int64
	// Del 删除某个键
	Del(key string)
	// commit 尝试提交事务到 etcd server
	commit() *v3.TxnResponse
	reset()

STM 是软件事务存储的接口。其中定义了 Get、Put、Rev、Del、commit、reset 接口方法。STM 的接口有两个实现类:stm 和 stmSerializable。具体选择哪一个,**由我们指定的隔离级别决定。

STM 对象在内部构造 txn 事务,业务函数转换成If-Then,自动提交事务以及处理失败重试等工作,直到事务执行成功。核心的NewSTM函数的实现如下所示:

// NewSTM initiates a new STM instance, using serializable snapshot isolation by default.
func NewSTM(c *v3.Client, apply func(STM) error, so ...stmOption) (*v3.TxnResponse, error) 
	opts := &stmOptionsctx: c.Ctx()
	for _, f := range so 
		f(opts)
	
	if len(opts.prefetch) != 0 
		f := apply
		apply = func(s STM) error 
			s.Get(opts.prefetch...)
			return f(s)
		
	
	return runSTM(mkSTM(c, opts), apply)

根据源码可以知道,NewSTM首先判断该事务是否存在预取的键值对,如果存在,会无条件地直接 apply 函数;否则会创建一个 stm,并运行 stm 事务。runSTM 代码如下所示:

// 位于 clientv3/concurrency/stm.go:140
func runSTM(s STM, apply func(STM) error) (*v3.TxnResponse, error) 
	outc := make(chan stmResponse, 1)
	go func() 
		defer func() 
			if r := recover(); r != nil 
				e, ok := r.(stmError)
				if !ok 
					// 执行异常
					panic(r)
				
				outc <- stmResponsenil, e.err
			
		()
		var out stmResponse
		for 
            // 重置 stm
			s.reset()
            // 执行事务操作,apply 函数
			if out.err = apply(s); out.err != nil 
				break
			
            // 提交事务
			if out.resp = s.commit(); out.resp != nil 
				break
			
		
		outc <- out
	()
	r := <-outc
	return r.resp, r.err

runSTM 函数首先重置了 stm,清空 STM 的读写缓存;接着执行事务操作,apply 应用函数;最后将事务提交。提交事务的实现如下:

// 位于 clientv3/concurrency/stm.go:265
func (s *stm) commit() *v3.TxnResponse 
   txnresp, err := s.client.Txn(s.ctx).If(s.conflicts()...).Then(s.wset.puts()...).Commit()
   if err != nil 
      panic(stmErrorerr)
   
   if txnresp.Succeeded 
      return txnresp
   
   return nil

上述 commit 的实现包含了我们前面所介绍的 etcd 事务语法。If 中封装了冲突检测条件,提交事务则是 etcd 的 Txn 将 wset 中的数据写入并提交的过程。

下面我们来看看 etcd 隔离级别以及在 STM 封装基础上如何实现事务。

4、etcd 事务隔离级别

数据库有如下几种事务隔离级别 (Transaction Isolation Levels):

  • 未提交读(Read Uncommitted):能够读取到其他事务中还未提交的数据,这可能会导致脏读的问题。
  • 读已提交(Read Committed):只能读取到已经提交的数据,即别的事务一提交,当前事务就能读取到被修改的数据,这可能导致不可重复读的问题。
  • 可重复读(Repeated Read):一个事务中,同一个读操作在事务的任意时刻都能得到同样的结果,其他事务的提交操作对本事务不会产生影响。
  • 串行化(Serializable):串行化的执行可能冲突的事务,即一个事务会阻塞其他事务。它通过牺牲并发能力来换取数据的安全,属于最高的隔离级别。

而 etcd clientv3 实现了四种事务模型,位于 clientv3/concurrency/stm.go 中,分别为 SerializableSnapshot、Serializable、RepeatableReads 和 ReadCommitted。

// 位于 clientv3/concurrency/stm.go:45
const (
	// SerializableSnapshot provides serializable isolation and also checks
	// for write conflicts.
	SerializableSnapshot Isolation = iota
	// Serializable reads within the same transaction attempt return data
	// from the at the revision of the first read.
	Serializable
	// RepeatableReads reads within the same transaction attempt always
	// return the same data.
	RepeatableReads
	// ReadCommitted reads keys from any committed revision.
	ReadCommitted
)


// WithIsolation specifies the transaction isolation level.
func WithIsolation(lvl Isolation) stmOption 
	return func(so *stmOptions)  so.iso = lvl 

STM 的事务级别通过 stmOption 指定,默认就是 SerializableSnapshot。下面分别介绍这几种隔离级别。

构造 STM 的实现如下所示:

// 位于 clientv3/concurrency/stm.go:105
func mkSTM(c *v3.Client, opts *stmOptions) STM 
   switch opts.iso 
   // 串行化快照
   case SerializableSnapshot:
      s := &stmSerializable
         stm:      stmclient: c, ctx: opts.ctx,
         prefetch: make(map[string]*v3.GetResponse),
      
      s.conflicts = func() []v3.Cmp 
         return append(s.rset.cmps(), s.wset.cmps(s.rset.first()+1)...)
      
      return s
   // 串行化
   case Serializable:
      s := &stmSerializable
         stm:      stmclient: c, ctx: opts.ctx,
         prefetch: make(map[string]*v3.GetResponse),
      
      s.conflicts = func() []v3.Cmp  return s.rset.cmps() 
      return s
   // 可重复读   
   case RepeatableReads:
      s := &stmclient: c, ctx: opts.ctx, getOpts: []v3.OpOptionv3.WithSerializable()
      s.conflicts = func() []v3.Cmp  return s.rset.cmps() 
      return s
   // 已提交读
   case ReadCommitted:
      s := &stmclient: c, ctx: opts.ctx, getOpts: []v3.OpOptionv3.WithSerializable()
      s.conflicts = func() []v3.Cmp  return nil 
      return s
   default:
      panic("unsupported stm")
   

该函数是根据隔离级别定义的。每一类隔离级别对应不同的冲突检测条件,存在读操作差异,因此我们需要搞清楚每一类隔离级别在这两方面的实现。

从构建 SMT 的实现代码可以知道,etcd 隔离级别与一般的数据库隔离级别的差异是没有未提交读的隔离级别,这是由于 etcd 的 kv 操作(包括 txn 事务内的多个 keys 操作)都是原子操作,所以你不可能读到未提交的修改。下面我们将从低到高分别介绍 etcd 事务隔离级别。

4.1 ReadCommitted

读提交是指,一个事务提交之后,它做的变更才会被其他事务看到。只允许获取已经提交的数据。比如事务 A 和事务 B 同时进行,事务 A 进行 +1 操作,此时,事务 B 无法看到这个数据项在事务A操作过程中的所有中间值,只能看到最终的 10。

由于 etcd 的 kv 操作(包括 txn 事务内的多个 keys 操作)都是原子操作,所以你不可能读到未提交的修改,ReadCommitted 是 etcd 中的最低事务级别。

由构造 STM 的源码可知,ReadCommitted 调用的是 stm 的实现。对于不一样的隔离级别,我们主要关注的就是读操作和提交时的冲突检测条件。而对于写操作,会先写进本地缓存,直到事务提交时才真正写到 etcd 里。

  • 读操作
func (s *stm) Get(keys ...string) string 
	if wv := s.wset.get(keys...); wv != nil 
		return wv.val
	
	return respToValue(s.fetch(keys...))

从 etcd 读取 keys,就像普通的 kv 操作一样。第一次 Get 后,在事务中缓存,后续不再从 etcd 读取。

  • 冲突检测条件
s.conflicts = func() []v3.Cmp  return nil 

没有任何冲突检测。

ReadCommitted 只需要确保自己读到的是别人已经提交的数据,由于 etcd 的 kv 操作都是原子操作,所以不可能读到未提交的修改。

4.2 RepeatableReads

可重复读是指,一个事务执行过程中看到的数据,总是跟这个事务在启动时看到的数据是一致的。多次读取同一个数据时,其值都和事务开始时刻是一致的,因此该事务级别解决了不可重复读取和脏读取的问题,但是有可能出现幻影数据。所谓幻影数据,在一个事务内多次查询某个符合查询条件的「记录数量」,如果出现前后两次查询到的记录数量不一样的情况,就意味着发生了「幻读」现象。

  • 读操作

与 ReadCommitted 类似,用 readSet 缓存已经读过的数据,这样下次再读取相同数据的时候才能得到同样的结果,确保了可重复读。

  • 冲突检测条件
s.conflicts = func() []v3.Cmp  return s.rset.cmps() 

在事务提交时,确保事务中 Get 的 keys 没有被改动过。因此使用 readSet 数据的 ModRevision 做冲突检测,确保本事务读到的数据都是最新的。

MySQL 事务“可重复读”是通过在事务第一次 select 时建立 readview,来确保事务中读到的是到这一刻为止的最新数据,忽略后面发生的更新。而这里每个 key 的 Get 是独立的(也可以说,每个 key 都是获取的当前值,没有 readview 的概念),在事务提交时,如果这些 keys 没有变动过,那么事务就可以提交。

4.3 Serializable

串行化,顾名思义是对同一行记录,“写”会加“写锁”,“读”会加“读锁”。当出现读写锁冲突的时候,后访问的事务必须等前一个事务执行完成,才能继续执行。是最严格的事务隔离级别,它要求所有事务被串行执行。

  • 读操作
func (s *stmSerializable) Get(keys ...string) string 
   if wv := s.wset.get(keys...); wv != nil 
      return wv.val
   
   // 判断是否第一次读
   firstRead := len(s.rset) == 0
   for _, key := range keys 
      if resp, ok := s.prefetch[key]; ok 
         delete(s.prefetch, key)
         s.rset[key] = resp
      
   
   resp := s.stm.fetch(keys...)
   if firstRead 
      // 记录下第一次读的版本作为基准
      s.getOpts = []v3.OpOption
         v3.WithRev(resp.Header.Revision),
         v3.WithSerializable(),
      
   
   return respToValue(resp)

事务中第一次读操作完成时,保存当前版本号 Revision;后续其他读请求会带上这个版本号,获取指定 Revision 版本的数据。这确保了该事务所有的读操作读到的都是同一时刻的内容。

  • 冲突检测条件
s.conflicts = func() []v3.Cmp  return s.rset.cmps() 

在事务提交时,需要检查事务中 Get 的 keys 是否被改动过,而 etcd 串行化的约束还不够,它缺少了验证事务要修改的 keys 这一步。下面的 SerializableSnapshot 事务增加了这个约束。

可见,这个约束比数据库串行化的约束要低,它没有验证事务要修改的 keys 是否被改动过,下面的 SerializableSnapshot 事务增加了这个约束。

4.4 SerializableSnapshot

SerializableSnapshot串行化快照隔离,提供可序列化的隔离,并检查写冲突。etcd 默认采用这种隔离级别,串行化快照隔离是最严格的隔离级别,可以避免幻影读。其读操作与冲突检测的过程如下。

  • 读操作

与 Serializable 串行化读类似。事务中的第一个 Get 操作发生时,保存服务器返回的当前 Revision;后续对其他 keys 的 Get 操作,指定获取 Revision 版本的 value。

  • 冲突检测条件
s.conflicts = func() []v3.Cmp 
    return append(s.rset.cmps(), s.wset.cmps(s.rset.first()+1)...)

在事务提交时,检查事务中 Get 的 keys 以及要修改的 keys 是否被改动过。

SerializableSnapshot 不仅确保了读取过的数据是最新的,同时也确保了要写入的数据同样没有被其他事务更改过,是隔离的最高级别。

通过上面的分析,我们清楚了如何使用 etcd 的 txn 事务,构建符合 ACID 语义的事务框架。如果这些语义不能满足你的业务需求,通过扩展 etcd 的官方 client sdk,写一个新 STM 事务类型即可。

需要强调的是,数据库事务是“锁/阻塞”模式,而 etcd 的 STM 事务是 “CAS/重试” 模式,这是有差别的。简单的说,数据库事务不会自己重试,而 STM 事务在发生冲突是会多次重试,必须要保证业务代码是可重试的,且必须有明确的失败条件(例如判断账户余额是否够转账)。

参考资料:

bilibili视频讲解etcd事务

ETCD 十 分布式事务

彻底搞懂 etcd 系列文章(八):etcd 事务 API

etcd系列之事务:etcd 中如何实现事务(上)?

事务:etcd 中如何实现事务(下)?

Spring相关面试题:谈一谈你对事务的理解?

事务详解

2. 事务管理

2.1 回顾事务

  • 什么是事务?
    • 在数据库开发中,一组业务逻辑操作,要么全部成功,要么全部失败。
  • 事务有什么特定?ACID
    • 原子性:整体,原子不可分割的。整个操作被看成一个整体,要么成功,要么失败。
    • 一致性:数据,事务操作的前后数据一致。
    • 隔离性:并发,两个事务之间并发访问情况。
    • 持久性:结果,事务一旦提交,不能回滚。
  • 隔离有什么问题?
    • 脏读:一个事务读到了另一个事务没有提交的数据。
    • 不可重复读:一个事务读到了另一个事务已有提交的数据(update)。
    • 幻读:一个事务读到了另一个事务已有提交的数据(insert)。
  • 隔离级别有那些?
    • 读未提交:存在3个问题(脏读、不可重复读、幻读)
    • 读已提交:解决1个问题(脏读),存在2个问题(不可重复读、幻读)
    • 可重复读:解决2个问题(脏读、不可重复读)、存在1个问题(幻读)
    • 串行化:解决3个问题(脏读、不可重复读、幻读)–单事务

2.2 事务详解

  • 研究Spring事务,需要学习事务管理平台管理器:PlatformTransactionManager
    • 在平台管理器中,通过事务的定义获得事务,从而进行事务提交或回滚操作。
  • 事务定义 TransactionDefinition 的详解:

  • 传播行为:一个事务调用另一个事务,事务共享问题。
  1. PROPAGATION_REQUIRED,required:支持当前事务,如果没有事务,创建一个新的。

​ A 有事务,B使用A的事务。(支持当前事务)

​ A没有事务,B创建新的。()

  1. PROPAGATION_SUPPORTS,supports:支持当前事务,如果没有事务,以非事务执行。

​ A 有事务,B使用A的事务。(支持当前事务)

​ A没有事务,B以非事务执行。

  1. PROPAGATION_MANDATORY,mandatory:支持当前事务,如果没有事务,抛异常

​ A 有事务,B使用A的事务。(支持当前事务)

​ A没有事务,B抛异常。

  1. PROPAGATION_REQUIRES_NEW,requires_new:创建一个新事物,如果当前有事务,将挂起。

​ A 有事务,B创建新事务,同时挂起A事务。

​ A 没有事务,B创建新事务。

  1. PROPAGATION_NOT_SUPPORTED, not_supported:不支持当前事务,以非事务执行,如果有挂起

​ A 有事务,B以非事务执行,同时挂起A事务。

​ A 没有事务,B以非事务执行。

  1. PROPAGATION_NEVER, never:不支持当前事务,如果有抛异常。

​ A 有事务,B抛异常

​ A 没有事务,B以非事务执行。

  1. PROPAGATION_NESTED, nested :嵌套事务,底层使用savepoint进行嵌套事务操作。

    ​ 保存点允许回顾部分事务。

相关源码:

/*
 * Copyright 2002-2015 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.transaction;

import java.sql.Connection;

/**
 * Interface that defines Spring-compliant transaction properties.
 * Based on the propagation behavior definitions analogous to EJB CMT attributes.
 *
 * <p>Note that isolation level and timeout settings will not get applied unless
 * an actual new transaction gets started. As only {@link #PROPAGATION_REQUIRED},
 * {@link #PROPAGATION_REQUIRES_NEW} and {@link #PROPAGATION_NESTED} can cause
 * that, it usually doesn't make sense to specify those settings in other cases.
 * Furthermore, be aware that not all transaction managers will support those
 * advanced features and thus might throw corresponding exceptions when given
 * non-default values.
 *
 * <p>The {@link #isReadOnly() read-only flag} applies to any transaction context,
 * whether backed by an actual resource transaction or operating non-transactionally
 * at the resource level. In the latter case, the flag will only apply to managed
 * resources within the application, such as a Hibernate {@code Session}.
 *
 * @author Juergen Hoeller
 * @since 08.05.2003
 * @see PlatformTransactionManager#getTransaction(TransactionDefinition)
 * @see org.springframework.transaction.support.DefaultTransactionDefinition
 * @see org.springframework.transaction.interceptor.TransactionAttribute
 */
public interface TransactionDefinition {

	/**
	 * Support a current transaction; create a new one if none exists.
	 * Analogous to the EJB transaction attribute of the same name.
	 * <p>This is typically the default setting of a transaction definition,
	 * and typically defines a transaction synchronization scope.
	 */
	int PROPAGATION_REQUIRED = 0;

	/**
	 * Support a current transaction; execute non-transactionally if none exists.
	 * Analogous to the EJB transaction attribute of the same name.
	 * <p><b>NOTE:</b> For transaction managers with transaction synchronization,
	 * {@code PROPAGATION_SUPPORTS} is slightly different from no transaction
	 * at all, as it defines a transaction scope that synchronization might apply to.
	 * As a consequence, the same resources (a JDBC {@code Connection}, a
	 * Hibernate {@code Session}, etc) will be shared for the entire specified
	 * scope. Note that the exact behavior depends on the actual synchronization
	 * configuration of the transaction manager!
	 * <p>In general, use {@code PROPAGATION_SUPPORTS} with care! In particular, do
	 * not rely on {@code PROPAGATION_REQUIRED} or {@code PROPAGATION_REQUIRES_NEW}
	 * <i>within</i> a {@code PROPAGATION_SUPPORTS} scope (which may lead to
	 * synchronization conflicts at runtime). If such nesting is unavoidable, make sure
	 * to configure your transaction manager appropriately (typically switching to
	 * "synchronization on actual transaction").
	 * @see org.springframework.transaction.support.AbstractPlatformTransactionManager#setTransactionSynchronization
	 * @see org.springframework.transaction.support.AbstractPlatformTransactionManager#SYNCHRONIZATION_ON_ACTUAL_TRANSACTION
	 */
	int PROPAGATION_SUPPORTS = 1;

	/**
	 * Support a current transaction; throw an exception if no current transaction
	 * exists. Analogous to the EJB transaction attribute of the same name.
	 * <p>Note that transaction synchronization within a {@code PROPAGATION_MANDATORY}
	 * scope will always be driven by the surrounding transaction.
	 */
	int PROPAGATION_MANDATORY = 2;

	/**
	 * Create a new transaction, suspending the current transaction if one exists.
	 * Analogous to the EJB transaction attribute of the same name.
	 * <p><b>NOTE:</b> Actual transaction suspension will not work out-of-the-box
	 * on all transaction managers. This in particular applies to
	 * {@link org.springframework.transaction.jta.JtaTransactionManager},
	 * which requires the {@code javax.transaction.TransactionManager} to be
	 * made available it to it (which is server-specific in standard Java EE).
	 * <p>A {@code PROPAGATION_REQUIRES_NEW} scope always defines its own
	 * transaction synchronizations. Existing synchronizations will be suspended
	 * and resumed appropriately.
	 * @see org.springframework.transaction.jta.JtaTransactionManager#setTransactionManager
	 */
	int PROPAGATION_REQUIRES_NEW = 3;

	/**
	 * Do not support a current transaction; rather always execute non-transactionally.
	 * Analogous to the EJB transaction attribute of the same name.
	 * <p><b>NOTE:</b> Actual transaction suspension will not work out-of-the-box
	 * on all transaction managers. This in particular applies to
	 * {@link org.springframework.transaction.jta.JtaTransactionManager},
	 * which requires the {@code javax.transaction.TransactionManager} to be
	 * made available it to it (which is server-specific in standard Java EE).
	 * <p>Note that transaction synchronization is <i>not</i> available within a
	 * {@code PROPAGATION_NOT_SUPPORTED} scope. Existing synchronizations
	 * will be suspended and resumed appropriately.
	 * @see org.springframework.transaction.jta.JtaTransactionManager#setTransactionManager
	 */
	int PROPAGATION_NOT_SUPPORTED = 4;

	/**
	 * Do not support a current transaction; throw an exception if a current transaction
	 * exists. Analogous to the EJB transaction attribute of the same name.
	 * <p>Note that transaction synchronization is <i>not</i> available within a
	 * {@code PROPAGATION_NEVER} scope.
	 */
	int PROPAGATION_NEVER = 5;

	/**
	 * Execute within a nested transaction if a current transaction exists,
	 * behave like {@link #PROPAGATION_REQUIRED} else. There is no analogous
	 * feature in EJB.
	 * <p><b>NOTE:</b> Actual creation of a nested transaction will only work on
	 * specific transaction managers. Out of the box, this only applies to the JDBC
	 * {@link org.springframework.jdbc.datasource.DataSourceTransactionManager}
	 * when working on a JDBC 3.0 driver. Some JTA providers might support
	 * nested transactions as well.
	 * @see org.springframework.jdbc.datasource.DataSourceTransactionManager
	 */
	int PROPAGATION_NESTED = 6;


	/**
	 * Use the default isolation level of the underlying datastore.
	 * All other levels correspond to the JDBC isolation levels.
	 * @see java.sql.Connection
	 */
	int ISOLATION_DEFAULT = -1;

	/**
	 * Indicates that dirty reads, non-repeatable reads and phantom reads
	 * can occur.
	 * <p>This level allows a row changed by one transaction to be read by another
	 * transaction before any changes in that row have been committed (a "dirty read").
	 * If any of the changes are rolled back, the second transaction will have
	 * retrieved an invalid row.
	 * @see java.sql.Connection#TRANSACTION_READ_UNCOMMITTED
	 */
	int ISOLATION_READ_UNCOMMITTED = Connection.TRANSACTION_READ_UNCOMMITTED;

	/**
	 * Indicates that dirty reads are prevented; non-repeatable reads and
	 * phantom reads can occur.
	 * <p>This level only prohibits a transaction from reading a row
	 * with uncommitted changes in it.
	 * @see java.sql.Connection#TRANSACTION_READ_COMMITTED
	 */
	int ISOLATION_READ_COMMITTED = Connection.TRANSACTION_READ_COMMITTED;

	/**
	 * Indicates that dirty reads and non-repeatable reads are prevented;
	 * phantom reads can occur.
	 * <p>This level prohibits a transaction from reading a row with uncommitted changes
	 * in it, and it also prohibits the situation where one transaction reads a row,
	 * a second transaction alters the row, and the first transaction re-reads the row,
	 * getting different values the second time (a "non-repeatable read").
	 * @see java.sql.Connection#TRANSACTION_REPEATABLE_READ
	 */
	int ISOLATION_REPEATABLE_READ = Connection.TRANSACTION_REPEATABLE_READ;

	/**
	 * Indicates that dirty reads, non-repeatable reads and phantom reads
	 * are prevented.
	 * <p>This level includes the prohibitions in {@link #ISOLATION_REPEATABLE_READ}
	 * and further prohibits the situation where one transaction reads all rows that
	 * satisfy a {@code WHERE} condition, a second transaction inserts a row
	 * that satisfies that {@code WHERE} condition, and the first transaction
	 * re-reads for the same condition, retrieving the additional "phantom" row
	 * in the second read.
	 * @see java.sql.Connection#TRANSACTION_SERIALIZABLE
	 */
	int ISOLATION_SERIALIZABLE = Connection.TRANSACTION_SERIALIZABLE;


	/**
	 * Use the default timeout of the underlying transaction system,
	 * or none if timeouts are not supported.
	 */
	int TIMEOUT_DEFAULT = -1;


	/**
	 * Return the propagation behavior.
	 * <p>Must return one of the {@code PROPAGATION_XXX} constants
	 * defined on {@link TransactionDefinition this interface}.
	 * @return the propagation behavior
	 * @see #PROPAGATION_REQUIRED
	 * @see org.springframework.transaction.support.TransactionSynchronizationManager#isActualTransactionActive()
	 */
	int getPropagationBehavior();

	/**
	 * Return the isolation level.
	 * <p>Must return one of the {@code ISOLATION_XXX} constants
	 * defined on {@link TransactionDefinition this interface}.
	 * <p>Only makes sense in combination with {@link #PROPAGATION_REQUIRED}
	 * or {@link #PROPAGATION_REQUIRES_NEW}.
	 * <p>Note that a transaction manager that does not support custom isolation levels
	 * will throw an exception when given any other level than {@link #ISOLATION_DEFAULT}.
	 * @return the isolation level
	 */
	int getIsolationLevel();

	/**
	 * Return the transaction timeout.
	 * <p>Must return a number of seconds, or {@link #TIMEOUT_DEFAULT}.
	 * <p>Only makes sense in combination with {@link #PROPAGATION_REQUIRED}
	 * or {@link #PROPAGATION_REQUIRES_NEW}.
	 * <p>Note that a transaction manager that does not support timeouts will throw
	 * an exception when given any other timeout than {@link #TIMEOUT_DEFAULT}.
	 * @return the transaction timeout
	 */
	int getTimeout();

	/**
	 * Return whether to optimize as a read-only transaction.
	 * <p>The read-only flag applies to any transaction context, whether
	 * backed by an actual resource transaction
	 * ({@link #PROPAGATION_REQUIRED}/{@link #PROPAGATION_REQUIRES_NEW}) or
	 * operating non-transactionally at the resource level
	 * ({@link #PROPAGATION_SUPPORTS}). In the latter case, the flag will
	 * only apply to managed resources within the application, such as a
	 * Hibernate {@code Session}.
	 <<	 * <p>This just serves as a hint for the actual transaction subsystem;
	 * it will <i>not necessarily</i> cause failure of write access attempts.
	 * A transaction manager which cannot interpret the read-only hint will
	 * <i>not</i> throw an exception when asked for a read-only transaction.
	 * @return {@code true} if the transaction is to be optimized as read-only
	 * @see org.springframework.transaction.support.TransactionSynchronization#beforeCommit(boolean)
	 * @see org.springframework.transaction.support.TransactionSynchronizationManager#isCurrentTransactionReadOnly()
	 */
	boolean isReadOnly();

	/**
	 * Return the name of this transaction. Can be {@code null}.
	 * <p>This will be used as the transaction name to be shown in a
	 * transaction monitor, if applicable (for example, WebLogic's).
	 * <p>In case of Spring's declarative transactions, the exposed name will be
	 * the {@code fully-qualified class name + "." + method name} (by default).
	 * @return the name of this transaction
	 * @see org.springframework.transaction.interceptor.TransactionAspectSupport
	 * @see org.springframework.transaction.support.TransactionSynchronizationManager#getCurrentTransactionName()
	 */
	String getName();

}

以上是关于谈一谈如何使用etcd中的事务以及自己的理解的主要内容,如果未能解决你的问题,请参考以下文章

产品以自己的角度谈一谈对产品经理的理解

Spring相关面试题:谈一谈你对事务的理解?

Spring相关面试题:谈一谈你对事务的理解?

谈一谈我对react中组件的理解

浅显易懂的谈一谈python中的装饰器!!

大厂面试06期谈一谈你对Redis持久化的理解?