  • 分布式锁


分布式锁一般有三种实现方式:1. 数据库乐观锁;2. 基于Redis的分布式锁;3. 基于ZooKeeper的分布式锁。本文主要介绍基于redis方式。

  • 可重入


可重入锁的要点是对于同一线程可以多次获取锁,不同线程之间同一把锁不能重复获取,因此理论上需要保存锁拥有者的线程标识。在Java和python中都可以用 threadlocal变量同来同步同一线程之间的资源。



  • 互斥性。在任意时刻,只有一个客户端能持有锁。

  • 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。这就要求必须为锁加上超时时间。

  • 具有容错性。只要大部分的Redis节点正常运行,客户端就可以加锁和解锁。这点是要求必须能够保证相关操作的原子性。

  • 解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。这就要求每把锁必须有自己的标识,在解锁时通过判断标识来保证是释放的自己的锁。


Talk is cheap, show me the code。下面是一个分布式锁的基本实现:

class DistributedLock(object):
  This is not a 100% reliable distributed lock implementation. It is a try best one.
  And this distributed lock algorithm is based on the time synchronization between
  each node.
  But it should be enough for our project.

   def __init__(selflock_nameexpire_time=3):
       self.name = lock_name + '_distributed_lock'
       self.expire_time = expire_time
       self.uuid = str(uuid.uuid4())

   def __enter__(self):
      If driver is not redis, nothing will happen.
       redis_conn = driver_manager.get_global_redis_conn()
       while True:
           if redis_conn.set(self.nameself.uuidex=self.expire_time*2nx=True) == 1:
               LOG.debug('Get lock %s' % self.name)
           LOG.debug('Sleep 0.1s to try to get lock %s' % self.name)
       return self

   def __exit__(selfexc_typeexc_valexc_tb):
       redis_conn = driver_manager.get_global_redis_conn()
       owner_id = redis_conn.get(self.name)
       if owner_id == self.uuid:
           LOG.debug('Release lock %s' % self.name)
           LOG.warning('Transaction locked by distributed lock %s need larger expire time' % self.name)

可以看到,我们加锁就一行代码:redis_conn.set(self.name, self.uuid, ex=self.expire_time, nx=True),这个set()方法一共有五个形参:

  • 第一个为key,表示锁的唯一性标识。
  • 第二个为value,我们传的是uuid,很多童鞋可能不明白,有key作为锁不就够了吗,为什么还要用到value?原因就是我们在上面讲到可靠性时,分布式锁要满足第四个条件解铃还须系铃人,通过给value赋值为uuid,我们就知道这把锁是哪个请求加的了,在解锁的时候就可以有依据。
  • 第三个表示锁的过期时间,如果程序发生意外没有及时解锁,也能保证在超时后其他请求能够正常加锁
  • 第四个nx=True,意思是SET IF NOT EXIST,即当key不存在时,我们进行set操作;若key已经存在,则不做任何操作;

总的来说,执行上面的set()方法就只会导致两种结果:1. 当前没有锁(key不存在),那么就进行加锁操作,并对锁设置个有效期,同时value表示加锁的客户端。2. 已有锁存在,不做任何操作。





_request_store = threading.local()

def _add_locker_info(lock_nameowner_uuid):

def _get_locker_info(lock_name):
   return getattr(_request_storelock_nameNone)

class DistributedLock(object):
  This is not a 100% reliable distributed lock implementation. It is a try best one.
  And this distributed lock algorithm is based on the time synchronization between
  each node.
  But it should be enough for our project.

   def __init__(selflock_nameexpire_time=3):
       self.name = lock_name + '_distributed_lock'
       self.expire_time = expire_time
       self.uuid = str(uuid.uuid4())

   def __enter__(self):
      If driver is not redis, nothing will happen.
       redis_conn = driver_manager.get_global_redis_conn()
       while True:
           if redis_conn.set(self.nameself.uuidex=self.expire_time*2nx=True) == 1:
               LOG.debug('Get lock %s' % self.name)
           if _get_locker_info(self.name):
               LOG.debug('Get re-entrant lock %s' % self.name)
           LOG.debug('Sleep 0.1s to try to get lock %s' % self.name)
       return self

   def __exit__(selfexc_typeexc_valexc_tb):
       redis_conn = driver_manager.get_global_redis_conn()
       owner_id = redis_conn.get(self.name)
       if owner_id == self.uuid:
           LOG.debug('Release lock %s' % self.name)
       elif owner_id == _get_locker_info(self.name):
           LOG.debug('Quit from re-entrant lock %s, do nothing ' % self.name)
           LOG.warning('Transaction locked by distributed lock %s need larger expire time' % self.name)

python 的threadlocal变量和Java中类似,对同一个线程中的变量进行隔离,因此对可重入锁的支持只需要判断下当前线程中是否已经有这把锁,如果有,则直接返回,否则进行等待锁释放或者超时。


  • threadlocal中的内存不主动释放,会不会内存泄漏?
不会,因为每个线程对应一个副本字典,当在该线程中 获取、设置、删除 ThreadLocal对象的属性时,会先通过_patch函数,将线程的副本字典,设置成ThreadLocal对象的属性字典,然后对ThreadLocal对象的属性的操作,实际上就是对副本字典的操作。当线程结束,线程对象被销毁时,副本字典的引用计数也会随之减少,最后被GC掉。

  • 可重入锁的counter计数

  • 关于threadlocal的线程隔离



通过定位后发现在项目的eventlet模块中对通过monkey_patch方式对原生的threading.local进行了改写,在原生的threading.local 通过如下的函数获取当前的线程号
def currentThread():
   """Return the current Thread object, corresponding to the caller's thread of control.

  If the caller's thread of control was not created through the threading
  module, a dummy thread object with limited functionality is returned.

       return _active[_get_ident()]
   except KeyError:
       ##print "current_thread(): no current thread for", _get_ident()
       return _DummyThread()
# 获取线程号函数        
_get_ident = thread.get_ident

def current_thread():
   g = greenlet.getcurrent()
   if not g:
       # Not currently in a greenthread, fall back to standard function
       return _fixup_thread(__orig_threading.current_thread())

       active = __threadlocal.active
   except AttributeError:
       active = __threadlocal.active = {}

       t = active[id(g)]
   except KeyError:
       # Add green thread to active if we can clean it up on exit
       def cleanup(g):
           del active[id(g)]
       except AttributeError:
           # Not a GreenThread type, so there's no way to hook into
           # the green thread exiting. Fall back to the standard
           # function then.
           t = _fixup_thread(__orig_threading.currentThread())
           t = active[id(g)] = _GreenThread(g)

   return t

  Returns the current greenlet (i.ethe one which called this function).

附录:python threading.RLock源码

_allocate_lock = thread.allocate_lock
_get_ident = thread.get_ident
Lock = _allocate_lock

def RLock(*args**kwargs):
   """Factory function that returns a new reentrant lock.

  A reentrant lock must be released by the thread that acquired it. Once a
  thread has acquired a reentrant lock, the same thread may acquire it again
  without blocking; the thread must release it once for each time it has
  acquired it.

   return _RLock(*args**kwargs)

class _RLock(_Verbose):
   """A reentrant lock must be released by the thread that acquired it. Once a
      thread has acquired a reentrant lock, the same thread may acquire it
      again without blocking; the thread must release it once for each time it
      has acquired it.

   def __init__(selfverbose=None):
       self.__block = _allocate_lock()
       self.__owner = None
       self.__count = 0

   def __repr__(self):
       owner = self.__owner
           owner = _active[owner].name
       except KeyError:
       return "<%s owner=%r count=%d>" % (

   def acquire(selfblocking=1):
       """Acquire a lock, blocking or non-blocking.

      When invoked without arguments: if this thread already owns the lock,
      increment the recursion level by one, and return immediately. Otherwise,
      if another thread owns the lock, block until the lock is unlocked. Once
      the lock is unlocked (not owned by any thread), then grab ownership, set
      the recursion level to one, and return. If more than one thread is
      blocked waiting until the lock is unlocked, only one at a time will be
      able to grab ownership of the lock. There is no return value in this

      When invoked with the blocking argument set to true, do the same thing
      as when called without arguments, and return true.

      When invoked with the blocking argument set to false, do not block. If a
      call without an argument would block, return false immediately;
      otherwise, do the same thing as when called without arguments, and
      return true.

       me = _get_ident()
       if self.__owner == me:
           self.__count = self.__count + 1
           if __debug__:
               self._note("%s.acquire(%s): recursive success"selfblocking)
           return 1
       rc = self.__block.acquire(blocking)
       if rc:
           self.__owner = me
           self.__count = 1
           if __debug__:
               self._note("%s.acquire(%s): initial success"selfblocking)
           if __debug__:
               self._note("%s.acquire(%s): failure"selfblocking)
       return rc

   __enter__ = acquire

   def release(self):
       """Release a lock, decrementing the recursion level.

      If after the decrement it is zero, reset the lock to unlocked (not owned
      by any thread), and if any other threads are blocked waiting for the
      lock to become unlocked, allow exactly one of them to proceed. If after
      the decrement the recursion level is still nonzero, the lock remains
      locked and owned by the calling thread.

      Only call this method when the calling thread owns the lock. A
      RuntimeError is raised if this method is called when the lock is

      There is no return value.

       if self.__owner !_get_ident():
           raise RuntimeError("cannot release un-acquired lock")
       self.__count = count = self.__count - 1
       if not count:
           self.__owner = None
           if __debug__:
               self._note("%s.release(): final release"self)
           if __debug__:
               self._note("%s.release(): non-final release"self)

   def __exit__(selftvtb):

   # Internal methods used by condition variables

   def _acquire_restore(selfcount_owner):
       countowner = count_owner
       self.__count = count
       self.__owner = owner
       if __debug__:

   def _release_save(self):
       if __debug__:
       count = self.__count
       self.__count = 0
       owner = self.__owner
       self.__owner = None
       return (countowner)

   def _is_owned(self):
       return self.__owner == _get_ident()

  • 简单分析



  • https://segmentfault.com/a/1190000021199037
  • https://github.com/yangfeixxx/python_redis
  • https://leaveslm.github.io/2018/08/08/2018-2018-08-08- 基于-Redis-实现可重入分布式锁/
  • https://www.cnblogs.com/xybaby/p/6420873.html
  • https://www.cnblogs.com/linjiqin/p/8003838.html


