实现声明式锁,支持分布式锁自定义锁SpEL和结合事务

Posted 我从二院来

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了实现声明式锁,支持分布式锁自定义锁SpEL和结合事务相关的知识,希望对你有一定的参考价值。

工作中遇到事务一般使用声明式事务,一个注解@Transactional搞定。编程式事务则显得略繁琐。

	@Autowired
    private PlatformTransactionManager transactionManager;

    public void service() throws Exception 
        TransactionStatus status = transactionManager.getTransaction(new DefaultTransactionDefinition());
        try 
            Thread.sleep(1000);
         catch (Exception e) 
            transactionManager.rollback(status);
            throw e;
        
        transactionManager.commit(status);
    

但是遇到需要加锁的情况呢?
绝大多数情况都是使用编程式锁。例如:

	ReentrantLock lock = new ReentrantLock();
    public void service() throws Exception 
        try 
            lock.lock();
            Thread.sleep(1000);
         finally 
            lock.unlock();
        
    

但为什么不搞个轮子把编程式锁变为声明式锁呢?
本文尝试着写一个,欢迎大家指正。


2.实现

众所周知,声明式事务使用注解+AOP,同理声明式锁也应该一样。那么先搞一个注解。

2.1 定义注解

@Target(ElementType.TYPE, ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface Locked 

    /**
     * 锁名称,如果不传会默认使用同一把锁
     * 支持SpEL表达式
     * @return
     */
    String key() default "";


    /**
     * 锁类型
     * @return
     */
    LockType lockType() default LockType.ReentrantLock;

    /**
     * 获取锁的时间,超出时间没有获取到锁返回false
     * 默认为0
     * -1 为永不超时,这种情况下,tryLock()会一直阻塞到获取锁
     * @return
     */
    long time() default 0;

    /**
     * 获取锁的时间单位
     * 默认为秒
     * @return
     */
    TimeUnit timeUnit() default TimeUnit.SECONDS;

再定义一个枚举类型的锁类型

public enum LockType 
     ReentrantLock,
     RedissonClient,
     ELSE


2.2 定义锁接口

然后仿照java.util.concurrent.locks.Lock定义一个锁的接口。

public interface LockedService 
   boolean lock();

    boolean tryLock();

    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

    void unLock();

    /**
     * 如果是redisson分布式锁,调用方通过此方法将相应的redissonClient对象和key传到具体实现类
     * @param redissonClient
     * @param key
     * @return
     */
    LockedService setRedissonClient(RedissonClient redissonClient, String key);

    /**
     * 如果是ReentrantLock,调用方通过此方法将相应的lock对象传到具体实现类
     * @param lock
     * @return
     */
    LockedService setLock(Lock lock);

主要是加锁解锁的动作。加锁分为阻塞与非阻塞,有时间参数与非时间参数之分。
与jdk的 lock接口完全相似。
里面多余的方法分别为:

  1. setRedissonClient()
    如果是redisson分布式锁,调用方通过此方法将相应的redissonClient对象和key传到具体实现类
  2. setLock()
    如果是ReentrantLock,调用方通过此方法将相应的lock对象传到具体实现类

如果是自定义锁,在自定义的实现类里面可以忽略这两个方法,自定义获取锁对象。

2.3 锁的实现

锁的接口至少需要两个实现类,一个是ReentrantLock,另一个是RedissonClient
如果是直接定义两个类,

@Component
public class LockedServiceImpl implements LockedService
@Component
public class LockedRedissonServiceImpl implements LockedService

然后在切面里直接使用

 @Autowired
 private LockedService lockedService;

启动就会报错:

Field lockedService in com.nyp.test.service.LockAspect required a single bean, but 2 were found:
	- lockedRedissonServiceImpl: defined in file [\\target\\classes\\com\\nyp\\test\\service\\LockedRedissonServiceImpl.class]
	- lockedServiceImpl: defined in file [\\target\\classes\\com\\nyp\\test\\service\\LockedServiceImpl.class]

就算通过@Primary或者@Qualifier将这两个实现类都注入了进来,也不好分辨究竟使用哪一个。

这个使用就需要用到SPI了。

2.3.1 什么是SPI

SPI全称Service Provider Interface,是Java提供的一套用来被第三方实现或者扩展的API,它可以用来启用框架扩展和替换组件。

有点类似于接口+策略模式+配置文件组合实现的动态加载机制。

比如spring mvc/springboot里面有个HttpMessageConverters,HTTP的request和response的转换器。当一个请求完成时,返回一个对象,需要它将对象转换成json串(或其它),然后以流的形式写到客户端。这个的工具有很多,比如jackson,gson等,spring默认采用jackson框架,AbstractJackson2HttpMessageConverter.

也很多同学实际上使用的是fastjson,FastJsonHttpMessageConverter。或者其它。

不管使用哪一种框架,它都要去实现HttpMessageConverters接口。但springboot容器加载的时候怎么知道需要去加载哪些实现类,具体又使用哪个实现类呢。

这里就使用到了SPI机制。

2.3.2 通过SPI实现锁的多个实现类

SPI有jdk的实现,有spring boot的实现。这里使用springboot的实现方法。

resources/META-INF目录下新装置文件 spring.factories,内容为锁接口及其实现类的全限定类名。

com.nyp.test.service.LockedService=\\
com.nyp.test.service.LockedServiceImpl,\\
com.nyp.test.service.LockedRedissonServiceImpl

同时在类LockedServiceImplLockedRedissonServiceImpl就不需要加@Component注解。
LockedServiceImpl类:

import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;

/**
 * @projectName: Test
 * @package: com.nyp.test.service
 * @className: com.nyp.test.service.LockedService
 * @author: nyp
 * @description: jdk ReentrantLock锁实现
 * @date: 2023/4/13 11:45
 * @version: 1.0
 */
@Component
public class LockedServiceImpl implements LockedService

    private Lock lock;

    @Override
    public boolean lock()
        if (lock != null) 
            lock.lock();
            return true;
        
        return false;
    

    @Override
    public boolean tryLock() 
        return lock.tryLock();
    

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException 
        return lock.tryLock(time, unit);
    

    @Override
    public void unLock()
        if (lock != null) 
            lock.unlock();
        
    

    @Override
    public LockedService setRedissonClient(RedissonClient redissonClient, String key) 
        return this;
    

    @Override
    public LockedService setLock(Lock lock) 
        this.lock = lock;
        return this;
    


LockedRedissonServiceImpl类:

import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;

/**
 * @projectName: Test
 * @package: com.nyp.test.service
 * @className: com.nyp.test.service.LockedService
 * @author: nyp
 * @description: redisson分布式锁实现
 * @date: 2023/4/13 11:45
 * @version: 1.0
 */
public class LockedRedissonServiceImpl implements LockedService 
    private RedissonClient redissonClient;
    private String key;

    @Override
    public boolean lock() 
        RLock rLock = redissonClient.getLock(key);
        if (rLock != null) 
            rLock.lock();
            return true;
        
        return false;
    

    @Override
    public boolean tryLock() 
        RLock rLock = redissonClient.getLock(key);
        return rLock.tryLock();
    

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException 
        RLock rLock = redissonClient.getLock(key);
        return rLock.tryLock(time, unit);
    

    @Override
    public void unLock() 
        RLock rLock = redissonClient.getLock(key);
        if (rLock != null && rLock.isHeldByCurrentThread()) 
            rLock.unlock();
        
    

    @Override
    public LockedService setRedissonClient(RedissonClient redissonClient, String key) 
        this.redissonClient = redissonClient;
        this.key = key;
        return this;
    

    @Override
    public LockedService setLock(Lock lock) 
        return this;
    

那么spring容器里面就有两个锁接口实现了,到底使用哪一个呢?
模仿HttpMessageConverters搞一个转换器。在init()方法里面指定默认使用com.nyp.test.service.LockedServiceImpl.

public class LockedServiceConverters 
    private LockedService lockedService;
    private String lockedServiceImplClass;

    public LockedServiceConverters(String lockedServiceImplClass) 
        this.lockedServiceImplClass = lockedServiceImplClass;
    

    public LockedService getLockedService() 
        return lockedService;
    

    @PostConstruct
    public void init() throws Exception 
        if (StringUtils.isBlank(lockedServiceImplClass)) 
            lockedServiceImplClass = "com.nyp.test.service.LockedServiceImpl";
        

        List<LockedService> lockedServiceList = SpringFactoriesLoader.loadFactories(LockedService.class, null);
        for (LockedService lockedService : lockedServiceList)
            System.out.println(lockedService.getClass().getName());
            if(lockedService.getClass().getName().equals(lockedServiceImplClass))
                this.lockedService = lockedService;
            
        
        if (lockedService == null) 
            throw new Exception("未发现lockedService : " + lockedServiceImplClass);
        
    

使用的时候可以通过以下方式指定使用LockedRedissonServiceImpl,或其它自定义的锁实现类。

@Configuration
public class WebConfig 
    @Bean
    public LockedServiceConverters lockedServiceConverters() 
        return new LockedServiceConverters("com.nyp.test.service.LockedRedissonServiceImpl");
    

2.3.3 通过SPI自定义实现锁

首先实现一个LockedService接口。

public class LockedServiceUserDefine implements LockedService
    @Override
    public boolean lock() 
        log.info("锁住");
        return true;
    

    @Override
    public boolean tryLock() 
        return true;
    

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException 
        return true;
    

    @Override
    public void unLock() 
        log.info("解锁");
    

    @Override
    public LockedService setRedissonClient(RedissonClient redissonClient, String key) 
        return this;
    

    @Override
    public LockedService setLock(Lock lock) 
        return this;
    

然后将此类加到spring.factory里面去

com.nyp.test.service.LockedService=\\
com.nyp.test.service.LockedServiceImpl,\\
com.nyp.test.service.LockedRedissonServiceImpl,\\
com.nyp.test.service.LockedServiceUserDefine

再将LockedServiceConverters改为LockedServiceUserDefine即可。

@Bean
    public LockedServiceConverters lockedServiceConverters() 
        return new LockedServiceConverters("com.nyp.test.service.LockedServiceUserDefine");
    

3.定义切面

3.1 切面实现

import com.nyp.test.config.LockType;
import com.nyp.test.config.Locked;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.*;
import org.aspectj.lang.reflect.MethodSignature;
import org.ehcache.impl.internal.concurrent.ConcurrentHashMap;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.DefaultParameterNameDiscoverer;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.stereotype.Component;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @projectName: Test
 * @package: com.nyp.test.service
 * @className: LockAspect
 * @author: nyp
 * @description: TODO
 * @date: 2023/4/13 10:46
 * @version: 1.0
 */
@Aspect
@Component
@Slf4j
public class LockAspect 

    /**
     * 用于保存ReentrantLock类的锁
     */
    private volatile ConcurrentHashMap<String, Lock> locks = new ConcurrentHashMap<>();

    @Autowired
    private LockedServiceConverters lockedServiceConverters;

    @Autowired
    private RedissonClient redissonClient;

    /**
     * 用于SpEL表达式解析.
     */
    private SpelExpressionParser parser = new SpelExpressionParser();
    /**
     * 用于获取方法参数定义名字.
     */
    private DefaultParameterNameDiscoverer discoverer = new DefaultParameterNameDiscoverer();

    @Around("@annotation(locked)")
    public Object around(ProceedingJoinPoint joinPoint, Locked locked) throws Throwable 
        String lockKey = getKeyBySpEL(locked.key(), joinPoint);
        if (StringUtils.isBlank(lockKey)) 
            lockKey = joinPoint.getTarget().toString();
            log.info("lockKey = ", lockKey);
        
        Lock lock = locks.get(lockKey);
        lock = getLockFromLocks(lock, locked.lockType(), lockKey);
        boolean isLock;
        if (locked.time() != 0) 
            if (locked.time() < 0) 
                isLock = lockedServiceConverters.getLockedService().setLock(lock).setRedissonClient(redissonClient, lockKey).tryLock();
             else 
                isLock = lockedServiceConverters.getLockedService().setLock(lock).setRedissonClient(redissonClient, lockKey).tryLock(locked.time(), locked.timeUnit());
            
         else 
            isLock = lockedServiceConverters.getLockedService().setLock(lock).setRedissonClient(redissonClient, lockKey).lock();
        
        Object obj;
        try 
            obj = joinPoint.proceed();
         catch (Throwable throwable) 
            throw  throwable;
         finally 
            if(isLock)
                // 如果有事务,保证事务完成(commit or rollback)过后再释放锁
                // 这里不管声明式事务,还是编程式事务,只要还没完成就会进入
				// TODO 这里要考虑到事务的传播,特别是嵌套事务
                if(TransactionSynchronizationManager.isActualTransactionActive())
                    Lock finalLock = lock;
                    String finalLockKey = lockKey;
                    TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() 
                        @Override
                        public void afterCompletion(int status) 
                            lockedServiceConverters.getLockedService().setLock(finalLock).setRedissonClient(redissonClient, finalLockKey).unLock();
                        
                    );
                 else
                    lockedServiceConverters.getLockedService().setLock(lock).setRedissonClient(redissonClient, lockKey).unLock();
                
            
        
        return obj;
    

    private Lock getLockFromLocks(Lock lock, LockType lockType, String lockKey) 
        // TODO 非Lock类锁可以不调用此方法
        if (lock == null) 
            synchronized (LockAspect.class)
                lock = locks.get(lockKey);
                if (lock == null) 
                    if (lockType == LockType.ReentrantLock) 
                        lock = new ReentrantLock();
                        locks.put(lockKey, lock);
                    
                
            
        
        return lock;
    

    public String getKeyBySpEL(String expressionStr, ProceedingJoinPoint joinPoint) 
        MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
        String[] paramNames = discoverer.getParameterNames(methodSignature.getMethod());
        Expression expression = parser.parseExpression(expressionStr);
        EvaluationContext context = new StandardEvaluationContext();
        Object[] args = joinPoint.getArgs();
        for (int i = 0; i < args.length; i++) 
            context.setVariable(paramNames[i], args[i]);
        
        return expression.getValue(context) == null ? "" : expression.getValue(context).toString();
    




1.其中locks用于保存ReentrantLock类的锁,每个key一个锁对象,如果key为空,默认以方法名为key,意味着同一个方法共用一个锁对象。

2.lockedServiceConverters为锁转换器,通过它可以获取容器当中使用的真正锁对象。
3.redissonClient对象在这里注入,通过lockedServiceConverters获取的锁实现类将此锁对象注入到实现方法内部。
4.SpelExpressionParserDefaultParameterNameDiscoverer对象用于通过SpEL表达式动态从目标方法参数中获取锁的key。

5.@Around("@annotation(locked)")这里使用一个环绕通知,拦截加了locked注解的方法,进行锁操作。

具体执行流程为,拦截到方法:
1.先获取到lockkey,然后根据lockkey获取锁对象。
2.根据lockedServiceConverters拿到具体的锁实现类对象,根据锁对象类型以及time等参数,将需要的redissonClient,lock等参数传入,再调用锁方法实现,进行加锁操作。
3.调用目标方法。
4.目标方法调用完毕,进行解锁操作。这里判断一下是否还在一个事务当中,如果是的话,在事务完成之后再进行解锁。这块在后面再详细说明。

3.2 SpEL表达式获取动态key

定义一个测试目标方法。key为#person.name

@Transactional(rollbackFor = Exception.class)
@Locked(key = "#person.name", lockType = LockType.RedissonClient, time = 10, timeUnit = TimeUnit.MILLISECONDS)
    public void service(Person person) throws Exception 
        Thread.sleep(1000);
        log.info("业务方法");
    

再调用

Person person = new Person();
person.setName("张三");
lockTest.service(person);

准确获取到了张三

3.3 锁与事务的结合

锁与事务结合为什么要单独拿出来讲?有什么问题吗?

具体可以看我的另一篇博文 https://juejin.cn/post/7213636024112234551 当transcational遇上synchronized。
不管是synchronized还是Lock。道理是一样的。

简单说,spring使用动态代理加AOP实现事务管理。那么上面的方法实际上至少需要简化成3个步骤:

void begin();

@Transactional(rollbackFor = Exception.class)
public void service(Person person) throws Exception 
  try
  	lock.lock();
   finally
  	lock.unlock();
  


void commit();
// void rollback();

如果在service()中向数据库insert/update一条数据,在serive()执行完毕锁释放之后,commit之前,有另一个线程拿到了锁开始执行service()方法,那么这时候它是读不到数据库里最新的记录的。除非事务隔离级别为读未提交。

但实际生产环境中,少有人使用读未提交这种隔离级别,为了避免上述的线程安全问题,就得借助事务同步器TransactionSynchronization来实现。当线程中存在未完成的事务时,需要在afterCompletion方法里释放锁。afterCompletion表示事务完成,包括提交与回滚。

4.测试


4.1 ReentrantLock测试

	@Bean
    public LockedServiceConverters lockedServiceConverters() 
        return new LockedServiceConverters("com.nyp.test.service.LockedServiceImpl");
    

业务方法阻塞,锁获取超时时间为60秒。

	@Transactional(rollbackFor = Exception.class)
    @Locked(key = "#person.name", lockType = LockType.ReentrantLock, time = 60000, timeUnit = TimeUnit.MILLISECONDS)
    public void service(Person person) throws Exception 
        Thread.sleep(10000000);
        log.info("业务方法");
    

先测试张三锁,再测试李四锁,再获取李四锁
预期结果是张三获取到锁,李四因为是新的key所以也能获取锁,李四再次获取锁,因为之前李四获取的锁还没释放,所以一直阻塞获取不了锁。

执行日志:

[2023-04-18 19:02:17.639] INFO 53268 [http-nio-8657-exec-1] [com.nyp.test.service.LockAspect] : key = 张三
[2023-04-18 19:02:17.640] INFO 53268 [http-nio-8657-exec-1] [com.nyp.test.service.LockAspect] : 张三 尝试获取锁 
[2023-04-18 19:02:17.641] INFO 53268 [http-nio-8657-exec-1] [com.nyp.test.service.LockedServiceImpl] : 张三 获取到了锁
[2023-04-18 19:02:20.280] INFO 53268 [http-nio-8657-exec-4] [com.nyp.test.service.LockAspect] : key = 李四
[2023-04-18 19:02:20.281] INFO 53268 [http-nio-8657-exec-4] [com.nyp.test.service.LockAspect] : 李四 尝试获取锁 
[2023-04-18 19:02:20.281] INFO 53268 [http-nio-8657-exec-4] [com.nyp.test.service.LockedServiceImpl] : 李四 获取到了锁
[2023-04-18 19:02:22.181] INFO 53268 [http-nio-8657-exec-3] [com.nyp.test.service.LockAspect] : key = 李四
[2023-04-18 19:02:22.181] INFO 53268 [http-nio-8657-exec-3] [com.nyp.test.service.LockAspect] : 李四 尝试获取锁 
[2023-04-18 19:03:22.186] INFO 53268 [http-nio-8657-exec-3] [com.nyp.test.service.LockedServiceImpl] : 李四 获取锁超时

执行日志符合预期,李四第2次尝试获取锁,在60秒后超时失败。

4.2 RedissonClient测试


@Bean
    public LockedServiceConverters lockedServiceConverters() 
        return new LockedServiceConverters("com.nyp.test.service.LockedRedissonServiceImpl");
    

还是上面的测试代码,结果符合预期。

[2023-04-18 19:10:47.895] INFO 117888 [http-nio-8657-exec-1] [com.nyp.test.service.LockAspect] : key = 张三
[2023-04-18 19:10:47.896] INFO 117888 [http-nio-8657-exec-1] [com.nyp.test.service.LockAspect] : 张三 尝试获取锁 
[2023-04-18 19:10:47.904] INFO 117888 [http-nio-8657-exec-1] [com.nyp.test.service.LockedRedissonServiceImpl] : 张三 获取到了锁
[2023-04-18 19:10:50.555] INFO 117888 [http-nio-8657-exec-5] [com.nyp.test.service.LockAspect] : key = 李四
[2023-04-18 19:10:50.556] INFO 117888 [http-nio-8657-exec-5] [com.nyp.test.service.LockAspect] : 李四 尝试获取锁 
[2023-04-18 19:10:50.557] INFO 117888 [http-nio-8657-exec-5] [com.nyp.test.service.LockedRedissonServiceImpl] : 李四 获取到了锁
[2023-04-18 19:10:55.882] INFO 117888 [http-nio-8657-exec-8] [com.nyp.test.service.LockAspect] : key = 李四
[2023-04-18 19:10:55.883] INFO 117888 [http-nio-8657-exec-8] [com.nyp.test.service.LockAspect] : 李四 尝试获取锁 
[2023-04-18 19:11:55.896] INFO 117888 [http-nio-8657-exec-8] [com.nyp.test.service.LockedRedissonServiceImpl] : 李四 获取锁超时

4.3 自定义锁测试

使用LockedServiceUserDefine

    @Bean
    public LockedServiceConverters lockedServiceConverters() 
        return new LockedServiceConverters("com.nyp.test.service.LockedServiceUserDefine");
    
    @Transactional(rollbackFor = Exception.class)
    @Locked(key = "#person.name", lockType = LockType.ELSE, time = 60000, timeUnit = TimeUnit.MILLISECONDS)
    public void service(Person person) throws Exception 
        log.info("业务方法");
    

日志:

[2023-04-18 19:19:38.897] INFO 6264 [http-nio-8657-exec-1] [com.nyp.test.service.LockAspect] : key = 张三
[2023-04-18 19:19:38.898] INFO 6264 [http-nio-8657-exec-1] [com.nyp.test.service.LockAspect] : 张三 尝试获取锁 
[2023-04-18 19:19:38.898] INFO 6264 [http-nio-8657-exec-1] [com.nyp.test.service.LockedServiceUserDefine] : 锁住
[2023-04-18 19:19:38.900] INFO 6264 [http-nio-8657-exec-1] [com.nyp.test.service.LockTest] : 业务方法
[2023-04-18 19:19:38.901] INFO 6264 [http-nio-8657-exec-1] [com.nyp.test.service.LockedServiceUserDefine] : 解锁  

这里只是证明容器执行了自定义的实现类。没有真正实现锁。

5.尾声


5.1 todo list

1.将locks里不用的锁定期清除。
2.getLockFromLocks()方法非Lock类的锁可不执行。
3.重要:在切面里,在事务同步器afterCompletion后再释放锁,这里要考虑到事务的传播性,特别是嵌套事务的情况。这种情况下,会把锁的范围扩大。

5.2 小结

在这里我们实现了一个声明式的锁,在注解里支持锁的粒度(key和SpEL动态key),支持定义获取锁的超时时间,默认支持ReentrantLockRedissonClient两种锁。

也使用SPI支持自定义锁实现。

支持锁与事务的整合(考虑到事务的传播,或者叫有限支持)。


但是!!没经过严格的测试,特别是并发测试,不建议在生产环境中使用,仅作学习交流之用。
希望能够对各位看官有所启发,欢迎留言讨论。

显式锁Lock的等待通知机制Condition

?? 任意一个Java对象,都拥有一组监视器方法(定义在根类Object上),主要包括:wait( )、wait(long timeout)、notify()、notifyAll()方法;这些方法与关键字synchronized结合使用,可以实现 隐式锁的等待/通知机制。而显示锁Lock也实现了等待/通知机制;Condition接口也提供了类似Object的监视器方法,与Lock配合使用可以实现 显式锁的等待/通知机制,但是两者在使用方式和功能特性有所差别。总得来说,Condition接口更加灵活,功能更多:

  • Condition 接口是支持多个等待队列,也称为条件队列,即根据不同的条件进入不同的等待队列,相应的,用户便可以只在指定的等待队列上唤醒线程,而不是唤醒所有的线程。而 Object监视器 则只能有一个等待队列。
  • Condition 接口还提供了对中断不敏感的等待方法,即当处于等待状态时,是不会被中断的。而Object监视器 不支持不可中断的等待。
  • Condition 接口除了提供了超时等待方法之外,还提供了等待直到将来某个时间段的方法;Object监视器只有超时等待的方法。

Condition 接口的方法:

方法名称 描 述
void await() throws InterruptedException 造成当前线程在接到通知或被中断之前一直处于等待状态。即发生以下两种情况,将会从await()方法返回:
1、其他某个线程调用此 Condition 的 signal()、signalAll()方法;
2、其他某个线程中断当前线程
void awaitUninterruptibly() 造成当前线程在接到通知之前一直处于等待状态。 与await()方法相比,这是不可中断的等待方法
long awaitNanos(long nanosTimeout) throws InterruptedException 造成当前线程在接到通知、被中断或到达指定等待时间之前一直处于等待状态。
返回值:此方法返回时,距离超时的剩余时间,即返回值就是(nanosTimeout - 实际耗时);如果返回值是0或者负数,那么认定已经超时
boolean await(long time, TimeUnit unit) throws InterruptedException 造成当前线程在接到通知、被中断或到达指定等待时间之前一直处于等待状态。
返回值:如果在从此方法返回前检测到等待时间超时(到达指定时间),则返回 false,否则返回 true
此方法在行为上等效于: awaitNanos(unit.toNanos(time)) > 0
boolean awaitUntil(Date deadline) throws InterruptedException 造成当前线程在接到通知、被中断或到达指定最后期限之前一直处于等待状态。
返回值:如果在从此方法返回前检测到等待时间超时(到达指定时间),则返回 false,否则返回 true
void signal() 唤醒一个等待线程。如果所有的线程都在等待此条件,则选择其中的一个唤醒。在从 await 返回之前,该线程必须重新获取锁。
void signalAll() 唤醒所有等待线程。 如果所有的线程都在等待此条件,则唤醒所有线程。在从 await 返回之前,每个线程都必须重新获取锁。

@ Example Condition 的例子

public class BoundedQueue<T> {
    private Object[] item;
    //添加的下标、删除的下标、数组的当前数量
    private int addIndex,removeIndex,count;
    private Lock lock = new ReentrantLock();
    private Condition notFull = lock.newCondition();
    private Condition notEmpty = lock.newCondition();
    
    public BoundedQueue(int size){
        item = new Object[size];
    }
    
    //添加一个元素,如果数组满,则添加线程进入等待状态,直到有空位
    public void add(T t) throws InterruptedException{
        lock.lock();
        try{
            while(count == item.length){
                //可中断的等待,如果没有剩余空间,那么就进入notFull的等待队列
                notFull.wait();
            }
            item[addIndex] = t;
            if(++addIndex == item.length){//模拟环数组,一直插入,直到尾部,又重新从头部插入
                addIndex = 0;
            }
            count++;
            //刚插入一个元素,不为空的条件满足,可唤醒等待在notEmpty队列上的线程
            notEmpty.signal();
        }finally {
            lock.unlock();
        }
    }
    
    //从头部移除一个元素,如果数组为空,则等待,直到数组不为空
    public T remove(){
        lock.lock();
        try{
            while(count == 0){
                //不可中断的等待,不为空的条件不满足,就会一直等待
                notEmpty.awaitUninterruptibly();;
            }
            Object x = item[removeIndex];
            if(++removeIndex == item.length){
                removeIndex = 0;
            }
            count--;
            //刚移除一个元素,数组没有满的条件符合,唤醒等待在notFull上的线程
            notFull.signal();
            return (T)x;
        }finally{
            lock.unlock();
        }
    }
}

以上是关于实现声明式锁,支持分布式锁自定义锁SpEL和结合事务的主要内容,如果未能解决你的问题,请参考以下文章

MySQL - 全局锁表级锁行级锁元数据锁自增锁意向锁共享锁独占锁记录锁间隙锁临键锁死锁

事务隔离实现并发控制:MySQL系列之十

显式锁Lock的等待通知机制Condition

Java 隐式锁 - synchronized 关键字

MySQL完全笔记数据库锁篇

分布式锁的实现方式和优缺点&Java代码实现