ZooKeeper 分布式锁设计实战

Posted 栗子~~

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ZooKeeper 分布式锁设计实战相关的知识,希望对你有一定的参考价值。

文章目录

前言

  如果您觉得有用的话,记得给博主点个赞,评论,收藏一键三连啊,写作不易啊^ _ ^。
  而且听说点赞的人每天的运气都不会太差,实在白嫖的话,那欢迎常来啊!!!


ZooKeeper 分布式锁设计

01 ZooKeeper 的基础概念

一篇文章带您掌握ZooKeeper的基础概念

02 ZooKeeper 分布式锁设计原理

通过控制创建与共享资源相关的"临时顺序节点"和动态Watcher监听机制,从而控制多线程对共享资源的并发访问。

03 ZooKeeper 分布式锁整体设计流程

04 ZooKeeper节点的数据结构与watcher监听器机制

05 实战:

05::01 依赖

		<!-- zookeeper start-->
		<dependency>
			<groupId>org.apache.zookeeper</groupId>
			<artifactId>zookeeper</artifactId>
			<version>3.4.12</version>
            <exclusions>
				<exclusion>
					<artifactId>slf4j-log4j12</artifactId>
					<groupId>org.slf4j</groupId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.apache.curator</groupId>
			<artifactId>curator-recipes</artifactId>
			<version>4.0.0</version>
			<!-- 4.0.0原生不兼容zk 3.4, 必须进行兼容性处理 -->
			<exclusions>
				<exclusion>
					<groupId>org.apache.zookeeper</groupId>
					<artifactId>zookeeper</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<!-- zookeeper end-->

05::02 ZooKeeper 配置

zk:
  host: 127.0.0.1:2181  #ZooKeeper连接
  namespace: yzy_distributeLock #命名空间只是用来区别不同的项目

/**
 * zk 配置
 * @yangzhenyu
 *
 * */
@Configuration
@ConfigurationProperties(prefix = "zk", ignoreUnknownFields = true)
public class ZookeeperConfig 
    //zk 连接
    @Value("127.0.0.1:2181")
    private String host;
    //命名空间只是用来区别不同的项目
    @Value("yzy_distributeLock")
    private String namespace;
    private static Logger log = LoggerFactory.getLogger(ZookeeperConfig.class);
    public ZookeeperConfig() 
        log.info("=================== Zookeeper 配置===================");
    
    /**
     * 创建CuratorFramework实例
     * 01 采用工厂模式进行创建
     * 02 指定客户端连接zk的策略:重试机制(5次、每次间隔1秒)
     * */
    @Bean
    public CuratorFramework curatorFramework()
      CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
                                                                 .connectString(host)
                                                                 .namespace(namespace)
                                                                 .retryPolicy(new RetryNTimes(5,1000))
                                                                 .build();
      curatorFramework.start();
      return curatorFramework;
    

05::03 自定义异常类

/**
 * 自定义异常
 * @author yangzhenyu
 * */
public class MyException extends RuntimeException 
    private String code;

    public String getCode() 
        return code;
    

    public MyException(Exception e) 
        super(e);
    

    public MyException(ExceptionEnumService enums) 
        super(enums.getDesc());
        this.code = enums.getCode();
    

    public MyException(ExceptionEnumService enums, String msg, String... strings) 
        super(String.format(StringUtils.isEmpty(msg) ? enums.getDesc() : msg, strings));
        this.code = enums.getCode();
    

    public MyException(ExceptionEnumService enums, Throwable e, String... strings) 
        super(String.format(enums.getDesc(), strings), e);
        this.code = enums.getCode();
    

    public MyException(ExceptionEnumService enums, String msg, Throwable e, String... strings) 
        super(String.format(StringUtils.isEmpty(msg) ? enums.getDesc() : msg, strings), e);
        this.code = enums.getCode();
    



/**
 * @author yangzhenyu
 * */
public enum ExceptionEnum implements ExceptionEnumService
    DISTRIBUTED_SEESION_ERROR("DISTRIBUTED_SEESION_ERROR","分布式会话出现错误 失败");
    /**
     * 枚举编码
     */
    private String code;
    /**
     * 枚举说明
     */
    private String desc;


    ExceptionEnum(String code, String desc) 
        this.code = code;
        this.desc = desc;
    

    @Override
    public String getCode() 
        return  this.code;
    

    @Override
    public String getDesc() 
        return this.desc;
    

    public static  String getDesc(String code)
        if(StringUtils.isEmpty(code)) 
            return null;
        
        List<String> list  = new ArrayList<>();
        Arrays.stream(ExceptionEnum.values()).filter(t->t.code.equals(code)).forEach(v->
            list.add(v.desc);
        );

        return list.size()>0?list.get(0):"";
    


05::04 返回vo

/**
 * @author yangzhenyu
 * */
public class ResponseBo extends HashMap<String, Object> 

    private static final long serialVersionUID = -8713837118340960775L;

    // 成功
    private static final Integer SUCCESS = 200;
    // 警告
    private static final Integer WARN = 1;
    // 异常 失败
    private static final Integer FAIL = 500;



    public ResponseBo() 
        put("code", SUCCESS);
        put("msg", "操作成功");
    

    public static ResponseBo error(Object msg) 
        ResponseBo ResponseBo = new ResponseBo();
        ResponseBo.put("code", FAIL);
        ResponseBo.put("msg", msg);
        return ResponseBo;
    
    public static ResponseBo error(String code ,String msg,String data) 
        ResponseBo ResponseBo = new ResponseBo();
        ResponseBo.put("code", code == null || StringUtils.isEmpty(code) ?  ExceptionConfig.code : code);
        ResponseBo.put("msg", StringUtils.isEmpty(msg) ?  ExceptionConfig.info : msg);
        ResponseBo.put("data", data);
        return ResponseBo;
    
    public static ResponseBo error(String code ,String msg) 
        ResponseBo ResponseBo = new ResponseBo();
        ResponseBo.put("code", code == null || StringUtils.isEmpty(code) ?  ExceptionConfig.code : code);
        ResponseBo.put("msg", StringUtils.isEmpty(msg) ?  ExceptionConfig.info : msg);
        return ResponseBo;
    

    public static ResponseBo warn(Object msg) 
        ResponseBo ResponseBo = new ResponseBo();
        ResponseBo.put("code", WARN);
        ResponseBo.put("msg", msg);
        return ResponseBo;
    



    public static ResponseBo ok(Object data) 
        ResponseBo ResponseBo = new ResponseBo();
        ResponseBo.put("code", SUCCESS);
        ResponseBo.put("data", data);
        return ResponseBo;
    


    public static ResponseBo ok() 
        return new ResponseBo();
    

    public static ResponseBo error() 
        return ResponseBo.error("");
    

    @Override
    public ResponseBo put(String key, Object value) 
        super.put(key, value);
        return this;
    


05::05 分布式锁设计


/**
 * @author yangzhenyu
 * 日志模板
 * */
public abstract  class BaseLog implements BaseLogInterface
    private static Logger log = LoggerFactory.getLogger(BaseLog.class);
    private ReentrantLock lock = new ReentrantLock();
    private String clazz;

    public void setClazz(String clazz) 
        this.clazz = clazz;
    

    /**
     * 打印错误日志
     * */
    public void error(String msg)
        try
            lock.lock();
            log.error(msg);
        finally 
            lock.unlock();
        
    
    /**
     * 打印普通日志
     * */
    public void info(String msg)
        try
            lock.lock();
              log.info(msg);
        finally 
            lock.unlock();
        
    
    /**
     * 返回时间日志
     * */
    public final void endLog(String msg,long startTime)
        try
            lock.lock();
            long endTime = System.currentTimeMillis();
            log.info("<======执行源:【】==> 执行结束,耗时:秒 =======>",clazz,msg,((endTime-startTime)/1000));
        finally 
            lock.unlock();
        

    
    /**
     * 返回时间错误日志
     * */
    public final void endLogError(String msg,long startTime,Throwable e)
        try
            lock.lock();
            long endTime = System.currentTimeMillis();
            log.error(String.format("<======执行源:【%s】==> %s执行结束,耗时:%s秒 =======>",clazz,msg,((endTime-startTime)/1000)),e);
        finally 
            lock.unlock();
        

    
    /**
     * 返回开始时间
     * */
    public final long startLog(String msgKey,String param)
        try
            lock.lock();
            log.info("<======执行源:【】==>方法[]开始执行  =======>",clazz,msgKey);
            log.info("<======参数:【】  =======>",param);
            return System.currentTimeMillis();
        finally 
            lock.unlock();
        
    



/**
 * @author yangzhenyu
 * */
@Api(value = "ZooKeeper 练习", tags = "ZooKeeper 练习")
@RestController
@RequestMapping(value="api/zk")
public class ZKController extends BaseLog 
    @Override
    public long init(String name, String param) 
        this.setClazz("ZKController");
        return startLog(name,param);
    
    /**
     * ZK客户端CuratorFramework实例
     * */
    @Autowired
    private CuratorFramework curatorFramework;
    /**
     *  分布式锁设计原理
     *  通过控制创建与共享资源相关的"临时顺序节点"和动态Watcher监听机制,从而控制多线程对共享资源的并发访问。
     *
     *  而ZKnode节点将对应一个具体的路径-跟Unix文件夹路径类似  ,需要以/开头
     * */
    private static final String PATH = "/yzy/zklock/";
    /**
     * ZooKeeper分布式锁设计
     * */
    @ApiOperation(value = "ZooKeeper分布式锁设计",notes = "ZooKeeper分布式锁设计")
    @ResponseBody
    @PostMapping("/lock")
    public ResponseBo lock(String key) throws Exception 
        String msgValue = "lock";
        long startTime = init(msgValue, key);
        /**
         * 创建zk互斥锁组件实例
         * */
        InterProcessMutex mutex = new InterProcessMutex(curatorFramework,PATH+ UUID.randomUUID()+"-lock");
        try
            /**
             * 采用互斥锁组件获取分布式锁,尝试最大时间为10秒
             * */
            if (mutex.acquire(10L, TimeUnit.SECONDS)) 
                //TODO : 真正的核心处理逻辑
                this.info("获取到分布式锁");
                this.info("执行业务逻辑");
            else 
                this.error("获取zk分布式锁失败");
                throw new MyException(ExceptionEnum.LOCK_ERROR);
            
            endLog(msgValue,startTime);
            return ResponseBo.ok();
        catch (MyException m)
            endLogError(msgValue,startTime,m);
            return ResponseBo.error("失败:" + ExceptionEnum.getDesc(m.getCode()))SpringBoot电商项目实战 — Zookeeper的分布式锁实现

分布式锁三大技术方案实战——基于zookeeper方式实现分布式锁

《大厂高并发分布式锁从入门到实战》第4讲之Zookeeper分布式锁

Zookeeper实战分布式锁

SpringBoot电商项目实战 — Redis实现分布式锁

漫谈分布式锁之ZooKeeper实现