深入HotSpot虚拟机源码探究synchronized底层实现原理万字总结synchronized

Posted 漫话人生

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了深入HotSpot虚拟机源码探究synchronized底层实现原理万字总结synchronized相关的知识,希望对你有一定的参考价值。

文章目录


一、synchronized原理

synchronized是Java中的关键字,无法通过JDK源码查看它的实现,它是由JVM提供支持的,所以如果想要了解具体的实现需要查看JVM源码

(1)首先准备好HotSpot源码

jdk8 hotspot源码下载地址:http://hg.openjdk.java.net/jdk8u/jdk8u/hotspot/

选择zip或者gz格式下载即可

(2)解压,使用vscode或者其他编辑器打开


src是hotspot的源码目录

cpu:和cpu相关的一些操作
os:在不同操作系统上的一些区别操作
os_cpu:关联os和cpu的实现
share:公共代码

  • tools:一些工具类
  • vm:公共源码

(3)初始monitor监视器锁(先了解后细说)

相信都对下面几行代码非常熟悉,如果不熟悉synchronized的底层的话,可能会直接认为这个锁是依赖Object对象的。这当然是无稽之谈!

        Object lock = new Object();
        synchronized (lock) 
            
        

其实无论是synchronized代码块还是synchronized方法,其锁的实现最终依赖monitor监视器(先记住这个概念后面细说);那么你是否头上有个大大的问号,那么这个Object对象有什么用呢

其实这要从对象头中的MarkWord说起了(这里我长话短说,后面细说);每个Java对象在内存中包含了三部分数据对象头、实例数据、对齐填充

对象头:包含了markword(状态标志位)和类元信息指针、数组长度(如果对象是数组则多这一项)
实例数据:存放具体的实例变量数据
对齐填充:JVM要求Java对象分配内存必须是8的倍数(不满足8的倍数时填充一些字节)

重点在markword ! ! ! ! !

这个markword的状态是动态变化的(节省空间),分为四种状态-无锁、偏向锁、轻量级锁、重量级锁;某一时刻Object的状态只能处于其中一种,这应该没什么疑问吧。这个动态变化涉及到了锁优化(锁升级、锁粗化、锁消除),这个概念先了解,后面细说!!!

重点来了 !!!!
Monitor被翻译成"监视器",可以理解为实现同步的一种工具,通常被描述为一个对象,Java中每个对象都关联着一把“看不见的锁”,,为什么?看完这段描述你就明白了!当我们使用synchronized给对象上锁之后(注意这里假设认为是重量级锁),该对象中的markword字段是处于重量级锁状态,然后它会被设置指向Monitor对象的指针(Monitor由C++实现)

这个monitor不是我们创建的,而是JVM执行到同步代码块时创建的,monitor里面有两个重要的变量,分别是owner(占有锁的线程),recursions(线程获取锁的次数)

(4)建立宏观概念(初始基本流程)

打开HotSpot源码文件src/share/vm/runtime/ObjectMonitor.hpp

在hotspot中,monitor是由ObjectMonitor对象来实现的,找到该对象对应的构造器

首先描述一下这个核心流程(只看核心部分)

Owner:持有monitor的线程,对应上面源码中的_object变量
WaitSet:处于等待状态的线程会被放到该队列(例如调用wait()方法),对应上面源码中的_WaitSet变量
EntryList:当多个线程竞争锁时,竞争锁失败的线程会被放入到该队列,处于阻塞状态,需要唤醒。对应上面源码中的_EntryList变量

(5)分析锁竞争源码

	synchronized (lock) 
    	num++;
	

上面的同步代码经过反编译之后得到如下字节码指令

想必都听说过monitorenter和monitorexit指令,一个表示获取监视器锁,一个表示释放监视器锁;

关于锁竞争的JVM源码,最终会调用到MonitorObject类中的enter()方法

void ATTR ObjectMonitor::enter(TRAPS) 
  Thread * const Self = THREAD ;
  void * cur ;
//通过CAS操作尝试将_owner变量设置为当前线程,如果_owner为NULL表示锁未被占用
//CAS:内存值、预期值、新值,只有当内存值==预期值,才能将新值替换内存值
  cur = Atomic::cmpxchg_ptr (Self, &_owner, NULL) ;
  if (cur == NULL)  //如果未NULL,表示获取锁成功,直接返回即可
     assert (_recursions == 0   , "invariant") ;
     assert (_owner      == Self, "invariant") ;
     return ;
  
//线程重入,synchronized的可重入特性原理,_owner保存的线程与当前正在执行的线程相同,将_recursions++
  if (cur == Self)  
     _recursions ++ ;
     return ;
  
//表示线程第一次进入monitor,则进行一些设置
  if (Self->is_lock_owned ((address)cur)) 
    assert (_recursions == 0, "internal state error");
    _recursions = 1 ;  //锁的次数设置为1
    _owner = Self ;  //将_owner设置为当前线程
    OwnerIsThread = 1 ;  
    return ;
  
  .....
  .....省略
  .....
    //获取锁失败
    for (;;) 
      jt->set_suspend_equivalent();
    //等待锁的释放
      EnterI (THREAD) ;

      if (!ExitSuspendEquivalent(jt)) break ;

      _recursions = 0 ;
      _succ = NULL ;
      exit (false, Self) ;

      jt->java_suspend_self();
    
    Self->set_current_pending_monitor(NULL);
  

总结下来,也就是四步骤

Ⅰ、通过CAS尝试将_owner变量设置为当前线程
Ⅱ、如果是线程重入(下面有举例),则将_recurisons++
Ⅲ、如果线程是第一次进入,则将_recurisons设置为1,将_owner设置为当前线程,该线程获取锁成功并返回
Ⅳ、如果获取锁失败,则等待锁的释放

  synchronized (lock) 
     num++;
     synchronized (lock)   //锁重入_recurisons+1
                        
      
  

(6)分析锁等待源码

在锁竞争源码中最后一步,如果获取锁失败,则等待锁的释放,由MonitorObject类中的EnterI()方法来实现

void ATTR ObjectMonitor::EnterI (TRAPS) 
    Thread * Self = THREAD ;
    assert (Self->is_Java_thread(), "invariant") ;
    assert (((JavaThread *) Self)->thread_state() == _thread_blocked   , "invariant") ;
  //再次尝试获取锁,获取成功直接返回
    if (TryLock (Self) > 0) 
		....
        return ;
    

    DeferredInitialize () ;


  //尝试自旋获取锁,获取锁成功直接返回
    if (TrySpin (Self) > 0) 
        ....
        return ;
    

  //前面的尝试都失败,则将该线程信息封装到node节点
    ObjectWaiter node(Self) ;
    Self->_ParkEvent->reset() ;
    node._prev   = (ObjectWaiter *) 0xBAD ;
    node.TState  = ObjectWaiter::TS_CXQ ;

  
    ObjectWaiter * nxt ;
    //将node节点插入到_cxq的头部,前面说过锁获取失败的线程首先会进入_cxq
    //_cxq是一个单链表,等到一轮过去在该_cxq列表中的线程还未成功获取锁,
    //则进入_EntryList列表
    for (;;)                   //注意这里的死循环操作
        node._next = nxt = _cxq ;
        //这里插入节点时也使用了CAS,因为可能有多个线程失败将加入_cxq链表
        if (Atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ;


        //如果线程CAS插入_cxq链表失败,它会再抢救一下看看能不能获取到锁
        if (TryLock (Self) > 0) 
            ...
            return ;
        
    


    //竞争减弱时,将该线程设置为_Responsible(负责线程),定时轮询_owner
    //后面该线程会调用定时的park方法,防止死锁
    if ((SyncFlags & 16) == 0 && nxt == NULL && _EntryList == NULL) 
        Atomic::cmpxchg_ptr (Self, &_Responsible, NULL) ;
    


    TEVENT (Inflated enter - Contention) ;
    int nWakeups = 0 ;
    int RecheckInterval = 1 ;
    //前面获取锁失败的线程已经放入到了_cxq列表,但还未挂起
    //下面是将_cxq列表挂起的代码,线程一旦挂起,必须唤醒之后才能继续操作
    for (;;) 
        //挂起之前,再次尝试获取锁,看看能不能成功,成功则跳出循环
        if (TryLock (Self) > 0) break ;
        assert (_owner != Self, "invariant") ;

        if ((SyncFlags & 2) && _Responsible == NULL) 
           Atomic::cmpxchg_ptr (Self, &_Responsible, NULL) ;
        
        //将当前线程挂起(park()方法)
        // park self
        //如果当前线程是_Responsible线程,则调用定时的park方法,防止死锁
        if (_Responsible == Self || (SyncFlags & 1)) 
            TEVENT (Inflated enter - park TIMED) ;
            Self->_ParkEvent->park ((jlong) RecheckInterval) ;
            // Increase the RecheckInterval, but clamp the value.
            RecheckInterval *= 8 ;
            if (RecheckInterval > 1000) RecheckInterval = 1000 ;
         else 
            TEVENT (Inflated enter - park UNTIMED) ;
            Self->_ParkEvent->park() ;
        
        //当线程被唤醒之后,会再次尝试获取锁
        if (TryLock(Self) > 0) break ;
		//唤醒锁之后,还出现竞争,记录唤醒次数,这里的计数器
		//并没有受锁的保护,也没有原子更新,为了获取更低的探究影响
        TEVENT (Inflated enter - Futile wakeup) ;
        if (ObjectMonitor::_sync_FutileWakeups != NULL) 
           ObjectMonitor::_sync_FutileWakeups->inc() ;
        
        ++ nWakeups ; //唤醒次数

        //自旋尝试获取锁
        if ((Knob_SpinAfterFutile & 1) && TrySpin (Self) > 0) break ;


        if ((Knob_ResetEvent & 1) && Self->_ParkEvent->fired()) 
           Self->_ParkEvent->reset() ;
           OrderAccess::fence() ;
        
        if (_succ == Self) _succ = NULL ;

        // Invariant: after clearing _succ a thread *must* retry _owner before parking.
        OrderAccess::fence() ;
    

    //已经获取到了锁,将当前节点从_EntryList队列中删除
    UnlinkAfterAcquire (Self, &node) ;
    if (_succ == Self) _succ = NULL ;

  	...
    return ;


总结下来也就一下几步:

  • 首先tryLock再次尝试获取锁,之后再CAS尝试获取锁;失败后将当前线程信息封装成ObjectWaiter对象。
  • 在for(;;)循环中,通过CAS将该节点插入到_cxq链表的头部(这个时刻可能有多个获取锁失败的线程要插入),CAS插入失败的线程再次尝试获取锁
  • 如果还没获取到锁,则将线程挂起;等待唤醒
  • 当线程被唤醒时,再次尝试获取锁

我能从这个源码设计理念中学到什么?

看完这个锁等待源码,你是不是有了一个疑问,为什么使用了多次tryLock尝试获取锁和CAS获取锁?源码中无限推迟了线程的挂起操作,你可以看到从开始到线程挂起的代码中,出现了多次的尝试获取锁;因为线程的挂起与唤醒涉及到了状态的转换(内核态和用户态),这种频繁的切换必定会给系统带来性能上的瓶颈。所以它的设计意图就是尽量推辞线程的挂起时间,取一个极限的时间挂起线程。
另外源码中定义了负责线程_Responsible,这种标识的线程调用的是定时的park(线程挂起),避免死锁

你永远也不知道在某个时刻你全部的线程会不会同时挂起,所以最好的解决办法就是:设计一种Responsible负责线程,让它一直活跃或者定时醒来。

(7)分析锁释放源码

void ATTR ObjectMonitor::exit(bool not_suspended, TRAPS) 
   Thread * Self = THREAD ; 
   if (THREAD != _owner)   //判断当前线程是否是线程持有者
    //当前线程是之前持有轻量级锁的线程。由轻量级锁膨胀后还没调用过enter方法,_owner会是指向Lock Record的指针
     if (THREAD->is_lock_owned((address) _owner)) 

       assert (_recursions == 0, "invariant") ;
       _owner = THREAD ;
       _recursions = 0 ;
       OwnerIsThread = 1 ;
      else   //当前线程不是锁的持有者--》出现异常
       TEVENT (Exit - Throw IMSX) ;
       assert(false, "Non-balanced monitor enter/exit!");
       if (false) 
          THROW(vmSymbols::java_lang_IllegalMonitorStateException());
       
       return;
     
   
  //重入,计数器-1,返回
   if (_recursions != 0) 
     _recursions--;        // this is simple recursive enter
     TEVENT (Inflated exit - recursive) ;
     return ;
   

   //_Responsible设置为NULL
   if ((SyncFlags & 4) == 0) 
      _Responsible = NULL ;
   

#if INCLUDE_JFR
   if (not_suspended && EventJavaMonitorEnter::is_enabled()) 
    _previous_owner_tid = JFR_THREAD_ID(Self);
   
#endif

   for (;;) 
      assert (THREAD == _owner, "invariant") ;


      if (Knob_ExitPolicy == 0) 
       
         //先释放锁,这时如果有其他线程获取锁,则能获取到
         OrderAccess::release_store_ptr (&_owner, NULL) ;   // drop the lock
         OrderAccess::storeload() ;                         // See if we need to wake a successor
         //等待队列为空,或者有"醒着的线程”,则不需要去等待队列唤醒线程了,直接返回即可
         if ((intptr_t(_EntryList)|intptr_t(_cxq)) == 0 || _succ != NULL) 
            TEVENT (Inflated exit - simple egress) ;
            return ;
         
         TEVENT (Inflated exit - complex egress) ;

 		 //当前线程重新获取锁,因为后序要唤醒队列
 		 //一旦获取失败,说明有线程获取到锁了,直接返回即可,不需要获取锁再去唤醒线程了
         if (Atomic::cmpxchg_ptr (THREAD, &_owner, NULL) != NULL) 
            return ;
         
         TEVENT (Exit - Reacquired) ;
       else 
         if ((intptr_t(_EntryList)|intptr_t(_cxq)) == 0 || _succ != NULL) 
            OrderAccess::release_store_ptr (&_owner, NULL) ;   // drop the lock
            OrderAccess::storeload() ;
            // Ratify the previously observed values.
            if (_cxq == NULL || _succ != NULL) 
                TEVENT (Inflated exit - simple egress) ;
                return ;
            
             //当前线程重新获取锁,因为后序要唤醒队列
              //一旦获取失败,说明有线程获取到锁了,直接返回即可,不需要获取锁再去唤醒线程了
            if (Atomic::cmpxchg_ptr (THREAD, &_owner, NULL) != NULL) 
               TEVENT (Inflated exit - reacquired succeeded) ;
               return ;
            
            TEVENT (Inflated exit - reacquired failed) ;
          else 
            TEVENT (Inflated exit - complex egress) ;
         
      

      guarantee (_owner == THREAD, "invariant") ;

      ObjectWaiter * w = NULL ;
      int QMode = Knob_QMode ;   //根据QMode的不同,会有不同的唤醒策略

      if (QMode == 2 && _cxq != NULL) 
        //QMode==2,_cxq中有优先级更高的线程,直接唤醒_cxq的队首线程
		.........
          return ;
      
      //当QMode=3的时候 讲_cxq中的数据加入到_EntryList尾部中来 然后从_EntryList开始获取
      if (QMode == 3 && _cxq != NULL)    
          .....
          
		....... //省略
		.......
      //当QMode=4的时候 讲_cxq中的数据加入到_EntryList前面来 然后从_EntryList开始获取
      if (QMode == 4 && _cxq != NULL) 
			......
          
          
        //批量修改状态标志改成TS_ENTER
          ObjectWaiter * q = NULL ;
          ObjectWaiter * p ;
          for (p = w ; p != NULL ; p = p->_next) 
              guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
              p->TState = ObjectWaiter::TS_ENTER ;
              p->_prev = q ;
              q = p ;
          
        //插到原有的_EntryList前面 从员_EntryList中获取
          // Prepend the RATs to the EntryList
          if (_EntryList != NULL) 
              q->_next = _EntryList ;
              _EntryList->_prev = q ;
          
          _EntryList = w ;

      
	..........
	..........省略
   


核心流程如下:

1.将_recursions减1,_owner置空
2.如果队列中等待的线程为空或者_succ不为空(有"醒着的线程”,则不需要取唤醒线程了),直接返回即可。
3.第二条不满足,当前线程重新获取锁,去唤醒线程
4.唤醒线程,根据QMode的不同,有不同的唤醒策略

QMode = 2且cxq非空:cxq中有优先级更高的线程,直接唤醒_cxq的队首线程
QMode = 3且cxq非空:把cxq队列插入到EntryList的尾部;
QMode = 4且cxq非空:把cxq队列插入到EntryList的头部;
QMode = 0:暂时什么都不做,继续往下看;

只有QMode=2的时候会提前返回,等于0、3、4的时候都会继续往下执行:

我能从这个源码设计理念中学到什么?

首先在它的锁释放源码中,首先就将锁释放,然后再去判断是否有醒着的线程;如果不满足再让该线程重新获取锁去唤醒线程。如何理解它设计理念的精髓之处,首先先将锁释放,因为可能有线程正在尝试或者自旋获取锁,然后 TODO:


(8)为什么synchronized是重量级锁?

从上面的ObjectMonitor类中的函数调用设计到

以上是关于深入HotSpot虚拟机源码探究synchronized底层实现原理万字总结synchronized的主要内容,如果未能解决你的问题,请参考以下文章

《深入理解Java虚拟机》读后笔记-HotSpot虚拟机对象探秘

深入解析JVM源码 - 创建HotSpot

深入理解java虚拟机HotSpot虚拟机对象解析

深入理解java虚拟机HotSpot虚拟机对象解析

jvm,深入理解java虚拟机,HotSpot虚拟机对象探秘

新书速递深入理解Java虚拟机HotSpot