mysql在django中开启事务,实现悲观锁和乐观锁

Posted 骑台风走

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了mysql在django中开启事务,实现悲观锁和乐观锁相关的知识,希望对你有一定的参考价值。

事务出现的介绍

高并发场景下mysql存在的数据问题

介绍

--并发控制:当程序中可能出现并发的情况时,就需要保证在并发情况下数据的准确性,以此确保当前用户和其他用户一起操作时,所得到的结果和他单独操作时的结果是一样的。这种手段就叫做并发控制。并发控制的目的是保证一个用户的工作不会对另一个用户的工作产生不合理的影响。
    --没有做好并发控制,就可能导致脏读、幻读和不可重复读等问题。
    --常说的多个
    --无论是悲观锁还是乐观锁,都是人们定义出来的概念,可以认为是一种思想。其实不仅仅是关系型数据库系统中有乐观锁和悲观锁的概念,像 hibernate、tair、memcache 等都有类似的概念。所以,不应该拿乐观锁、悲观锁和其他的数据库锁等进行对比。
    --乐观锁比较适用于读多写少的情况(多读场景),悲观锁比较适用于写多读少的情况(多写场景)。
 

1. 脏读(针对的是未提交读数据)

事务A修改了数据,但未提交,而事务B查询了事务A,修改过却没有提交的数据,这就是脏读,因为事务A可能会回滚。

2. 不可重复读(针对其他提交前后,读取数据本身的对比)

强调事务A对要操作的数据,被事务B修改了,但在事务A,不知请的情况下拿去做之前的用途

对于不可重复读,说简单点就是同一个事物内,查到的结果都不一致,就失去了MySQL的“一致性”,这是很严重的错误。

3.幻读(针对其他提交前后,读取数据条数的对比)

幻读是指在同一个事务中,存在前后两次查询同一个范围的数据,但是第二次查询却看到了第一次查询没看到的行,一般情况下只新增。

解决高并发下产生的问题,事务由此出现(事务的隔离级别)

为了解决上述问题,MySQL制定了四种不同的隔离级别:

读未提交(read uncommitted)

读提交(read committed)

可重复读(repeatable read)

串行化(serializable )本文将介绍这个隔离级别

隔离级别效果
读未提交(RU)

一个事务还没提交时,它做的变更就能被别的事务看到。(别的事务指同一时间进行的增删改查操作)

读提交(RC)一个事务提交(commit)之后,它做的变更才会被其他事务看到。
读提交(RC)一个事务执行过程中看到的数据,总是跟这个事务在启动时看到的数据是一致的。当然在可重复读隔离级别下,未提交变更对其他事务也是不可见的
串行(xíng)化(S)正如物理书上写的,串行是单线路,顾名思义在MySQL中同一时刻只允许单个事务执行,“写”会加“写锁”,“读”会加“读锁”。当出现读写锁冲突的时候,后访问的事务必须等前一个事务执行完成,才能继续执行。

四种事务隔离级别,解决情况

标题脏读不可重复读幻读
读未提交(RU)×××
读提交(RC)××
可重复读(RR)
串行(xíng)化(S)

事务的四大特性(ACID)

  • 原子性(Atomicity):整个事务中的所有操作,要么全部完成,要么全部不完成。事务在执行过程中发生错误,会被回滚到事务开始前的状态。
  • 一致性 (Consistency):事务开始之前和事务结束后,数据库的完整性约束没有被破坏。
  • 隔离性(Isolation):隔离性是指当多个用户并发访问数据库时,比如同时访问一张表,数据库每一个用户开启的事务,不能被其他事务所做的操作干扰,多个并发事务之间,应当相互隔离。
  • 持久性(Durability):事务执行成功后,该事务对数据库的更改是持久保存在数据库中的,不会被回滚。

注意:并不是所有的数据库或框架支持事务操作。比如在MySQL中只有使用了 Innodb 数据库引擎的数据库或表才支持事务。

事务的一些常用术语

  • 开启事务:Start Transaction
  • 事务结束:End Transaction
  • 提交事务:Commit Transaction
  • 回滚事务:Rollback Transaction

隐式事务,显式事务,自动提交事务 

显式事务:

所谓显示事务就是通过begin transaction 语句来显式启动事务,并由commit transaction语句进行提交
—begin transaction 之后的所有操作都在一个事务中一旦出现错误事务会进行回滚,将清除begin transaction之后所有操作,回到原点

显式事务:

隐式事务需要用T_语句才能打开,打开隐式事务的语句是 
SET IMPLICIT_TRANSACTIONS ON
–一旦隐式事务打开,数据库实例第一次执行alert table,insert,create,open,delete,revoke,drop,select,fetch,truncate table,grant,update语句时,会自动开启一个事务,开启的事务需要利用commit或rollback结束;当事务结束时,一旦运行以上类型的语句,会再次自动开启一个新的事务,
–这样就形成了一个事务链

自动提交事务:

自动提交事务是sqlserver默认模式,该类型不需要开发人员手工做任何操作,每个单独的T_SQL语句都在其完成后自动提交,如果出现错误则回滚。
开发人员无法对其严格控制不适合大规模导入,不适合业务关联数据录用,如果完成一项业务需要3句语句,当第二条出错时,第一条无法撤销,所以无法保证事务一致型

锁介绍

使用经验:

--悲观锁、乐观锁仅仅只是一种思想,与语言无关,实现这个思想的有共享锁与排他锁...

--如果重试代价比较大,就用悲观锁

--如果只是可能会发生冲突或者发生冲突时并发数其实不是非常高的话最好使用乐观锁,

--如果经常发生冲突且冲突时并发很高建议使用悲观锁,因为悲观锁不会导致自循环,减少线程空转

--乐观锁常用于多读 悲观锁常用于多写

MySQL都分为哪些锁

按锁粒度从大到小分类:表锁,页锁和行锁;以及特殊场景下使用的全局锁

如果按锁级别分类则有:共享(读)锁、排他(写)锁、意向共享(读)锁、意向排他(写)锁;

以及Innodb引擎为解决幻读等并发场景下事务存在的数据问题,引入的Record Lock(行记录锁)、Gap Lock(间隙锁)、Next-key Lock(Record Lock + Gap Lock结合)等;

还有就是我们面向编程的两种锁思想:悲观锁、乐观锁。

锁图解

 上锁介绍

--通过上锁的方式解决高并发时的数据冲突:
    --悲观锁:借助mysql锁机制实现,可能产生死锁
        --悲观锁:当要对数据库中的一条数据进行修改的时候,为了避免同时被其他人修改,最好的办法就是直接对该数据进行加锁以防止并发。这种借助数据库锁机制,在修改数据之前先锁定,再修改的方式被称之为悲观并发控制【Pessimistic Concurrency Control,缩写“PCC”,又名“悲观锁”】
        --之所以叫做悲观锁,是因为这是一种对数据的修改持有悲观态度的并发控制方式。总是假设最坏的情况,每次读取数据的时候都默认其他线程会更改数据,因此需要进行加锁操作,当其他线程想要访问数据时,都需要阻塞挂起。悲观锁的实现:
            --传统的关系型数据库使用这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁
            --Java 里面的同步 synchronized 关键字的实现
        --悲观锁主要分为共享锁和排他锁:
            --共享锁【shared locks】又称为读锁,简称S锁。顾名思义,共享锁就是多个事务对于同一数据可以共享一把锁,都能访问到数据,但是只能读不能修改。
            --排他锁【exclusive locks】又称为写锁,简称X锁。顾名思义,排他锁就是不能与其他锁并存,如果一个事务获取了一个数据行的排他锁,其他事务就不能再获取该行的其他锁,包括共享锁和排他锁,但是获取排他锁的事务是可以对数据行读取和修改。
    
        --[锁的级别]:需要注意一些锁的级别,MySQL InnoDB 默认行级锁。行级锁都是基于索引的,如果一条 SQL 语句用不到索引是不会使用行级锁的,会使用表级锁把整张表锁住,这点需要注意
        
        --表级锁
            --表锁(MySQL layer):手动加
                --read lock:加读锁后,可以加读锁不可以加写锁
                --write lock:加写锁后,不可以加读锁也不可以加写锁
            --元数据锁(MySQL layer):自动加
                --CURD 加读锁
                --DDL 加写锁
            --意向锁(InnoDB):内部使用
                --共享读锁(IS)
                --排他写锁(IX)
        --行级锁(InnoDB)
            --共享读锁(S):手动加 -- select ... lock in share mode
            --排他写锁(X):自动加 -- DML (insert, update, delete)
                               -- select ... for update
        --悲观锁适用场景:
            ▧ 无脏读  上锁数据保证一致, 因此无脏读, 对脏读不允许的环境悲观锁可以胜任
            ▧ 无并行  悲观锁对事务成功性可以保证, 但是会对数据加锁导致无法实现数据的并行处理
            ▧ 事务成功率高  上锁保证一次成功, 因此在对数据处理的成功率要求较高的时候更适合悲观锁
            ▧ 开销大  悲观锁的上锁解锁是有开销的, 如果超大的并发量这个开销就不容小视, 因此不适合在高并发环境中使用悲观锁 
            ▧ 一次性完成  如果乐观锁多次尝试的代价比较大,也建议使用悲观锁, 悲观锁保证一次成功
 

乐观锁

    --乐观锁:通过程序实现,不会产生死锁
        --乐观锁是相对悲观锁而言的,乐观锁假设数据一般情况下不会造成冲突,所以在数据进行提交更新的时候,才会正式对数据的冲突与否进行检测,如果发现冲突了,则返回给用户错误的信息,让用户决定如何去做。乐观锁适用于读操作多的场景,这样可以提高程序的吞吐量。
        --乐观锁一定要把mysql事务级别调整为 Read Committed[读取提交内容]:
            --查看事务级别:select @@global.tx_isolation;
            --RepeatableRead[可重读, Mysql默认的隔离级别]
            --到mysql的配置文件中去修改transcation-isolation = READ-COMMITTED
        --乐观锁的实现
            --CAS 实现:Java 中java.util.concurrent.atomic包下面的原子变量使用了乐观锁的一种 CAS 实现方式
            --版本号控制:一般是在数据表中加上一个数据版本号 version 字段,表示数据被修改的次数。当数据被修改时,version 值会+1。当线程A要更新数据值时,在读取数据的同时也会读取 version 值,在提交更新时,若刚才读取到的 version 值与当前数据库中的 version 值相等时才更新,否则重试更新操作,直到更新成功
        --乐观锁适用场景:
            ▧ 脏读  乐观锁不涉及到上锁的处理, 因此在数据并行需求的时候是更适合乐观锁,当然会产生脏读, 不过用回滚取消掉了
            ▧ 高并发  相比起悲观锁的开销, 乐观锁也是比悲观锁更适合于高并发场景
            ▧ 事务成功率低  乐观锁不能保证每次事务的成功, 是使用回滚方式来保证数据一致性, 因此会导致事务成功率很低
            ▧ 读多写少  乐观锁适用于读多写少的应用场景,这样可以提高并发粒度
            ▧ 开销小  可能会导致很多次的回滚都不能拿到正确的处理回应, 因此如果对成功性要求低,而且每次开销小比较适合乐观锁
 

cas介绍

--CAS介绍:乐观锁主要是 冲突检测 和 数据更新两个步骤,CAS就是Compare And Swap
    --CAS 即比较并交换。是解决多线程并行情况下使用锁造成性能损耗的一种机制,CAS 操作包含三个操作数——内存位置(V)、预期原值(A)和新值(B)。如果内存位置的值(V)与预期原值(A)相匹配,那么处理器会自动将该位置值更新为新值(B)。否则,处理器不做任何操作。无论哪种情况,它都会在 CAS 指令之前返回该位置的值。CAS 有效地说明了“我认为位置(V)应该包含值(A)。如果包含该值,则将新值(B)放到这个位置;否则,不要更改该位置,只告诉我这个位置现在的值即可”。Java 中,sun.misc.Unsafe 类提供了硬件级别的原子操作来实现这个 CAS。java.util.concurrent包下大量的类都使用了这个 Unsafe.java 类的 CAS 操作。
    --CAS存在问题:A B 两个线程同时取数据3,A线程通过操作将数据由3->2->3,B检查发现数据还是3因此进行了更新,这就可能会导致错误。因此对于需要并发控制的数据可以通过增加version号,每次操作+1来表示数据的新鲜度,当然也可以使用时间戳字段这种自带自增属性的方式。
    --锁粒度控制:是指锁住资源的范围大小也可以等同于限制的严格程度,锁的范围越大 == 限制越严格。在高并发情况下,使用乐观锁就意味着一个线程成功就意味着其他线程失败。因此控制锁粒度非常重要,例如更新库存:
        --update item set quantity = quantity - 1 where id = 1 and quantity -1 > 0  锁粒度较小
        --update item set quantity = quantity - 1 where id = 1 and quantity = origin_quantity  锁粒度较大  
        
    --在高并发情况下,会出现大量线程空旋转,最好优化一下,这里按下不表
 

什么时候该用悲观锁,什么时候该用乐观锁呢

  • 并发量:如果并发量不大,可以使用悲观锁解决并发问题;但如果系统的并发非常大的话,悲观锁定会带来非常大的性能问题, 建议乐观锁。
  • 响应速度:如果需要非常高的响应速度,建议采用乐观锁方案,成功就执行,不成功就失败,不需要等待其他并发去释放锁。乐观锁并未真正加锁,效率高。
  • 冲突频率:如果冲突频率非常高,建议采用悲观锁,保证成功率。冲突频率大,选择乐观锁会需要多次重试才能成功,代价比较大。
  • 重试代价:如果重试代价大,建议采用悲观锁。悲观锁依赖数据库锁,效率低。更新失败的概率比较低。

在django中开启事务

Django默认事务行为

Django是支持事务操作的,它的默认事务行为是自动提交,

具体表现形式为:每次数据库操作(比如调用save()方法)会立即被提交到数据库中。

但是如果你希望把连续的SQL操作包裹在一个事务里,就需要手动开启事务。

全局开启事务

它的工作原理是这样的:每当有请求过来时,Django会在调用视图方法前开启一个事务。如果完成了请求处理并正确返回了结果,Django就会提交该事务。否则,Django会回滚该事务。

虽然全局开启事务很简单,但Django并不推荐开启全局事务。因为一旦将事务跟 HTTP 请求绑定到一起时,每一个请求都会开启事务,当访问量增长到一定的时候会造成很大的性能损耗。在实际开发过程中,很多GET请求根本不涉及到事务操作,一个更好的方式是局部开启事务按需使用。

在Web应用中,常用的事务处理方式是将每次请求都包裹在一个事务中。全局开启事务只需要将数据库的配置项ATOMIC_REQUESTS设置为True,如下所示:

DATABASES = 
     'default': 
         'ENGINE': 'django.db.backends.mysql',
         'NAME': 'db1',
         'HOST': 'dbhost',
         'PORT': '3306',
         'USER': 'dbuser',
         'PASSWORD': 'password',
          #全局开启事务,绑定的是http请求响应整个过程
         'ATOMIC_REQUESTS': True, 
     

 如果你全局开启了事务,你仍然可以使用non_atomic_requests装饰器让某些视图方法不受事务控制,如下所示:

from django.db import transaction

# 不受全局事务控制 
@transaction.non_atomic_requests
def my_view(request):
    do_stuff()
 
 
# 如有多个数据库,让使用otherdb的视图不受事务控制
@transaction.non_atomic_requests(using='otherdb')
def my_other_view(request):
    do_stuff_on_the_other_database()

局部开启事务

Django项目中局部开启事务,可以借助于transaction.atomic方法。

使用它我们就可以创建一个具备原子性的代码块,一旦代码块正常运行完毕,所有的修改会被提交到数据库。

反之,如果有异常,更改会被回滚。

视图函数方式开启事务

from django.db import transaction


@transaction.atomic
def t4(request):
    # This code executes inside a transaction.
    pass

基于类视图开启事务

from django.db import transaction
 from rest_framework.views import APIView
 
 
 class OrderAPIView(APIView):
       # 开启事务,当方法执行完以后,自动提交事务
       @transaction.atomic  
       def post(self, request):
           pass 

只对视图方法里一小段代码使用事务

from django.db import transaction
 
 
 def viewfunc(request):

       
     # 显式地开启事务, 默认自动提交
     with transaction.atomic():
         # 下面这段代码在事务中执行
         pass

Savepoint回滚 

在事务操作中,我们还会经常显式地设置保存点(savepoint)。

一旦发生异常或错误,我们使用savepoint_rollback方法让程序回滚到指定的保存点。

如果没有问题,就使用savepoint_commit方法提交事务。

def t4(request):
    sid = transaction.savepoint()
    with transaction.atomic():
        try:
            models.Text_one.objects.select_for_update().filter(id=1).first()
        except Exception as e:
            # 如发生异常,回滚到指定地方。
            transaction.savepoint_rollback(sid)
        # 如果没有异常,显式地提交一次事务
        transaction.savepoint_commit(sid)

事务提交后回调函数 

有的时候我们希望当前事务提交后立即执行额外的任务,比如客户下订单后立即邮件通知卖家,这时可以使用Django提供的on_commit方法,如下所示:

 案例一

def do_something():
    print('当事务提交后,我被执行')


def t4(request):
    sid = transaction.savepoint()
    with transaction.atomic():
        try:
            models.Text_one.objects.select_for_update().filter(id=1).first()
        except Exception as e:
            # 如发生异常,回滚到指定地方。
            transaction.savepoint_rollback(sid)
        # 如果没有异常,显式地提交一次事务
        transaction.savepoint_commit(sid)
        transaction.on_commit(do_something)

案例二(调用celery) 

# 例2:调用celery异步任务
 transaction.on_commit(lambda: some_celery_task.delay('arg1'))

悲观锁与乐观锁思想的实现

django实现悲观锁

Django中使用悲观锁锁定一个对象,需要使用select_for_update()方法。它本质是一个行级锁,能锁定所有匹配的行(如果查询所有,可以锁住整张表),直到事务结束。

注意:MySQL版本要在8.0.1+ 以上才支持 nowait,skip_locked和of选项。

案例1:类视图,锁定id=10的SKU对象

 class OrderView(APIView):
 
 
     @transaction.atomic
     def post(self, request):
         # select_for_update表示锁,只有获取到锁才会执行查询,否则阻塞等待。
         sku = GoodsSKU.objects.select_for_update().get(id=10)
 
 
         # 等事务提交后,会自动释放锁。
         return Response("xxx")
 

 其他案例

class OrderCommitView(View):
  """悲观锁"""
  # 开启事务装饰器,会自动提交事务,但本段代码,手动提交
  @transaction.atomic
  def post(self,request):
    """订单并发 ———— 悲观锁"""
    # 拿到商品id
    goods_ids = request.POST.getlist('goods_ids')
  
    # 校验参数
    if len(goods_ids) == 0 :
      return JsonResponse('res':0,'errmsg':'数据不完整') 
    # 当前时间字符串
    now_str = datetime.now().strftime('%Y%m%d%H%M%S') 
    # 订单编号
    order_id = now_str + str(request.user.id)
    # 支付方式
    pay_method = request.POST.get('pay_method')
    # 地址
    address_id = request.POST.get('address_id')
    try:
      address = Address.objects.get(id=address_id)
    except Address.DoesNotExist:
      return JsonResponse('res':1,'errmsg':'地址错误') 
    # 商品数量
    total_count = 0
    # 商品总价
    total_amount = 0
     # 获取redis连接
    conn = get_redis_connection('default')
    # 拼接key
    cart_key = 'cart_%d' % request.user.id 
    #
    # 创建保存点
    sid = transaction.savepoint() 
    order_info = OrderInfo.objects.create(
      order_id = order_id,
      user = request.user,
      addr = address,
      pay_method = pay_method,
      total_count = total_count,
      total_price = total_amount
    ) 
    for goods_id in goods_ids:
      # 尝试查询商品
      # 此处考虑订单并发问题,
      try:
        # goods = Goods.objects.get(id=goods_id) # 不加锁查询
        goods = Goods.objects.select_for_update().get(id=goods_id) # 加互斥锁查询
      except Goodsgoods.DoesNotExist:
        # 回滚到保存点
        transaction.rollback(sid)
        return JsonResponse('res':2,'errmsg':'商品信息错误')
      # 取出商品数量
      count = conn.hget(cart_key,goods_id)
      if count is None:
        # 回滚到保存点
        transaction.rollback(sid)
        return JsonResponse('res':3,'errmsg':'商品不在购物车中') 
      count = int(count) 
      if goods.stock < count:
        # 回滚到保存点
        transaction.rollback(sid)
        return JsonResponse('res':4,'errmsg':'库存不足') 
      # 商品销量增加
      goods.sales += count
      # 商品库存减少
      goods.stock -= count
      # 保存到数据库
      goods.save() 
      OrderGoods.objects.create(
        order = order_info,
        goods = goods,
        count = count,
        price = goods.price
      ) 
      # 累加商品件数
      total_count += count
      # 累加商品总价
      total_amount += (goods.price) * count 
    # 更新订单信息中的商品总件数
    order_info.total_count = total_count
    # 更新订单信息中的总价格
    order_info.total_price = total_amount + order_info.transit_price
    order_info.save()
  
    # 事务提交
    transaction.commit() 
    return JsonResponse('res':5,'errmsg':'订单创建成功')

 案例2:函数视图,锁定所有符合条件的商品对象列表。

def t2(request):
    # 创建保存点
    # # 开启事务,当方法执行完以后,自动提交事务
    with transaction.atomic():
        sid = transaction.savepoint()
        print('我打印几次')
        try:
            # 获取商品ig
            goods_id = request.GET.get('id')
            # 获取需要购买的商品数量
            count = int(request.GET.get('count'))
            # 原始库存 将符合的记录锁住
            old_data = models.Text_one.objects.select_for_update().filter(id=goods_id).first()
            # 获取原始库存
            origin_stock = old_data.count
            print('原始库存:', origin_stock)
            # 判断商品库存是否充足
            if origin_stock < count:
                return HttpResponse(content="商品库存不足")
            # 演示多个用户并发请求,因为postman不是并发的所以sleep一下
            sleep(5)
            # 减少商品的库存数量,保存到数据库
            old_data.count = origin_stock - count
            print('最终库存:', old_data.count)
            old_data.save()
        except Exception:
            transaction.savepoint_rollback(sid)
        # 显示
        transaction.savepoint_commit(sid)
        transaction.on_commit(do_something)
    return HttpResponse('ok2')

同时使用select_for_update与select_related

当你同时使用select_for_update与select_related方法时,select_related指定的相关对象也会被锁定。

你可以通过select_for_update(of=(...))方法指定需要锁定的关联对象,如下所示:

注意:MySQL版本要在8.0.1+ 以上才支持 nowait,skip_locked和of选项。

# 只会锁定entry(self)和category,不会锁定作者author
 entries = Entry.objects.select_related('author', 'category'). select_for_update(of=('self', 'category'))

Django实现乐观锁 

介绍1:

乐观锁实现一般使用记录版本号,为数据表增加一个版本标识(version)字段,每次对数据的更新操作成功后都对版本号执行+1操作。

每次执行更新操作时都去判断当前版本号是不是该条数据的最新版本号,如果不是说明数据已经同时被修改过了,则丢弃更新,需要重新获取目标对象再进行更新。

Django项目中实现乐观锁可以借助于django-concurrency这个第三方库, 它可以给模型增加一个version字段,每次执行save操作时会自动给版本号+1。

from django.db import models
 from concurrency.fields import IntegerVersionField
 
 
 class ConcurrentModel( models.Model ):
     version = IntegerVersionField( )
     name = models.CharField(max_length=100)

 介绍2

相比悲观锁,乐观锁其实并不能称为是锁,那么它是在做什么事情呢。

其实是在你要进行数据库操作时先去查询一次数据库中商品的库存,然后在你要更新数据库中商品库存时,将你一开始查询到的库存数量和商品的ID一起作为更新的条件,当受影响行数返回为0时,说明没有修改成功,那么就是说别的进程修改了该数据,那么你就可以回滚到之前没有进行数据库操作的时候,重新查询,重复之前的操作一定次数,如果超过你设置的次数还是不能修改那么就直接返回错误结果。

该方法只适用于订单并发较少的情况,如果失败次数过多,会带给用户不良体验,同时适用该方法要注意数据库的隔离级别一定要设置为Read Committed 。

 案例一:

class OrderCommitView(View):
  """乐观锁"""
  # 开启事务装饰器
  @transaction.atomic
  def post(self,request):
    """订单并发 ———— 乐观锁"""
    # 拿到id
    goods_ids = request.POST.get('goods_ids')
     
    if len(goods_ids) == 0 :
      return JsonResponse('res':0,'errmsg':'数据不完整')
    # 当前时间字符串
    now_str = datetime.now().strftime('%Y%m%d%H%M%S')
    # 订单编号
    order_id = now_str + str(request.user.id)
    # 支付方式
    pay_method = request.POST.get('pay_method')
    # 地址
    address_id = request.POST.get('address_id')
    try:
      address = Address.objects.get(id=address_id)
    except Address.DoesNotExist:
      return JsonResponse('res':1,'errmsg':'地址错误') 
    # 商品数量
    total_count = 0
    # 商品总价
    total_amount = 0
    # 订单运费
    transit_price = 10
    # 创建保存点
    sid = transaction.savepoint() 
    order_info = OrderInfo.objects.create(
      order_id = order_id,
      user = request.user,
      addr = address,
      pay_method = pay_method,
      total_count = total_count,
      total_price = total_amount,
      transit_price = transit_price
    )
    # 获取redis连接
    goods = get_redis_goodsection('default')
    # 拼接key
    cart_key = 'cart_%d' % request.user.id
  
    for goods_id in goods_ids:
      # 尝试查询商品
      # 此处考虑订单并发问题, 
      # redis中取出商品数量
      count = goods.hget(cart_key, goods_id)
      if count is None:
        # 回滚到保存点
        transaction.savepoint_rollback(sid)
        return JsonResponse('res': 3, 'errmsg': '商品不在购物车中')
      count = int(count) 
      for i in range(3):
        # 若存在订单并发则尝试下单三次
        try:
  
          goods = Goodsgoods.objects.get(id=goods_id) # 不加锁查询
          # goods = Goodsgoods.objects.select_for_update().get(id=goods_id) # 加互斥锁查询
        except Goodsgoods.DoesNotExist:
          # 回滚到保存点
          transaction.savepoint_rollback(sid)
          return JsonResponse('res':2,'errmsg':'商品信息错误') 
        origin_stock = goods.stock
        print(origin_stock, 'stock')
        print(goods.id, 'id') 
        if origin_stock < count: 
          # 回滚到保存点
          transaction.savepoint_rollback(sid)
          return JsonResponse('res':4,'errmsg':'库存不足') 
  
        # # 商品销量增加
        # goods.sales += count
        # # 商品库存减少
        # goods.stock -= count
        # # 保存到数据库
        # goods.save() 
        # 如果下单成功后的库存
        new_stock = goods.stock - count
        new_sales = goods.sales + count
        res = Goodsgoods.objects.filter(stock=origin_stock,id=goods_id).update(stock=new_stock,sales=new_sales)
        print(res)
        if res == 0:
          if i == 2:
            # 回滚
            transaction.savepoint_rollback(sid)
            return JsonResponse('res':5,'errmsg':'下单失败')
          continue
        else:
          break
      OrderGoods.objects.create(
        order = order_info,
        goods = goods,
        count = count,
        price = goods.price
      ) 
      # 删除购物车中记录
      goods.hdel(cart_key,goods_id)
      # 累加商品件数
      total_count += count
      # 累加商品总价
      total_amount += (goods.price) * count 
    # 更新订单信息中的商品总件数
    order_info.total_count = total_count
    # 更新订单信息中的总价格
    order_info.total_price = total_amount + order_info.transit_price
    order_info.save() 
    # 事务提交
    transaction.savepoint_commit(sid) 
    return JsonResponse('res':6,'errmsg':'订单创建成功')

 案例二:

--这里通过比较要修改的字段stock是否与取出时的数据一致
	--若一致,直接更新数据即可
	--若不一致,重新取出stock数据进行更新

import time
from django.db import transaction
from django.shortcuts import render
from django.http import HttpResponse
from rest_framework.generics import GenericAPIView
from .models import GoodsInfo


class Goods(GenericAPIView):
    """ 购买商品 """

    @transaction.atomic
    def post(self, request):
        # 获取请求头中查询字符串数据
        goods_id = request.GET.get('goods_id')
        count = int(request.GET.get('count'))

        while True:
            # 查询商品对象 -- 最基本查询
            goods = GoodsInfo.objects.filter(id=goods_id).first()

            # 获取原始库存
            origin_stock = goods.stock
            print(origin_stock)

            # 判断商品库存是否充足
            if origin_stock < count:
                return HttpResponse(content="商品库存不足", status=400)

            # 演示多个用户并发请求,因为postman不是并发的所以sleep一下
            time.sleep(5)

            # 减少商品的库存数量,保存到数据库
            result = GoodsInfo.objects.filter(id=goods_id, stock=origin_stock).update(stock=origin_stock - count)
            if result == 0:
                # 表示更新失败,有人抢先购买了商品,重新获取库存信息,判断库存
                continue
            break
        return HttpResponse(content="操作成功", status=200)

文章参考

(12条消息) 04、django高并发连接数据库的并发控制问题 和 简单代码测试_鞍-的博客-CSDN博客_django并发控制https://blog.csdn.net/weixin_41097516/article/details/113483346?utm_source=app&app_version=5.2.1&code=app_1562916241&uLinkId=usr1mkqgl919blen(14条消息) django orm实现乐观锁_pushiqiang的博客-CSDN博客_django 乐观锁https://blog.csdn.net/pushiqiang/article/details/115272965(14条消息) django中解决并发问题,悲观锁、乐观锁和任务队列的解决方案_python小白努力中的博客-CSDN博客_django 并发锁https://blog.csdn.net/lh_hebine/article/details/99696918Django进阶:事务操作、悲观锁和乐观锁(附代码演示) - 知乎 (zhihu.com)https://zhuanlan.zhihu.com/p/372957129Django的乐观锁与悲观锁实现 - shengguorui - 博客园 (cnblogs.com)https://www.cnblogs.com/shengguorui/p/11342963.html(23条消息) 上个厕所的功夫,搞懂MySQL事务隔离级别__陈哈哈的博客-CSDN博客https://blog.csdn.net/qq_39390545/article/details/107343711(26条消息) 面试让HR都能听懂的MySQL锁机制,欢声笑语中搞懂MySQL锁__陈哈哈的博客-CSDN博客https://blog.csdn.net/qq_39390545/article/details/115864190

(26条消息) 隐式事务,显式事务,自动提交事务_土戈的博客-CSDN博客_隐式事务https://blog.csdn.net/f110300641/article/details/83988364 (26条消息) Django(11): 事务 transaction.atomic_python开发笔记的博客-CSDN博客_@transaction.atomichttps://blog.csdn.net/qq_37674086/article/details/113583194

以上是关于mysql在django中开启事务,实现悲观锁和乐观锁的主要内容,如果未能解决你的问题,请参考以下文章

mysql悲观锁是行锁还是表锁?

悲观锁与乐观锁

Mysql 悲观锁

Django 2021年最新版教程32Django 事务 悲观锁 乐观锁

14.mysql悲观锁

MySql悲观锁总结与实践