Spring Boot 2 整合 ZooKeeper(基于 Curator 客户端)

Posted 老人与JAVA

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了springboot2整合zookeeper集成curator相关的知识,希望对你有一定的参考价值。

步骤:

1- pom.xml

<dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
        </dependency>

 

2- yml配置:

zk:
  url: 127.0.0.1:2181
  localPath: /newlock
  timeout: 3000

 

3- 配置类

package com.test.domi.config;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that publishes a Curator client as a bean.
 *
 * <p>The connection string is read from the {@code zk.url} property.
 */
@Configuration
public class ZookeeperConf {

    /** ZooKeeper connection string injected from {@code zk.url}. */
    @Value("${zk.url}")
    private String zkUrl;

    /**
     * Creates a Curator client for {@link #zkUrl} with an
     * {@code ExponentialBackoffRetry(1000, 3)} retry policy, starts it and
     * hands the started instance to the Spring context.
     *
     * @return the started client
     */
    @Bean
    public CuratorFramework getCuratorFramework(){
        final CuratorFramework curator =
                CuratorFrameworkFactory.newClient(zkUrl, new ExponentialBackoffRetry(1000, 3));
        curator.start();
        return curator;
    }
}

 

4- 使用:基于 Curator 临时顺序节点实现分布式锁(实现 java.util.concurrent.locks.Lock 接口)

package com.test.domi.common.utils.lock;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.zookeeper.CreateMode;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

@Component("zklock")
public class ZKlock implements Lock {

    @Autowired
    private CuratorFramework zkClient;
    @Value("${zk.localPath}")
    private String lockPath;
    private String currentPath;
    private String beforePath;

    @Override
    public boolean tryLock() {
        try {
            //根节点的初始化放在构造函数里面不生效
            if (zkClient.checkExists().forPath(lockPath) == null) {
                System.out.println("初始化根节点==========>" + lockPath);
                zkClient.create().creatingParentsIfNeeded().forPath(lockPath);
            }
            System.out.println("当前线程" + Thread.currentThread().getName() + "初始化根节点" + lockPath);
        } catch (Exception e) {
        }

        if (currentPath == null) {
            try {
                currentPath = this.zkClient.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                        .forPath(lockPath + "/");
            } catch (Exception e) {
                return false;
            }
        }
        try {
            //此处该如何获取所有的临时节点呢?如locks00004.而不是获取/locks/order中的order作为子节点??
            List<String> childrens = this.zkClient.getChildren().forPath(lockPath);
            Collections.sort(childrens);
            if (currentPath.equals(lockPath + "/" + childrens.get(0))) {
                System.out.println("当前线程获得锁" + currentPath);
                return true;
            }else{
               //取前一个节点
                int curIndex = childrens.indexOf(currentPath.substring(lockPath.length() + 1));
                //如果是-1表示children里面没有该节点
                beforePath = lockPath + "/" + childrens.get(curIndex - 1);
            }
        } catch (Exception e) {
            return false;
        }
        return false;
    }

    @Override
    public void lock() {
        if (!tryLock()) {
            waiForLock();
            lock();
        }
    }

    @Override
    public void unlock() {
        try {
            zkClient.delete().guaranteed().deletingChildrenIfNeeded().forPath(currentPath);
        } catch (Exception e) {
            //guaranteed()保障机制,若未删除成功,只要会话有效会在后台一直尝试删除
        }
    }

    private void waiForLock(){
        CountDownLatch cdl = new CountDownLatch(1);
        //创建监听器watch
          NodeCache nodeCache = new NodeCache(zkClient,beforePath);
        try {
            nodeCache.start(true);
            nodeCache.getListenable().addListener(new NodeCacheListener() {
                @Override
                public void nodeChanged() throws Exception {
                    cdl.countDown();
                    System.out.println(beforePath + "节点监听事件触发,重新获得节点内容为:" + new String(nodeCache.getCurrentData().getData()));
                }
            });
        } catch (Exception e) {
        }
        //如果前一个节点还存在,则阻塞自己
        try {
            if (zkClient.checkExists().forPath(beforePath) == null) {
                cdl.await();
            }
        } catch (Exception e) {
        }finally {
            //阻塞结束,说明自己是最小的节点,则取消watch,开始获取锁
            try {
                nodeCache.close();
            } catch (IOException e) {
            }
        }
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    @Override
    public Condition newCondition() {
        return null;
    }

}

 

 

5- 调用demo

package com.test.domi.controller;

import com.test.domi.common.utils.ZkUtil;
import com.test.domi.common.utils.lock.ZKlock;
import org.I0Itec.zkclient.ZkClient;
import org.apache.curator.framework.CuratorFramework;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Demo controller that exercises {@code ZKlock} from several threads.
 */
@RestController
@RequestMapping("/zk")
public class ZKController {

    @Autowired
    private CuratorFramework zkClient;

    // The following three values are not read anywhere in this class.
    private String url = "127.0.0.1:2181";
    private int timeout = 3000;
    private String lockPath = "/testl";
    @Autowired
    private ZKlock zklock;
    private int k = 1;

    /**
     * Starts ten threads that each acquire and immediately release the lock,
     * then returns without waiting for them to finish.
     *
     * @return always {@code true}
     */
    @GetMapping("/lock")
    public Boolean getLock() throws Exception {
        int started = 0;
        while (started < 10) {
            Thread worker = new Thread(this::lockThenUnlock);
            worker.start();
            started++;
        }
        return Boolean.TRUE;
    }

    /** Body of each worker thread: take the lock, then give it back. */
    private void lockThenUnlock() {
        zklock.lock();
        zklock.unlock();
    }
}

 







以上是关于springboot2整合zookeeper集成curator的主要内容,如果未能解决你的问题,请参考以下文章

SpringBoot2 整合 Zookeeper组件,管理架构中服务协调

SpringBoot2.0实战(23)整合Redis之集成缓存SpringDataCache

SpringBoot2.0集成FastDFS

SpringBoot2——数据访问的集成 & 单元测试(JUnit5)

Springboot2.x+shiro+redis整合填坑 redis只做缓存的情况

SpringBoot-整合Dubbo+Zookeeper