spring schedule的线程与重入问题

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spring schedule的线程与重入问题相关的知识,希望对你有一定的参考价值。

参考技术A 先说说@EnableScheduling这个注解 , 之前一直疑惑有的地方为什么没有@EnableScheduling这个注解也可以正常运行,其实这个注解相当于一个全局开关 , 让该项目可以扫描到@Schedule方法 , 所以在项目某一处用到了@EnableScheduling , 其他地方也可以作用到。

定时任务参数fixDelay的间隔为第一次任务的结束和第二次任务的开始 , 如果是参数fixRate , 间隔则为两次任务的开始。参数fixRate下,如果程序执行时间大于设置的间隔时间,则会在结束时立马执行下一次任务.

值得注意的,spring schedule默认为单线程,且不会被重入,也就是说一个@EnableScheduling类里同一时间只有一个定时任务可以启动,如果要同时执行两个定时任务怎么办?

解决办法:

可以看到 , 设置了线程池之后可以变成多线程执行 , 但是此时依旧是非重入的(也就是某个方法执行完后才会再次被执行)

AQS与重入锁ReetrantLock原理

一、AQS原理

AQS(AbstractQueuedSynchronizer)队列同步器是用来构建锁、同步组件的基础框架。

AQS内部通过一个volatile int类型的成员变量state控制同步状态【0代表锁未被占用,1表示已占用】,通过内部类Node构成FIFO的同步队列实现等待获取锁的线程排队工作,通过内部类ConditionObject构建条件等待队列,来完成等待条件线程的排队工作。当线程调用Condition对象的wait方法后会被加入等待队列中,当有线程调用Condition的signal方法后,线程将从等待队列移动到同步队列进行锁竞争。AQS内部会有一个同步队列和可能多个等待队列,前者存放等待获取锁的线程,后者分别存放等待不同条件的线程。

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer{
//指向同步队列队头
private transient volatile Node head;

//指向同步的队尾
private transient volatile Node tail;

//同步状态,0代表锁未被占用,1代表锁已被占用
private volatile int state;

static final class Node {
    //共享模式
    static final Node SHARED = new Node();
    //独占模式
    static final Node EXCLUSIVE = null;

    //标识线程已处于结束状态
    static final int CANCELLED =  1;
    //等待被唤醒状态
    static final int SIGNAL    = -1;
    //条件状态,
    static final int CONDITION = -2;
    //在共享模式中使用表示获得的同步状态会被传播
    static final int PROPAGATE = -3;

    //等待状态,存在CANCELLED、SIGNAL、CONDITION、PROPAGATE 4种
    volatile int waitStatus;

    //同步队列中前驱结点
    volatile Node prev;

    //同步队列中后继结点
    volatile Node next;

    //请求锁的线程
    volatile Thread thread;

    //等待队列中的后继结点,这个与Condition有关,稍后会分析
    Node nextWaiter;

    //判断是否为共享模式
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    //获取前驱结点
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    //.....
}
//上面Node head、tail对象构建同步队列,这里用ConditionObject类的对象创建条件等待队列
class ConditionObject implements Condition, java.io.Serializable { //等待队列第一个等待结点 private transient Node firstWaiter; //等待队列最后一个等待结点 private transient Node lastWaiter; //省略其他代码....... } //AQS中的模板方法,由其子类实现 //独占模式下获取锁的方法 protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } //独占模式下解锁的方法 protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } //共享模式下获取锁的方法 protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); } //共享模式下解锁的方法 protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); } //判断是否为持有独占锁 protected boolean isHeldExclusively() { throw new UnsupportedOperationException(); } }

二、AQS的应用

AQS通过state状态管理、同步队列、等待队列实现了多线程同步锁获取与释放,多线程并发排队、条件等待等复杂功能。

作为基础组件,它对锁的两种模式【独占模式和共享模式】都提供支持。设计上AQS采用模板方法模式构建,其内部提供了并发操作的核心方法、而将一些实现不同模式下实现可能有差异的操作定义为模板方法,让其子类实现。如ReentrantLock通过内部类Sync及其子类继承AQS实现tryAcuire()和tryRelease()方法来实现独占锁,而SemaPhore则通过内部类继承AQS实现tryAcquireShared()方法和tryReleaseShared()方法实现共享模式锁。AQS的继承关系图如下:

技术分享图片

三、ReetrantLock非公平锁实现分析AQS的用法

ReetrantLock中非公平锁
//加锁操作
public void lock() {
     sync.lock();
}

/**
 * 非公平锁实现sync.lock()
 */
static final class NonfairSync extends Sync {
    //加锁
    final void lock() {
        //执行CAS操作,获取同步状态
        if (compareAndSetState(0, 1))
       //成功则将独占锁线程设置为当前线程  
          setExclusiveOwnerThread(Thread.currentThread());
        else
            //否则再次请求同步状态
            acquire(1);
    }
}

//acquire为AQS本身实现的方法,其实现如下:
public final void acquire(int arg) {
    //再次尝试获取同步状态
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}


//AQS子类Sync的子类NonfairSync中实现的tryAcquire方法
    protected final boolean tryAcquire(int acquires) {
         return nonfairTryAcquire(acquires);
     }

  //nonfairTryAcquire方法
  final boolean nonfairTryAcquire(int acquires) {
      final Thread current = Thread.currentThread();
      int c = getState();
      //判断同步状态是否为0,并尝试再次获取同步状态
      if (c == 0) {
          //执行CAS操作
          if (compareAndSetState(0, acquires)) {
              setExclusiveOwnerThread(current);
              return true;
          }
      }
      //如果当前线程已获取锁,属于重入锁,再次获取锁后将status值加1
      else if (current == getExclusiveOwnerThread()) {
          int nextc = c + acquires;
          if (nextc < 0) // overflow
              throw new Error("Maximum lock count exceeded");
          //设置当前同步状态,当前只有一个线程持有锁,因为不会发生线程安全问题,可以直接执行 setState(nextc);
          setState(nextc);
          return true;
      }
      return false;
  }
 
//AQS本身实现的方法,将当前获取锁失败的线程构造成node结点加入的同步队列尾部,若队列为空或者并发入队失败,则调用enq方法重试。
private Node addWaiter(Node mode) {
    //将请求同步状态失败的线程封装成结点
    Node node = new Node(Thread.currentThread(), mode);

    Node pred = tail;
    //如果是第一个结点加入肯定为空,跳过。
    //如果非第一个结点则直接执行CAS入队操作,尝试在尾部快速添加
    if (pred != null) {
        node.prev = pred;
        //使用CAS执行尾部结点替换,尝试在尾部快速添加
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    //如果第一次加入或者CAS操作没有成功执行enq入队操作
    enq(node);
    return node;
}
//AQS本身实现方法,通过循环CAS操作将当前线程构造的node结点入队,解决上面队列为空或者是并发入队失败的情况;
private Node enq(final Node node) {
    //死循环
    for (;;) {
         Node t = tail;
         //如果队列为null,即没有头结点
         if (t == null) { // Must initialize
             //创建并使用CAS设置头结点
             if (compareAndSetHead(new Node()))
                 tail = head;
         } else {//队尾添加新结点
             node.prev = t;
             if (compareAndSetTail(t, node)) {
                 t.next = node;
                 return t;
             }
         }
     }
    }
//AQS本身方法,队列中结点循环观察,当自己的前驱是head结点执行的结点时尝试获取锁,若成功将其设置为头结点,否则循环尝试;若当前结点前驱结点不是头结点,则在设置其前驱结点状态后将自己挂起
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        //自旋,死循环
        for (;;) {
            //获取前驱结点
            final Node p = node.predecessor();
            当且仅当p为头结点才尝试获取同步状态
            if (p == head && tryAcquire(arg)) {
                //将node设置为头结点
                setHead(node);
                //清空原来头结点的引用便于GC
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            //如果前驱结点不是head,判断是否挂起线程
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            //最终都没能获取同步状态,结束该线程的请求
            cancelAcquire(node);
    }
}
//设置为头结点
private void setHead(Node node) {
        head = node;
        //清空结点数据
        node.thread = null;
        node.prev = null;
}
//如果前驱结点不是head,判断是否挂起线程
if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())

      interrupted = true;
}

//AQS本身的方法,若前驱结点状态为Node.SIGNAL则返回true,表示可以挂起当前结点,否则找到非结束状态的前驱结点,并设置其状态后,返回false
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        //获取当前结点的等待状态
        int ws = pred.waitStatus;
        //如果为等待唤醒(SIGNAL)状态则返回true
        if (ws == Node.SIGNAL)
            return true;
        //如果ws>0 则说明是结束状态,
        //遍历前驱结点直到找到没有结束状态的结点
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            //如果ws小于0又不是SIGNAL状态,
            //则将其设置为SIGNAL状态,代表该结点的线程正在等待唤醒。
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
//AQS本身方法,挂起当前线程并检查其中断状态
private final boolean parkAndCheckInterrupt() {
        //将当前线程挂起
        LockSupport.park(this);
        //获取线程中断状态,interrupted()是判断当前中断状态,
        //并非中断线程,因此可能true也可能false,并返回
        return Thread.interrupted();
}

//ReentrantLock类的unlock
public void unlock() {
    sync.release(1);
}

//AQS类的release()方法
public final boolean release(int arg) {
    //尝试释放锁
    if (tryRelease(arg)) {

        Node h = head;
        if (h != null && h.waitStatus != 0)
            //唤醒后继结点的线程
            unparkSuccessor(h);
        return true;
    }
    return false;
}

//ReentrantLock类中的内部类Sync实现的tryRelease(int releases) 
protected final boolean tryRelease(int releases) {

      int c = getState() - releases;
      if (Thread.currentThread() != getExclusiveOwnerThread())
          throw new IllegalMonitorStateException();
      boolean free = false;
      //判断状态是否为0,如果是则说明已释放同步状态
      if (c == 0) {
          free = true;
          //设置Owner为null
          setExclusiveOwnerThread(null);
      }
      //设置更新同步状态
      setState(c);
      return free;
  }
//AQS本身方法,唤醒后续挂起的结点
private void unparkSuccessor(Node node) {
    //这里,node一般为当前线程所在的结点。
    int ws = node.waitStatus;
    if (ws < 0)//置零当前线程所在的结点状态,允许失败。
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;//找到下一个需要唤醒的结点s
    if (s == null || s.waitStatus > 0) {//如果为空或已取消
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)//从这里可以看出,<=0的结点,都是还有效的结点。
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);//唤醒
}

 

 剖析基于并发AQS的重入锁(ReetrantLock)及其Condition实现原理

以上是关于spring schedule的线程与重入问题的主要内容,如果未能解决你的问题,请参考以下文章

Spring使用@Scheduled注解配置定时任务

Spring Boot 定时任务单线程和多线程

spring task的定时任务突然断了

Spring4+Springmvc+quartz实现多线程动态定时调度

spring定时任务Scheduled与定时任务线程池配置SchedulingConfigurer ,Java

Spring Schedule定时任务多线程执行任务配置