spring自带分布式锁-[jdbc-lock-registry]源码解析

Posted 水中加点糖

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spring自带分布式锁-[jdbc-lock-registry]源码解析相关的知识,希望对你有一定的参考价值。

在查看spring cloud task源码的时候发现它是通过使用spring自带的分布式锁实现的。

分布式锁一直以来都是分布式系统中很重要的一种技术,最常见的主要还是基于redis或zookeeper实现的分布式锁。

spring自带的常用分布式锁介绍文档如下:

对于并发不是很高,并对性能追求不是很迫切,又不想引入其他第三方组件的情况下,使用数据库来实现分布式锁也是一种很不错的实现方式。

spring作为JAVA武林中的一个顶尖高手帮派,它又是如何通过jdbc实现分布式锁的呢?接下来我们一起拜读一下。

在spring cloud task中的应用

spring cloud task中使用jdbc-lock-registry代码片段:

	@BeforeTask
	public void lockTask(TaskExecution taskExecution) {
		if (this.lockRegistry == null) {
			this.lockRegistry = getDefaultLockRegistry(taskExecution.getExecutionId());
		}
		this.lockRegistryLeaderInitiator = new LockRegistryLeaderInitiator(
				this.lockRegistry,
				new DefaultCandidate(String.valueOf(taskExecution.getExecutionId()),
						this.taskNameResolver.getTaskName()));
		this.lockRegistryLeaderInitiator
				.setApplicationEventPublisher(this.applicationEventPublisher);
		this.lockRegistryLeaderInitiator.setPublishFailedEvents(true);
		//分布式锁启动
		this.lockRegistryLeaderInitiator.start();
		while (!this.lockReady) {
			try {
				Thread.sleep(this.taskProperties.getSingleInstanceLockCheckInterval());
			}
			catch (InterruptedException ex) {
				logger.warn("Thread Sleep Failed", ex);
			}
			if (this.lockFailed) {
				String errorMessage = String.format(
						"Task with name \\"%s\\" is already running.",
						this.taskNameResolver.getTaskName());
				try {
					this.lockRegistryLeaderInitiator.destroy();
				}
				catch (Exception exception) {
					throw new TaskExecutionException("Failed to destroy lock.",
							exception);
				}
				throw new TaskExecutionException(errorMessage);
			}
		}
	}

	@AfterTask
	public void unlockTaskOnEnd(TaskExecution taskExecution) throws Exception {
	    //分布式锁销毁
		this.lockRegistryLeaderInitiator.destroy();
	}
	
	private LockRegistry getDefaultLockRegistry(long executionId) {
    	DefaultLockRepository lockRepository = new DefaultLockRepository(this.dataSource,
    			String.valueOf(executionId));
    	lockRepository.setPrefix(this.taskProperties.getTablePrefix());
    	lockRepository.setTimeToLive(this.taskProperties.getSingleInstanceLockTtl());
    	lockRepository.afterPropertiesSet();
    	return new JdbcLockRegistry(lockRepository);
	}

通过上面的代码以及结合着之前的文章[源码解析]之-spring cloud task启动执行流程详解我们可以知道,在spring cloud task执行过程前会先执行 lockRegistryLeaderInitiator.start() 方法获取分布式锁,在task执行完毕后会执行 lockRegistryLeaderInitiator.destroy() 方法进行对锁的释放。

所以对于jdbc实现分布式锁的实现细节我们也主要看上面这个LockRegistryLeaderInitiator类的start方法和destroy方法就可以了。

在使用lockRegistryLeaderInitiator前需要对lockRegistryLeaderInitiator进行初始化,在spring cloud task中就定义了一个JdbcLockRegistry

lockRegistryLeaderInitiator.start

	/**
	 * Start the registration of the {@link #candidate} for leader election.
	 */
	@Override
	public void start() {
		if (this.leaderEventPublisher == null && this.applicationEventPublisher != null) {
			this.leaderEventPublisher = new DefaultLeaderEventPublisher(this.applicationEventPublisher);
		}
		//加对象锁执行
		synchronized (this.lifecycleMonitor) {
		    //如果running处于false则执行下面的代码,running初始时为false
			if (!this.running) {
			    //buildLeaderPath应返回资源名称,spring cloud task为taskNameResolver.getTaskName()的值
				this.leaderSelector = new LeaderSelector(buildLeaderPath());
				//将running设为true
				this.running = true;
				//向线程池提交leaderSelector
				this.future = this.executorService.submit(this.leaderSelector);
				logger.debug("Started LeaderInitiator");
			}
		}
	}
	
	/**
	 * @return the lock key used by leader election
	 */
	private String buildLeaderPath() {
		return this.candidate.getRole();
	}

从上面的代码可以看出,start方法会以当前task的名称创建一个LeaderSelector出来,然后提交给线程池

对于LeaderSelector里干了什么事情,主要需要看一下它的构造方法和call方法

LeaderSelector

LeaderSelector构造方法

		LeaderSelector(String lockKey) {
		    //定义lock
			this.lock = LockRegistryLeaderInitiator.this.locks.obtain(lockKey);
			this.lockKey = lockKey;
		}

LeaderSelector是这一个内部类。上面的代码最终会执行LockRegistryLeaderInitiator类中的locks对应方法。也就是JdbcLockRegistry的obtain方法

具体代码为:

	@Override
	public Lock obtain(Object lockKey) {
		Assert.isInstanceOf(String.class, lockKey);
		//根据lockKey产生一个uuid出来,相当于时来个md5
		String path = pathFor((String) lockKey);
		//locks为:Map<String, JdbcLock> locks = new ConcurrentHashMap<>();
		//如是不存在path,则产生一个JdbcLock放到locks这个map中
		//三个参数值分别为:DefaultLockRepository、100、key(task的名称)
		return this.locks.computeIfAbsent(path, (key) -> new JdbcLock(this.client, this.idleBetweenTries, key));
	}
	
	private String pathFor(String input) {
		return input == null ? null : UUIDConverter.getUUID(input).toString();
	}

LeaderSelector的call方法

		@Override
		public Void call() {
			try {
			    //获取running状态,如果running为true,就一直执行下面的代码
				while (isRunning()) {
					try {
					    //尝试获取锁,具体看下面的代码
						tryAcquireLock();
					}
					catch (Exception e) {
						if (handleLockException(e)) {
							return null;
						}
					}
				}
			}
			finally {
			    //running为false,或上面的代码异常了,执行finally代码块
			    //locked为true代表还占用资源的锁
				if (this.locked) {
				    //锁标志修改为false
					this.locked = false;
					try {
					    //执行JdbcLock的unlock方法,释放锁
						this.lock.unlock();
					}
					catch (Exception e) {
						logger.debug("Could not unlock during stop for " + this.context
								+ " - treat as broken. Revoking...", e);
					}
					// We are stopping, therefore not leading any more
					handleRevoked();
				}
			}
			return null;
		}
		
		private void handleRevoked() {
			LockRegistryLeaderInitiator.this.candidate.onRevoked(this.context);
			if (LockRegistryLeaderInitiator.this.leaderEventPublisher != null) {
				try {
					LockRegistryLeaderInitiator.this.leaderEventPublisher.publishOnRevoked(
							LockRegistryLeaderInitiator.this, this.context,
							LockRegistryLeaderInitiator.this.candidate.getRole());
				}
				catch (Exception e) {
					logger.warn("Error publishing OnRevoked event.", e);
				}
			}
		}

通过call方法可以看出,如果running处于true,则会一直执行tryAcquireLock方法获取锁,当运行完毕后会执行lock(JdbcLock)的unlock方法进行锁释放

LeaderSelector的tryAcquireLock方法

private void tryAcquireLock() throws InterruptedException {
			if (logger.isDebugEnabled()) {
				logger.debug("Acquiring the lock for " + this.context);
			}
			//heartBeatMillis默认为500
			//在锁的过期时间内一直尝试获取锁
			// We always try to acquire the lock, in case it expired
			boolean acquired = this.lock.tryLock(LockRegistryLeaderInitiator.this.heartBeatMillis,
					TimeUnit.MILLISECONDS);
			//之前没有获得到锁
			if (!this.locked) {
			    //现在把锁获得到了
				if (acquired) {
					// Success: we are now leader
					this.locked = true;
					//进行通知
					handleGranted();
				}
				else if (isPublishFailedEvents()) {
					publishFailedToAcquire();
				}
			}
			else if (acquired) {
				// If we were able to acquire it but we were already locked we
				// should release it
				this.lock.unlock();
				if (isRunning()) {
					// Give it a chance to expire.
					Thread.sleep(LockRegistryLeaderInitiator.this.heartBeatMillis);
				}
			}
			else {
				this.locked = false;
				// We were not able to acquire it, therefore not leading any more
				handleRevoked();
				if (isRunning()) {
					// Try again quickly in case the lock holder dropped it
					Thread.sleep(LockRegistryLeaderInitiator.this.busyWaitMillis);
				}
			}
		}

JdbcLockRegistry

JdbcLockRegistry的tryLock方法

		@Override
		public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
			long now = System.currentTimeMillis();
			//private final ReentrantLock delegate = new ReentrantLock();
			//delegate获取锁失败才会直接返回false
			if (!this.delegate.tryLock(time, unit)) {
				return false;
			}
			//当前时间加上锁的TTL值则为过期时间
			long expire = now + TimeUnit.MILLISECONDS.convert(time, unit);
			boolean acquired;
			while (true) {
				try {
				    //在超时时间内,获取锁失败,则进行休眠稍后再次尝试
					while (!(acquired = doLock()) && System.currentTimeMillis() < expire) { //NOSONAR
						Thread.sleep(this.idleBetweenTries.toMillis());
					}
					if (!acquired) {
					    //一直获取锁失败,本地锁解锁
						this.delegate.unlock();
					}
					return acquired;
				}
				catch (TransientDataAccessException | TransactionTimedOutException e) {
					// try again
				}
				catch (Exception e) {
					this.delegate.unlock();
					rethrowAsLockException(e);
				}
			}
		}
		
		private boolean doLock() {
		    //执行获取锁的方法
			boolean acquired = this.mutex.acquire(this.path);
			if (acquired) {
			//获取锁成功后,记录上次获取锁的时间
				this.lastUsed = System.currentTimeMillis();
			}
			return acquired;
		}

DefaultLockRepository.acquire

	@Transactional(isolation = Isolation.SERIALIZABLE, timeout = 1)
	@Override
	public boolean acquire(String lock) {
	    //删除过期的锁
		deleteExpired(lock);
		//更新已获取的锁
		//private String updateQuery = "UPDATE %sLOCK SET CREATED_DATE=? WHERE REGION=? AND LOCK_KEY=? AND CLIENT_ID=?";
		if (this.template.update(this.updateQuery, new Date(), this.region, lock, this.id) > 0) {
			return true;
		}
		//初始化获取锁
		//private String insertQuery = "INSERT INTO %sLOCK (REGION, LOCK_KEY, CLIENT_ID, CREATED_DATE) VALUES (?, ?, ?, ?)";
		try {
			return this.template.update(this.insertQuery, this.region, lock, this.id, new Date()) > 0;
		}
		catch (DuplicateKeyException e) {
			return false;
		}
	}
	
	private void deleteExpired(String lock) {
	//	private String deleteExpiredQuery = "DELETE FROM %sLOCK WHERE REGION=? AND LOCK_KEY=? AND CREATED_DATE<?";
		this.template.update(this.deleteExpiredQuery, this.region, lock,
				new Date(System.currentTimeMillis() - this.ttl));
	}

在传统项目中快速使用

参考此文:
使用spring-integration快速实现mysql分布锁


想是故乡花欲浓,无眠又在明月中

以上是关于spring自带分布式锁-[jdbc-lock-registry]源码解析的主要内容,如果未能解决你的问题,请参考以下文章

使用spring-integration快速实现mysql分布锁

使用spring-integration快速实现mysql分布锁

使用spring-integration快速实现mysql分布锁

Spring事务使用注意事项

Spring Schedule+Redisson分布式锁构建分布式任务调度

Spring整合Redis分布式锁