Spring Boot基于zookeeper原生方式实现分布式锁

Posted 嘉禾嘉宁papa

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring Boot基于zookeeper原生方式实现分布式锁相关的知识,希望对你有一定的参考价值。

一、背景

  我在之前的文章SpringBoot基于Zookeeper和Curator实现分布式锁并分析其原理详细介绍了它的使用及其原理,现在我们也根据这个思路,用zookeeper原生的方式来实现一个分布式锁,加深对分布式锁的理解。本文中Spring Boot的版本是2.5.2zookeeper的版本是3.6.3

  我们大致的大致的流程图如下图,可作为我们查看代码的一个思路,不然看的头大。(当然本图是没有包含可重入锁的流程判断在里面的

二、maven依赖

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.6</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.alian</groupId>
    <artifactId>zklock</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>zklock</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.5.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.6.3</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.12.0</version>
        </dependency>
		
		<!--主要用于Maps.newConcurrentMap()-->
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>30.1-jre</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.14</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

三、配置

3.1、application.yml配置

application.yml

server:
  port: 8082
  servlet:
    context-path: /zklock

app:
  zookeeper:
    server: 10.130.3.16:2181
    session-timeout: 15000
    #这里配置的路径没有用"/"结尾
    root-lock-path: /root/alian

3.2、属性配置类

  此配置类不懂的可以参考我另一篇文章:Spring Boot读取配置文件常用方式

AppProperties.java

package com.alian.zklock.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@Data
@Component
@ConfigurationProperties(prefix = "app.zookeeper")
public class AppProperties {

    /**
     * zookeeper服务地址
     */
    private String server;

    /**
     * session超时时间
     */
    private int sessionTimeout;

    /**
     * 分布式锁路径
     */
    private String rootLockPath;

}

3.3、ZookeeperConfig配置件

ZookeeperConfig.java

package com.alian.zklock.config;

import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.CountDownLatch;

@Slf4j
@Configuration
public class ZookeeperConfig {

    private static CountDownLatch countDownLatch = new CountDownLatch(1);

    @Autowired
    private AppProperties appProperties;

    @Bean
    public ZooKeeper zookeeper() throws Exception {
        ZooKeeper zookeeper = new ZooKeeper(appProperties.getServer(), appProperties.getSessionTimeout(), event -> {
            log.info("Receive watched event: {}", event.getState());
            //获取事件的状态
            KeeperState keeperState = event.getState();
            //获取时间类型
            EventType eventType = event.getType();
            //如果是建立连接
            if (KeeperState.SyncConnected == keeperState) {
                if (EventType.None == eventType) {
                    //如果建立连接成功,则发送信号量,让后续阻塞程序向下执行
                    countDownLatch.countDown();
                    log.info("zookeeper建立连接");
                }
            }
        });
        //进行阻塞,当执行countDownLatch.countDown();后续代码才会进行
        countDownLatch.await();
        return zookeeper;
    }

}

  这里主要是对ZooKeeper 进行连接配置,关于CountDownLatch的使用,本文最后有相关的介绍。

四、实战

  定义了两个方法:加锁和释放锁。

4.1、接口

ILockService.java

package com.alian.zklock.service;

import java.util.concurrent.TimeUnit;

public interface ILockService {

    /**
     * 加锁
     *
     * @param lockPath
     * @param time
     * @param unit
     * @return
     */
    boolean lock(String lockPath, long time, TimeUnit unit);

    /**
     * 释放锁
     *
     * @return
     */
    void release();

}

4.2、接口核心实现

  这个实现类的注释,我想已经很详细了。可以细细阅读,可以加深你对zookeeper分布式锁实现原理的理解。

ZookeeperLockService.java

package com.alian.zklock.service.impl;

import com.alian.zklock.service.ILockService;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
@Service
public class ZookeeperLockService implements ILockService {

	//依赖需要导入:<groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>30.1-jre</version>
    private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();

    @Autowired
    private ZooKeeper zooKeeper;

    //好的思想直接拿来用
    private static class LockData {
        final Thread owningThread;
        final String lockPath;
        final AtomicInteger lockCount = new AtomicInteger(1);
		//构造方法
        private LockData(Thread owningThread, String lockPath) {
            this.owningThread = owningThread;
            this.lockPath = lockPath;
        }
    }

    /**
     * 加锁
     *
     * @param lockPath
     * @return
     * @throws Exception
     */
    public boolean lock(String lockPath, long time, TimeUnit unit) {
        //可重入,确保同一线程,可以重复加锁
        Thread currentThread = Thread.currentThread();
        //根据线程号获取线程锁数据
        LockData lockData = threadData.get(currentThread);
        if (lockData != null) {
            // 说明该线程已加锁过,直接放行
            lockData.lockCount.incrementAndGet();
            return true;
        }
        String currentLockPath = attemptLock(lockPath, time, unit);
        //如果不为空则表示获取到了锁
        if (StringUtils.isNotBlank(currentLockPath)) {
            //把数据缓存起来
            LockData newLockData = new LockData(currentThread, currentLockPath);
            threadData.put(currentThread, newLockData);
            return true;
        }
        return false;
    }

    /**
     * 尝试获取锁,获取成功返回锁路径
     *
     * @param lockPath
     * @param time
     * @param unit
     * @return
     */
    public String attemptLock(String lockPath, long time, TimeUnit unit) {
        //创建临时有序节点,传入的lockPath没有"/"
        try {
            String currentLockPath = zooKeeper.create(lockPath + "/", "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            log.info("线程:【{}】->【{}】尝试竞争锁", Thread.currentThread().getName(), currentLockPath);
            //创建临时节点失败
            if (StringUtils.isBlank(currentLockPath)) {
                throw new Exception("生成临时节点异常");
            }
            //检查当前节点是否获取到了锁
            boolean hasLock = checkLocked(lockPath, currentLockPath, time, unit);
            //获取到了锁则返回锁节点路径
            return hasLock ? currentLockPath : null;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 检查是否获取到锁
     *
     * @param lockPath
     * @param currentLockPath
     * @param time
     * @param unit
     * @return
     * @throws Exception
     */
    public boolean checkLocked(String lockPath, String currentLockPath, long time, TimeUnit unit) {
        boolean hasLock = false;
        boolean toDelete = false;
        try {
            while (!hasLock) {
                //检查是否获取到了锁,没有获取到则返回前一个节点
                Pair<Boolean, String> pair = getsTheLock(lockPath, currentLockPath);
                //当前节点是否获取到了锁
                boolean currentLock = pair.getLeft();
                //获取前一个节点
                String preSequencePath = pair.getRight();
                if (currentLock) {
                    //获取到了锁
                    hasLock = true;
                } else {
                    //等待
                    final CountDownLatch latch = new CountDownLatch(1);
                    //订阅比自己次小顺序节点的删除事件
                    Watcher watcher = watchedEvent -> {
                        log.info("监听到的变化【】 watchedEvent = {}", watchedEvent);
                        latch.countDown();
                    };
                    Stat stat = zooKeeper.exists(preSequencePath, watcher);
                    if (stat != null) {
                        log.info("线程:【{}】等待锁【{}】释放", Thread.currentThread().getName(), preSequencePath);
                        boolean await = latch.await(time, unit);
                        if (!await) {
                            //说明超时了
                            log.info("获取锁超时");
                            toDelete = true;
                            break;
                        }
                    }
                    //检查锁
                    Pair<Boolean, String> checkPair = getsTheLock(lockPath, currentLockPath);
                    if (checkPair.getLeft()) {
                        hasLock = true;
                    }
                }
            }
        } catch (Exception e) {
            log.error("检查是否获取到锁异常", e);
            if (e instanceof InterruptedException) {
                toDelete = true;
            }
        } finally {
            if (toDelete) {
                deleteCurrentPath(currentLockPath);
            }
        }
        return hasLock;
    }

    /**
     * 检测是否已经获取到了锁,没有获取到则返回前一个节点
     *
     * @param lockPath
     * @param currentLock
     * @return
     * @throws Exception
     */
    private Pair<Boolean, String> getsTheLock(Stri

以上是关于Spring Boot基于zookeeper原生方式实现分布式锁的主要内容,如果未能解决你的问题,请参考以下文章

(031)Spring Boot之服务的注册与发现,使用zookeeper演示负载均衡

(031)Spring Boot之服务的注册与发现,使用zookeeper演示负载均衡

ActiveMQ 基于zookeeper的主从(levelDB Master/Slave)搭建以及Spring-boot下使用

通过dubbo暴露接口调用方法,及基于zookeeper的dubbo涉及配置文件

通过dubbo暴露接口调用方法,及基于zookeeper的dubbo涉及配置文件

通过dubbo暴露接口调用方法,及基于zookeeper的dubbo涉及配置文件