你的python 可重入分布式锁是怎么实现的

Posted 唱起儿歌写代码

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了你的python 可重入分布式锁是怎么实现的相关的知识,希望对你有一定的参考价值。

基本概念


  • 分布式锁


分布式锁是控制分布式系统之间同步访问共享资源的一种方式。在分布式系统中,常常需要协调他们的动作。如果不同的系统或是同一个系统的不同主机之间共享了一个或一组资源,那么访问这些资源的时候,往往需要互斥来防止彼此干扰来保证一致性,在这种情况下,便需要使用到分布式锁。

分布式锁一般有三种实现方式:1. 数据库乐观锁;2. 基于Redis的分布式锁;3. 基于ZooKeeper的分布式锁。本文主要介绍基于redis方式。


  • 可重入


可重入锁指的是同一线程中可重复可递归调用的锁,在外层使用锁之后,在内层仍然可以使用,如果没有可重入锁的支持,在第二次尝试获得锁时将会进入死锁状态。

可重入锁的要点是对于同一线程可以多次获取锁,不同线程之间同一把锁不能重复获取,因此理论上需要保存锁拥有者的线程标识。在Java和python中都可以用 threadlocal变量同来同步同一线程之间的资源。


分布式锁设计要点


一个完整的分布式锁,需要满足以下几个条件:

  • 互斥性。在任意时刻,只有一个客户端能持有锁。

  • 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。这就要求必须为锁加上超时时间。

  • 具有容错性。只要大部分的Redis节点正常运行,客户端就可以加锁和解锁。这点是要求必须能够保证相关操作的原子性。

  • 解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。这就要求每把锁必须有自己的标识,在解锁时通过判断标识来保证是释放的自己的锁。


代码实现


Talk is cheap, show me the code。下面是一个分布式锁的基本实现:

 
   
   
 
class DistributedLock(object):
   """
  This is not a 100% reliable distributed lock implementation. It is a try best one.
  And this distributed lock algorithm is based on the time synchronization between
  each node.
  But it should be enough for our project.
  """

   def __init__(selflock_nameexpire_time=3):
       self.name = lock_name + '_distributed_lock'
       self.expire_time = expire_time
       self.uuid = str(uuid.uuid4())

   @redis_exc
   def __enter__(self):
       """
      If driver is not redis, nothing will happen.
      """
       redis_conn = driver_manager.get_global_redis_conn()
       while True:
           if redis_conn.set(self.nameself.uuidex=self.expire_time*2nx=True) == 1:
               LOG.debug('Get lock %s' % self.name)
               break
           LOG.debug('Sleep 0.1s to try to get lock %s' % self.name)
           time.sleep(0.1)
       return self

   @redis_exc
   def __exit__(selfexc_typeexc_valexc_tb):
       redis_conn = driver_manager.get_global_redis_conn()
       owner_id = redis_conn.get(self.name)
       if owner_id == self.uuid:
           redis_conn.delete(self.name)
           LOG.debug('Release lock %s' % self.name)
       else:
           LOG.warning('Transaction locked by distributed lock %s need larger expire time' % self.name)
       return


可以看到,我们加锁就一行代码:redis_conn.set(self.name, self.uuid, ex=self.expire_time, nx=True),这个set()方法一共有五个形参:

  • 第一个为key,表示锁的唯一性标识。
  • 第二个为value,我们传的是uuid,很多童鞋可能不明白,有key作为锁不就够了吗,为什么还要用到value?原因就是我们在上面讲到可靠性时,分布式锁要满足第四个条件解铃还须系铃人,通过给value赋值为uuid,我们就知道这把锁是哪个请求加的了,在解锁的时候就可以有依据。
  • 第三个表示锁的过期时间,如果程序发生意外没有及时解锁,也能保证在超时后其他请求能够正常加锁
  • 第四个nx=True,意思是SET IF NOT EXIST,即当key不存在时,我们进行set操作;若key已经存在,则不做任何操作;

总的来说,执行上面的set()方法就只会导致两种结果:1. 当前没有锁(key不存在),那么就进行加锁操作,并对锁设置个有效期,同时value表示加锁的客户端。2. 已有锁存在,不做任何操作。

心细的童鞋就会发现了,我们的加锁代码满足我们可靠性里描述的三个条件。首先,set()加入了NX参数,可以保证如果已有key存在,则函数不会调用成功,也就是只有一个客户端能持有锁,满足互斥性。其次,由于我们对锁设置了过期时间,即使锁的持有者后续发生崩溃而没有解锁,锁也会因为到了过期时间而自动解锁(即key被删除),不会发生死锁。最后,因为我们将value赋值为uuid,代表加锁的客户端请求标识,那么在客户端在解锁的时候就可以进行校验。

上面分布式锁解锁阶段分了两步,先获取锁的owner_id,然后根据owner_id进行解锁判断,实际上,一般推荐的都是将两步操作原子化,使用LUA脚本进行执行,此处暂时未对此做相应优化。


分布式锁支持可重入

上面介绍了一个完整的基于redis的分布式锁python实现版本和思路,在实际场景中,对同一个线程或者协程的同一把锁需要支持可重入,如递归调用时不发生死锁,下面实现版本在上述的分布式锁中进行了相关代码修改。

_request_store = threading.local()


def _add_locker_info(lock_nameowner_uuid):
   setattr(_request_storelock_nameowner_uuid)


def _get_locker_info(lock_name):
   return getattr(_request_storelock_nameNone)


class DistributedLock(object):
   """
  This is not a 100% reliable distributed lock implementation. It is a try best one.
  And this distributed lock algorithm is based on the time synchronization between
  each node.
  But it should be enough for our project.
  """

   def __init__(selflock_nameexpire_time=3):
       self.name = lock_name + '_distributed_lock'
       self.expire_time = expire_time
       self.uuid = str(uuid.uuid4())

   @redis_exc
   def __enter__(self):
       """
      If driver is not redis, nothing will happen.
      """
       redis_conn = driver_manager.get_global_redis_conn()
       while True:
           if redis_conn.set(self.nameself.uuidex=self.expire_time*2nx=True) == 1:
               _add_locker_info(self.nameself.uuid)
               LOG.debug('Get lock %s' % self.name)
               break
           if _get_locker_info(self.name):
               LOG.debug('Get re-entrant lock %s' % self.name)
               break
           LOG.debug('Sleep 0.1s to try to get lock %s' % self.name)
           time.sleep(0.1)
       return self

   @redis_exc
   def __exit__(selfexc_typeexc_valexc_tb):
       redis_conn = driver_manager.get_global_redis_conn()
       owner_id = redis_conn.get(self.name)
       if owner_id == self.uuid:
           redis_conn.delete(self.name)
           LOG.debug('Release lock %s' % self.name)
       elif owner_id == _get_locker_info(self.name):
           LOG.debug('Quit from re-entrant lock %s, do nothing ' % self.name)
       else:
           LOG.warning('Transaction locked by distributed lock %s need larger expire time' % self.name)
       return


python 的threadlocal变量和Java中类似,对同一个线程中的变量进行隔离,因此对可重入锁的支持只需要判断下当前线程中是否已经有这把锁,如果有,则直接返回,否则进行等待锁释放或者超时。

注意点


  • threadlocal中的内存不主动释放,会不会内存泄漏?
不会,因为每个线程对应一个副本字典,当在该线程中 获取、设置、删除 ThreadLocal对象的属性时,会先通过_patch函数,将线程的副本字典,设置成ThreadLocal对象的属性字典,然后对ThreadLocal对象的属性的操作,实际上就是对副本字典的操作。当线程结束,线程对象被销毁时,副本字典的引用计数也会随之减少,最后被GC掉。

  • 可重入锁的counter计数
   一般的设计中都会对可重入锁维护一个counter计数,也就是在counter小于等于0时进行释放,由于考虑到自增自减操作的非原子性,没有通过counter值进行锁释放,而是采用谁一开始创建锁,就最后删除锁。可重入锁的多次重入表现为堆栈,后入先出,也就是说有一开始加锁的对象进行锁删除即可。

  • 关于threadlocal的线程隔离
很多情况下都是在同一个进程中起不同的协程去处理API请求,也就是说不同的API请求是有可能在同一个线程中的,实际测试也验证了这一点。如下打印的日志,总共有三次API请求,但是线程号是一样的。

如果从线程出发,threadlocal的表现应该是在同一个线程内数据共享,如果没有主动删除数据,则在线程消亡时释放内存。由于在加锁和解锁时并没有主动删除threadlocal中的数据,也就是说同一个线程中的数据应该是会累加的,但是实际上没有,表现的行为是协程隔离,也就是说两个API请求虽然处于同一个线程内,但是并没有共享threadlocal的数据。

threadlocal协程隔离

通过定位后发现在项目的eventlet模块中对通过monkey_patch方式对原生的threading.local进行了改写,在原生的threading.local 通过如下的函数获取当前的线程号
 
   
   
 
def currentThread():
   """Return the current Thread object, corresponding to the caller's thread of control.

  If the caller's thread of control was not created through the threading
  module, a dummy thread object with limited functionality is returned.

  """
   try:
       return _active[_get_ident()]
   except KeyError:
       ##print "current_thread(): no current thread for", _get_ident()
       return _DummyThread()
# 获取线程号函数        
_get_ident = thread.get_ident

在eventlet库中对该函数进行了改写,函数currentThread获取到的是协程号,如下:
 
   
   
 
def current_thread():
   g = greenlet.getcurrent()
   if not g:
       # Not currently in a greenthread, fall back to standard function
       return _fixup_thread(__orig_threading.current_thread())

   try:
       active = __threadlocal.active
   except AttributeError:
       active = __threadlocal.active = {}

   try:
       t = active[id(g)]
   except KeyError:
       # Add green thread to active if we can clean it up on exit
       def cleanup(g):
           del active[id(g)]
       try:
           g.link(cleanup)
       except AttributeError:
           # Not a GreenThread type, so there's no way to hook into
           # the green thread exiting. Fall back to the standard
           # function then.
           t = _fixup_thread(__orig_threading.currentThread())
       else:
           t = active[id(g)] = _GreenThread(g)

   return t

最终调用的如下greenlet库中的getcurrent获取当前协程标识
 
   
   
 
greenlet.getcurrent()
  Returns the current greenlet (i.ethe one which called this function).
  返回当前greenlet,也就是谁在调用这个函数。
相关patch函数在/neutron/common/eventlet_utils.py。

附录:python threading.RLock源码


_allocate_lock = thread.allocate_lock
_get_ident = thread.get_ident
Lock = _allocate_lock

def RLock(*args**kwargs):
   """Factory function that returns a new reentrant lock.

  A reentrant lock must be released by the thread that acquired it. Once a
  thread has acquired a reentrant lock, the same thread may acquire it again
  without blocking; the thread must release it once for each time it has
  acquired it.

  """
   return _RLock(*args**kwargs)

class _RLock(_Verbose):
   """A reentrant lock must be released by the thread that acquired it. Once a
      thread has acquired a reentrant lock, the same thread may acquire it
      again without blocking; the thread must release it once for each time it
      has acquired it.
  """

   def __init__(selfverbose=None):
       _Verbose.__init__(selfverbose)
       self.__block = _allocate_lock()
       self.__owner = None
       self.__count = 0

   def __repr__(self):
       owner = self.__owner
       try:
           owner = _active[owner].name
       except KeyError:
           pass
       return "<%s owner=%r count=%d>" % (
               self.__class__.__name__ownerself.__count)

   def acquire(selfblocking=1):
       """Acquire a lock, blocking or non-blocking.

      When invoked without arguments: if this thread already owns the lock,
      increment the recursion level by one, and return immediately. Otherwise,
      if another thread owns the lock, block until the lock is unlocked. Once
      the lock is unlocked (not owned by any thread), then grab ownership, set
      the recursion level to one, and return. If more than one thread is
      blocked waiting until the lock is unlocked, only one at a time will be
      able to grab ownership of the lock. There is no return value in this
      case.

      When invoked with the blocking argument set to true, do the same thing
      as when called without arguments, and return true.

      When invoked with the blocking argument set to false, do not block. If a
      call without an argument would block, return false immediately;
      otherwise, do the same thing as when called without arguments, and
      return true.

      """
       me = _get_ident()
       if self.__owner == me:
           self.__count = self.__count + 1
           if __debug__:
               self._note("%s.acquire(%s): recursive success"selfblocking)
           return 1
       rc = self.__block.acquire(blocking)
       if rc:
           self.__owner = me
           self.__count = 1
           if __debug__:
               self._note("%s.acquire(%s): initial success"selfblocking)
       else:
           if __debug__:
               self._note("%s.acquire(%s): failure"selfblocking)
       return rc

   __enter__ = acquire

   def release(self):
       """Release a lock, decrementing the recursion level.

      If after the decrement it is zero, reset the lock to unlocked (not owned
      by any thread), and if any other threads are blocked waiting for the
      lock to become unlocked, allow exactly one of them to proceed. If after
      the decrement the recursion level is still nonzero, the lock remains
      locked and owned by the calling thread.

      Only call this method when the calling thread owns the lock. A
      RuntimeError is raised if this method is called when the lock is
      unlocked.

      There is no return value.

      """
       if self.__owner !_get_ident():
           raise RuntimeError("cannot release un-acquired lock")
       self.__count = count = self.__count - 1
       if not count:
           self.__owner = None
           self.__block.release()
           if __debug__:
               self._note("%s.release(): final release"self)
       else:
           if __debug__:
               self._note("%s.release(): non-final release"self)

   def __exit__(selftvtb):
       self.release()

   # Internal methods used by condition variables

   def _acquire_restore(selfcount_owner):
       countowner = count_owner
       self.__block.acquire()
       self.__count = count
       self.__owner = owner
       if __debug__:
           self._note("%s._acquire_restore()"self)

   def _release_save(self):
       if __debug__:
           self._note("%s._release_save()"self)
       count = self.__count
       self.__count = 0
       owner = self.__owner
       self.__owner = None
       self.__block.release()
       return (countowner)

   def _is_owned(self):
       return self.__owner == _get_ident()


  • 简单分析
python源码中的可重入锁相对简单,加锁部分,第一次获取锁时保存锁的线程标识,后续再次获取锁,先看是否是同一个线程,如果是的话只对锁计数进行递增。解锁时,对锁计数进行递减,如果计数为0,最终释放锁。

需要注意的是需要先初始化类,后续加锁解锁都应使用同一个对象,而不能每次加锁和解锁使用新的RLock对象,这样会导致owner信息和count信息没有初始化。这个问题本质上是线程变量同步的问题。

参考文档

  • https://segmentfault.com/a/1190000021199037
  • https://github.com/yangfeixxx/python_redis
  • https://leaveslm.github.io/2018/08/08/2018-2018-08-08- 基于-Redis-实现可重入分布式锁/
  • https://www.cnblogs.com/xybaby/p/6420873.html
  • https://www.cnblogs.com/linjiqin/p/8003838.html


【您的在看,我的莫大鼓励】


以上是关于你的python 可重入分布式锁是怎么实现的的主要内容,如果未能解决你的问题,请参考以下文章

6.23Java多线程可重入锁实现原理

可重入锁浅谈

偏向锁跟可重入性有什么区别

Java并发编程:可重入内置锁

多线程之synchronoized实现可重入锁

转:Java并发编程之一:可重入内置锁