SpringBoot基于Zookeeper和Curator实现分布式锁并分析其原理

Posted 嘉禾嘉宁papa

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringBoot基于Zookeeper和Curator实现分布式锁并分析其原理相关的知识,希望对你有一定的参考价值。

一、简介

  我们知道在JDK 的 java.util.concurrent.locks包中提供了可重入锁,读写锁,及超时获取锁的方法等,但在分布式系统中,当多个应用需要共同操作某一个资源时,就没办法使用JDK里的锁实现了,所以今天的主角就是ZooKeeper + Curator 来完成分布式锁,Curator 提供的四种锁方案:

  • InterProcessMutex:分布式可重入排它锁
  • InterProcessSemaphoreMutex:分布式排它锁
  • InterProcessReadWriteLock:分布式读写锁
  • InterProcessMultiLock:将多个锁作为单个实体管理的容器

本文主要介绍可重入排它锁 InterProcessMutex 的相关使用及源码解读。

二、maven依赖

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.2</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.alian</groupId>
    <artifactId>zookeeper-curator</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>zookeeper-curator</name>
    <description>SpringBoot基于Zookeeper和Curator实现分布式锁</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.5.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.6.3</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>5.2.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>5.2.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.14</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

三、配置类

3.1、属性配置文件

# zookeeper服务器地址(ip+port)
zookeeper.server=10.130.3.16:2181
# 休眠时间
zookeeper.sleep-time=1000
# 最大重试次数
zookeeper.max-retries=3
# 会话超时时间
zookeeper.session-timeout=15000
# 连接超时时间
zookeeper.connection-timeout=5000

本机环境有限就不搭建集群了,具体还是在于curator分布式锁的使用及原理。

3.2、属性配置类

  此配置类不懂的可以参考我另一篇文章:Spring Boot读取配置文件常用方式

ZookeeperProperties.java

package com.alian.zookeepercurator.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.PropertySource;
import org.springframework.stereotype.Component;

@Data
@Component
@ConfigurationProperties(prefix = "zookeeper")
//读取指定路径配置文件,暂不支持*.yaml文件
@PropertySource(value = "classpath:config/zookeeper.properties", encoding = "UTF-8", ignoreResourceNotFound = true)
public class ZookeeperProperties {

    /**
     * zookeeper服务地址
     */
    private String server;

    /**
     * 重试等待时间
     */
    private int sleepTime;

    /**
     * 最大重试次数
     */
    private int maxRetries;

    /**
     * session超时时间
     */
    private int sessionTimeout;

    /**
     * 连接超时时间
     */
    private int connectionTimeout;

}

3.3、ZookeeperConfig配置类(重要

ZookeeperConfig.java

  此配置类主要是使用CuratorFramework来连接zookeeper。

package com.alian.zookeepercurator.config;

import com.alian.zookeepercurator.common.ZookeeperClient;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class ZookeeperConfig {

    @Autowired
    private ZookeeperProperties zookeeperProperties;

    @Bean
    public CuratorFramework curatorFrameworkClient() {
        //重试策略,ExponentialBackoffRetry(1000,3)这里表示等待1s重试,最大重试次数为3次
        RetryPolicy policy = new ExponentialBackoffRetry(zookeeperProperties.getSleepTime(), zookeeperProperties.getMaxRetries());
        //构建CuratorFramework实例
        CuratorFramework curatorFrameworkClient = CuratorFrameworkFactory
                .builder()
                .connectString(zookeeperProperties.getServer())
                .sessionTimeoutMs(zookeeperProperties.getSessionTimeout())
                .connectionTimeoutMs(zookeeperProperties.getConnectionTimeout())
                .retryPolicy(policy)
                .build();
        //启动实例
        curatorFrameworkClient.start();
        return curatorFrameworkClient;
    }

    //采用这种方式注册bean可以比较优雅的关闭连接
    @Bean(destroyMethod = "destroy")
    public ZookeeperClient zookeeperClient(CuratorFramework curatorFrameworkClient) {
        return new ZookeeperClient(curatorFrameworkClient);
    }

}

3.4、ZookeeperClient配置类(重要

ZookeeperClient.java

  这个bean是在上面的配置类里定义的,还定义了销毁的方法,这样的好处是,当服务断开后,可以关闭连接,如果直接关闭服务可能会抛出一个异常。使用和其他的使用是一样的,当然如果你为了方便,使用@Component也没有问题。

package com.alian.zookeepercurator.common;

import com.alian.zookeepercurator.lock.AbstractLock;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;

@Slf4j
public class ZookeeperClient {

    private CuratorFramework curatorFramework;

    public ZookeeperClient(CuratorFramework curatorFramework) {
        this.curatorFramework = curatorFramework;
    }

    public <T> T lock(AbstractLock<T> abstractLock) {
        //获取锁路径
        String lockPath = abstractLock.getLockPath();
        //创建InterProcessMutex实例
        InterProcessMutex lock = new InterProcessMutex(curatorFramework, lockPath); //创建锁对象
        boolean success = false;
        try {
            try {
                //加锁
                success = lock.acquire(abstractLock.getTime(), abstractLock.getTimeUnit()); //获取锁
            } catch (Exception e) {
                throw new RuntimeException("尝试获取锁异常:" + e.getMessage() + ", lockPath " + lockPath);
            }
            //判断是否加锁成功
            if (success) {
                return abstractLock.execute();
            } else {
            	log.info("获取锁失败,返回null");
                return null;
            }
        } finally {
            try {
                if (success) {
                    //释放锁
                    lock.release();
                }
            } catch (Exception e) {
                log.error("释放锁异常: {}, lockPath {}", e.getMessage(), lockPath);
            }
        }
    }

    //bean的销毁方法
    public void destroy() {
        try {
            log.info("ZookeeperClient销毁方法,如果zookeeper连接不为空,则关闭连接");
            if (getCuratorFramework() != null) {
                //这种方式比较优雅的关闭连接
                getCuratorFramework().close();
            }
        } catch (Exception e) {
            log.error("stop zookeeper client error {}", e.getMessage());
        }
    }

    public CuratorFramework getCuratorFramework() {
        return curatorFramework;
    }
}

四、业务编写

4.1、抽象类AbstractLock

AbstractLock.java
  定义一个抽象锁的类,包含锁路径,过期时间及时间单位,子类只需要实现execute方法即可。

package com.alian.zookeepercurator.common;

import java.util.concurrent.TimeUnit;

public abstract class AbstractLock<T> {

    /**
     * 锁路径
     */
    protected String lockPath;

    /**
     * 超时时间
     */
    protected long time;

    protected TimeUnit timeUnit;

    public AbstractLock(String lockPath, long time, TimeUnit timeUnit) {
        this.lockPath = lockPath;
        this.time = time;
        this.timeUnit = timeUnit;
    }

    public void setLockPath(String lockPath) {
        this.lockPath = lockPath;
    }

    public String getLockPath() {
        return lockPath;
    }

    public long getTime() {
        return time;
    }

    public void setTime(long time) {
        this.time = time;
    }

    public void setTimeUnit(TimeUnit timeUnit) {
        this.timeUnit = timeUnit;
    }

    public TimeUnit getTimeUnit() {
        return timeUnit;
    }

    /**
     * 执行业务的方法
     *
     * @return
     */
    public abstract T execute();
}

4.2、锁使实现(核心

CuratorLockService.java

package com.alian.zookeepercurator.service;

import com.alian.zookeepercurator.common.ZookeeperClient;
import com.alian.zookeepercurator.common.AbstractLock;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.data.Stat;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.concurrent.TimeUnit;

@Slf4j
@Service
public class CuratorLockService {

    @Autowired
    private ZookeeperClient zookeeperClient;

    @Autowired
    private CuratorFramework curatorFramework;

    //库存存取的路径
    private static final String dataPath = "/root/data/stock";

    //初始化库存的路径
    private static final String initPath = "/root/init/stock";

    /**
     * 此方法系统启动执行,使用zookeeper存一个库存用于测试,这里也使用了锁。(只是一个模拟初始化库存的方法)
     */
    @PostConstruct
    public void init() {
        zookeeperClient.lock(new AbstractLock<Boolean>(initPath, 20, TimeUnit.SECONDSSpringBoot基于Zookeeper和Curator实现分布式锁并分析其原理

Springboot整合Dubbo和Zookeeper

SpringBoot电商项目实战 — Zookeeper的分布式锁实现

SpringBoot2 整合 Zookeeper组件,管理架构中服务协调

SpringBoot电商项目实战 — Redis实现分布式锁

SpringBoot 整合 Zookeeper 接入Starring微服务平台