分布式锁三大技术方案实战——基于zookeeper方式实现分布式锁

Posted 北溟溟

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了分布式锁三大技术方案实战——基于zookeeper方式实现分布式锁相关的知识,希望对你有一定的参考价值。

前言

在上一小节我们详细讲解了使用mysql数据库实现分布式锁的技术方案及实现案例,本节我们使用zookeeper方式实现分布式锁,原理同上,依然是使用共享资源获取锁,从而实现分布式锁。zookeeper具备高可用、可重入、阻塞锁特性、可解决失效死锁等问题。本节案例我们使用zookeeper的Curator工具包实现,Curator 封装了 Zookeeper 底层的API,方便我们操作Zookeeper ,并且它封装了分布式锁的功能,我们直接使用即可。Curator 实现了可重入锁(InterProcessMutex),也实现了不可重入锁(InterProcessSemaphoreMutex)。在可重入锁中还实现了读写锁。所以我们可以方便的实现zookeeper下的分布式锁。本节案例我们使用InterProcessSemaphoreMutex实现分布式锁。

正文

  • docker-compose安装高可用的zookeeper实例集群

使用vi工具创建一个zookeeper集群启动脚本zookeeper-cluster.yml

version: '3.1'

services:
  zoo1:
    image: zookeeper
    restart: always
    hostname: zoo1
    ports:
      - 2181:2181
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=0.0.0.0:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181

  zoo2:
    image: zookeeper
    restart: always
    hostname: zoo2
    ports:
      - 2182:2181
    environment:
      ZOO_MY_ID: 2
      ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=0.0.0.0:2888:3888;2181 server.3=zoo3:2888:3888;2181

  zoo3:
    image: zookeeper
    restart: always
    hostname: zoo3
    ports:
      - 2183:2181
    environment:
      ZOO_MY_ID: 3
      ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=0.0.0.0:2888:3888;2181

启动集群

命令:docker-compose -f zookeeper-cluster.yml -p zoo up -d

idea安装zookeeper插件,管理zookeeper

连接zookeeper集群

  • pom中引入curator工具包
<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes -->
<dependency>
	<groupId>org.apache.curator</groupId>
	<artifactId>curator-recipes</artifactId>
	<version>5.1.0</version>
</dependency>

  • application.yml中添加zookeeper相关配置
#
# ******************************************************************************************************************************************
# Copyright (c) 2021 .
# All rights reserved.
# 项目名称:atp-platform
# 项目描述:应用测试平台管理端
# 版权说明:本软件属云嘀科技有限公司所有,在未获得云嘀科技有限公司正式授权情况下,任何企业和个人,不能获取、阅读、安装、传播本软件涉及的任何受知识产权保护的内容。
# *******************************************************************************************************************************************
#

server:
  port: 9000
spring:
  datasource:
    dynamic:
      primary: master #设置默认的数据源或者数据源组,默认值即为master
      strict: false #设置严格模式,默认false不启动. 启动后在未匹配到指定数据源时候会抛出异常,不启动则使用默认数据源.
      datasource:
        master:
          url: jdbc:mysql://192.168.23.134:3306/atp
          username: root
          password: root
          driver-class-name: com.mysql.cj.jdbc.Driver # 3.2.0开始支持SPI可省略此配置
        slave:
          url: jdbc:mysql://192.168.23.134:3307/atp
          username: root
          password: root
          driver-class-name: com.mysql.cj.jdbc.Driver
  profiles:
    active: dev
  redis:
    #默认数据分区
    database: 0
    #redis集群节点配置
    cluster:
      nodes:
        - 192.168.23.134:6379
        - 192.168.23.134:6380
        - 192.168.23.134.6381
      max-redirects: 3
    #超时时间
    timeout: 10000
    #哨兵节点配置
    sentinel:
      master: mymaster
      nodes:
        - 192.168.23.134:26379
        - 192.168.23.134:26380
        - 192.168.23.134:26381
    #redis密码
    password: root
    #redis 客户端工具
    lettuce:
      pool:
        # 连接池最大连接数(使用负值表示没有限制) 默认为8
        max-active: 8
        # 连接池中的最小空闲连接 默认为 0
        min-idle: 1
        # 连接池最大阻塞等待时间(使用负值表示没有限制) 默认为-1
        max-wait: 1000
        # 连接池中的最大空闲连接 默认为8
        max-idle: 8
mybatis-plus:
  mapper-locations: classpath*:/mapper/*/*Mapper.xml
  type-aliases-package: com.yundi.atp.platform.module.*.entity
  configuration:
    map-underscore-to-camel-case: true
  global-config:
    db-config:
      id-type: assign_id
zookeeper:
  address: 192.168.23.134:2181,192.168.23.134:2182,192.168.23.134:2183
  timeout: 300000
  maxRetries : 3
logging:
  level:
    com.yundi.atp.platform.module: debug

  • 创建一个Curator实例,操作zookeeper
package com.yundi.atp.platform.config;

import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorEventType;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.WatchedEvent;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @Author: yanp
 * @Description: zookeeper配置
 * @Date: 2021/5/7 15:50
 * @Version: 1.0.0
 */
@Slf4j
@Configuration
public class ZookeeperConfig {
    /**
     * 地址
     */
    @Value(value = "${zookeeper.address}")
    private String zookeeperAddress;

    /**
     * 超时时间
     */
    @Value(value = "${zookeeper.timeout}")
    private Integer timeout;

    /**
     * 重连次数
     */
    @Value(value = "${zookeeper.maxRetries}")
    private Integer maxRetries;

    @Bean
    public CuratorFramework curatorFramework() {
        // ExponentialBackoffRetry是种重连策略,每次重连的间隔会越来越长,1000毫秒是初始化的间隔时间,3代表尝试重连次数。
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(timeout, maxRetries);
        // 创建客户端
        CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(zookeeperAddress, retryPolicy);
        // 添加watched 监听器
        curatorFramework.getCuratorListenable().addListener((CuratorFramework client, CuratorEvent event) -> {
            CuratorEventType type = event.getType();
            if (type == CuratorEventType.WATCHED) {
                WatchedEvent watchedEvent = event.getWatchedEvent();
                String path = watchedEvent.getPath();
                log.info(watchedEvent.getType() + " ----------------------------> " + path);
                // 重新设置该节点监听
                if (null != path) {
                    client.checkExists().watched().forPath(path);
                }
            }
        });
        // 启动客户端
        curatorFramework.start();
        return curatorFramework;
    }
}

  • 控制层创建一个请求zookeeper测试请求案例
/*
 * ******************************************************************************************************************************************
 * Copyright (c) 2021 .
 * All rights reserved.
 * 项目名称:atp-platform
 * 项目描述:应用测试平台管理端
 * 版权说明:本软件属云嘀科技有限公司所有,在未获得云嘀科技有限公司正式授权情况下,任何企业和个人,不能获取、阅读、安装、传播本软件涉及的任何受知识产权保护的内容。
 * *******************************************************************************************************************************************
 */
package com.yundi.atp.platform.module.sys.controller;


import com.yundi.atp.platform.common.Result;
import com.yundi.atp.platform.module.sys.entity.User;
import com.yundi.atp.platform.module.sys.service.DistributeLockService;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;

/**
 * <p>
 * 分布式锁 前端控制器
 * </p>
 *
 * @author yanp
 * @since 2021-04-29
 */
@RestController
@RequestMapping("/sys/distributeLock")
public class DistributeLockController {
    @Autowired
    private DistributeLockService distributeLockService;

    /**
     * 分布式锁获取查询数据:方式一(mysql)
     * @return
     */
    @ApiOperation(value = "通过mysql方式获取分布式锁案例")
    @GetMapping(value = "/findAllUserInfoByMysqlLock")
    public Result findAllUserInfoByMysqlLock() {
        List<User> userList = distributeLockService.findAllUserInfoByMysqlLock();
        //1.没有获取到锁,直接返回提示信息
        if (userList == null) {
            return Result.fail("正在全力为您加载中,请稍后重试!");
        }
        return Result.success(userList);
    }

    /**
     * 分布式锁获取查询数据:方式二(zookeeper)
     * @return
     */
    @ApiOperation(value = "通过zookeeper方式获取分布式锁案例")
    @GetMapping(value = "/findAllUserInfoByZookeeperLock")
    public Result findAllUserInfoByZookeeperLock() {
        List<User> userList = distributeLockService.findAllUserInfoByZookeeperLock();
        //1.没有获取到锁,直接返回提示信息
        if (userList == null) {
            return Result.fail("正在全力为您加载中,请稍后重试!");
        }
        return Result.success(userList);
    }

}

  • 分布式锁实现类
/*
 * ******************************************************************************************************************************************
 * Copyright (c) 2021 .
 * All rights reserved.
 * 项目名称:atp-platform
 * 项目描述:应用测试平台管理端
 * 版权说明:本软件属云嘀科技有限公司所有,在未获得云嘀科技有限公司正式授权情况下,任何企业和个人,不能获取、阅读、安装、传播本软件涉及的任何受知识产权保护的内容。
 * *******************************************************************************************************************************************
 */
package com.yundi.atp.platform.module.sys.service.impl;

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.yundi.atp.platform.common.Constant;
import com.yundi.atp.platform.module.sys.entity.DistributeLock;
import com.yundi.atp.platform.module.sys.entity.User;
import com.yundi.atp.platform.module.sys.mapper.DistributeLockMapper;
import com.yundi.atp.platform.module.sys.service.DistributeLockService;
import com.yundi.atp.platform.module.sys.service.UserService;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.time.LocalDateTime;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * <p>
 * 分布式锁 服务实现类
 * </p>
 *
 * @author yanp
 * @since 2021-04-29
 */
@Slf4j
@Service
public class DistributeLockServiceImpl extends ServiceImpl<DistributeLockMapper, DistributeLock> implements DistributeLockService {
    @Autowired
    private UserService userService;
    @Autowired
    private CuratorFramework curatorFramework;

    @Override
    public List<User> findAllUserInfoByMysqlLock() {
        //1.删除过期的锁
        this.remove(new QueryWrapper<DistributeLock>().eq("method_name", "findAllUserInfoByMysqlLock").le("expire_time", LocalDateTime.now()));
        //2.申请锁
        DistributeLock distributeLock = new DistributeLock();
        distributeLock.setMethodName("findAllUserInfoByMysqlLock");
        distributeLock.setCreateTime(LocalDateTime.now());
        distributeLock.setExpireTime(LocalDateTime.now().plusMinutes(1));
        try {
            this.save(distributeLock);
        } catch (Exception e) {
            log.error(Thread.currentThread().getName() + ":分布式锁已被占用,请稍后重试!");
            return null;
        }
        //3.执行具体的业务
        List<User> userList = userService.findAllUserInfo();
        try {
            //模拟业务需要执行5秒钟
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //4.释放锁
        this.removeById(distributeLock.getId());
        //5.返回结果
        return userList;
    }

    @Override
    public List<User> findAllUserInfoByZookeeperLock() {
        String lockName = Constant.LOCK_ROOT_PATH+"findAllUserInfoByZookeeperLock";
        log.info("==================================1.线程:{}({})开始获取锁!!!==========================", lockName, Thread.currentThread().getName());
        //1.创建zookeeper分布式锁
        InterProcessSemaphoreMutex lock = new InterProcessSemaphoreMutex(curatorFramework, lockName);
        try {
            //2.获取锁资源
            boolean flag = lock.acquire(30, TimeUnit.SECONDS);
            if (flag) {
                log.info("==================================2.线程:{}({})获取到了锁,开始执行业务!!!==========================", lockName, Thread.currentThread().getName());
                //3.处理具体的业务
                List<User> userList = userService.findAllUserInfo();
                return userList;
            }
        } catch (Exception e) {
            log.info("======================================3.获取锁异常:{}({}):{}========================", lockName, Thread.currentThread().getName(), e.getMessage());
            return null;
        } finally {
            try {
                lock.release();
                log.info("===================================4.释放锁:{}({})=========================", lockName, Thread.currentThread().getName());
            } catch (Exception e) {
                log.info("===================================5.释放锁出错:{}({}):{}========================================", lockName, Thread.currentThread().getName(), e.getMessage());
            }
        }
        return null;
    }
}

  • 使用jemeter并发测试验证

jmeter的并发测试操作过程可参考(一)HTTP请求压力测试篇——性能实时监控测试平台搭建(Grafana+Influxdb+Jmeter)相关章节内容。

从打印的日志我们不难发现,只有锁释放了,其它线程才能获取到锁,执行业务,也证实了我们zookeeper分布式锁生效了。

结语

ok,到这里我们zookeeper分布式锁就讲解完成了,希望能对你有所帮助,我们下期不见不散。

以上是关于分布式锁三大技术方案实战——基于zookeeper方式实现分布式锁的主要内容,如果未能解决你的问题,请参考以下文章

SpringBoot电商项目实战 — Zookeeper的分布式锁实现

不用找了,基于 Redis 的分布式锁实战来了!

Java分布式锁三种实现方案——方案三:基于Zookeeper的分布式锁,利用节点名称的唯一性来实现独占锁

基于Zookeeper的分布式锁

基于Zookeeper的分布式锁

漫谈分布式锁之ZooKeeper实现