springboot2整合zookeeper集成curator
Posted 老人与JAVA
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了springboot2整合zookeeper集成curator相关的知识,希望对你有一定的参考价值。
步骤:
1- pom.xml
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>2.12.0</version> </dependency>
2- yml配置:
zk:
url: 127.0.0.1:2181
localPath: /newlock
timeout: 3000
3- 配置类
package com.test.domi.config; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class ZookeeperConf { @Value("${zk.url}") private String zkUrl; @Bean public CuratorFramework getCuratorFramework(){ RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3); CuratorFramework client = CuratorFrameworkFactory.newClient(zkUrl,retryPolicy); client.start(); return client; } }
4- 使用
package com.test.domi.common.utils.lock; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.NodeCache; import org.apache.curator.framework.recipes.cache.NodeCacheListener; import org.apache.zookeeper.CreateMode; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; @Component("zklock") public class ZKlock implements Lock { @Autowired private CuratorFramework zkClient; @Value("${zk.localPath}") private String lockPath; private String currentPath; private String beforePath; @Override public boolean tryLock() { try { //根节点的初始化放在构造函数里面不生效 if (zkClient.checkExists().forPath(lockPath) == null) { System.out.println("初始化根节点==========>" + lockPath); zkClient.create().creatingParentsIfNeeded().forPath(lockPath); } System.out.println("当前线程" + Thread.currentThread().getName() + "初始化根节点" + lockPath); } catch (Exception e) { } if (currentPath == null) { try { currentPath = this.zkClient.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL) .forPath(lockPath + "/"); } catch (Exception e) { return false; } } try { //此处该如何获取所有的临时节点呢?如locks00004.而不是获取/locks/order中的order作为子节点?? List<String> childrens = this.zkClient.getChildren().forPath(lockPath); Collections.sort(childrens); if (currentPath.equals(lockPath + "/" + childrens.get(0))) { System.out.println("当前线程获得锁" + currentPath); return true; }else{ //取前一个节点 int curIndex = childrens.indexOf(currentPath.substring(lockPath.length() + 1)); //如果是-1表示children里面没有该节点 beforePath = lockPath + "/" + childrens.get(curIndex - 1); } } catch (Exception e) { return false; } return false; } @Override public void lock() { if (!tryLock()) { waiForLock(); lock(); } } @Override public void unlock() { try { zkClient.delete().guaranteed().deletingChildrenIfNeeded().forPath(currentPath); } catch (Exception e) { //guaranteed()保障机制,若未删除成功,只要会话有效会在后台一直尝试删除 } } private void waiForLock(){ CountDownLatch cdl = new CountDownLatch(1); //创建监听器watch NodeCache nodeCache = new NodeCache(zkClient,beforePath); try { nodeCache.start(true); nodeCache.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged() throws Exception { cdl.countDown(); System.out.println(beforePath + "节点监听事件触发,重新获得节点内容为:" + new String(nodeCache.getCurrentData().getData())); } }); } catch (Exception e) { } //如果前一个节点还存在,则阻塞自己 try { if (zkClient.checkExists().forPath(beforePath) == null) { cdl.await(); } } catch (Exception e) { }finally { //阻塞结束,说明自己是最小的节点,则取消watch,开始获取锁 try { nodeCache.close(); } catch (IOException e) { } } } @Override public void lockInterruptibly() throws InterruptedException { } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return false; } @Override public Condition newCondition() { return null; } }
5- 调用demo
package com.test.domi.controller; import com.test.domi.common.utils.ZkUtil; import com.test.domi.common.utils.lock.ZKlock; import org.I0Itec.zkclient.ZkClient; import org.apache.curator.framework.CuratorFramework; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping("/zk") public class ZKController { @Autowired private CuratorFramework zkClient; // @Autowired // private ZkClient zkClient; private String url = "127.0.0.1:2181"; private int timeout = 3000; private String lockPath = "/testl"; @Autowired private ZKlock zklock; private int k = 1; @GetMapping("/lock") public Boolean getLock() throws Exception{ for (int i = 0; i < 10; i++) { new Thread(new Runnable() { @Override public void run() { zklock.lock();
zklock.unlock();
} }).start(); }
return true; } }
以上是关于springboot2整合zookeeper集成curator的主要内容,如果未能解决你的问题,请参考以下文章
SpringBoot2 整合 Zookeeper组件,管理架构中服务协调
SpringBoot2.0实战(23)整合Redis之集成缓存SpringDataCache
SpringBoot2——数据访问的集成 & 单元测试(JUnit5)