CuratorFramework实现zk同步本地配置

Posted ITdfq

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了CuratorFramework实现zk同步本地配置相关的知识,希望对你有一定的参考价值。

SpringBoot连接Zk


连接配置

  • pom文件导入

    <!--            zk客户端-->
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-recipes</artifactId>
                <version>4.3.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-framework</artifactId>
                <version>4.0.1</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-client</artifactId>
                <version>4.0.1</version>
                <exclusions>
                    <exclusion>
                        <groupId>log4j</groupId>
                        <artifactId>log4j</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
    
  • 连接配置

    zk:
      url: ###
      projectName: /itdfq/zookeeperDemo
      #休眠时间
      baseSleepTimeMs: 1000
      #失败重试次数
      maxRetries: 3
    
  • 连接设置

    
        /**
         * Zookeeper 框架式客户端
         *
         * @return CuratorFramework
         */
        @Bean
        public CuratorFramework getCuratorFramework() 
            //重试策略
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(zkProperties.getBaseSleepTimeMs(), zkProperties.getMaxRetries());
            return CuratorFrameworkFactory.newClient(zkProperties.getUrl(), retryPolicy);
        
    
        /**
         * 监视 ZK
         *
         * @return TreeCache
         */
        @Bean
        public TreeCache getTreeCache() 
            return new TreeCache(curatorFramework, zkProperties.getProjectName());
        
    
  • 初始化

      @Autowired
        private CuratorFramework curatorFramework;
        @Autowired
        private TreeCache treeCache;
        /**
         * 本地配置
         */
        private final Properties properties = new Properties();
    
        @PostConstruct
        public void loadProperties() 
            try 
                log.info("================初始化配置=================");
                curatorFramework.start();
                treeCache.start();
                // 从zk中获取配置放入本地配置中
                Stat stat = curatorFramework.checkExists().forPath(zkProperties.getProjectName());
                if (stat == null) 
                    curatorFramework.create().forPath(zkProperties.getProjectName());
                
                List<String> configList = curatorFramework.getChildren().forPath(zkProperties.getProjectName());
                log.info("配置参数:", JSON.toJSONString(configList));
                for (String s : configList) 
                    byte[] value = curatorFramework.getData().forPath(zkProperties.getProjectName() + ZkConstant.SEPARATOR + s);
                    log.info("节点:【】   key:【】   Value:【】 ", zkProperties.getProjectName() + ZkConstant.SEPARATOR + s, s, new String(value));
                    properties.setProperty(s, new String(value));
                
    
                // 监听属性值变更
                treeCache.getListenable().addListener(new TreeCacheListener() 
                    @Override
                    public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception 
                        if (Objects.equals(treeCacheEvent.getType(), TreeCacheEvent.Type.NODE_ADDED) ||
                                Objects.equals(treeCacheEvent.getType(), TreeCacheEvent.Type.NODE_UPDATED)) 
                            String updateKey = treeCacheEvent.getData().getPath().replace(zkProperties.getProjectName() + ZkConstant.SEPARATOR, "");
                            properties.setProperty(updateKey, new String(treeCacheEvent.getData().getData()));
                            log.info("数据更新:更新类型【】,更新的key:【】,更新value:【】", treeCacheEvent.getType(), zkProperties.getProjectName() + ZkConstant.SEPARATOR + updateKey, new String(treeCacheEvent.getData().getData()));
                        
                    
                );
    
             catch (Exception e) 
                log.error("zk配置初始化异常", e);
            
        
    

get/set方法


    /**
     * 获取同步配置属性
     * </p>
     * zk的value保存在本地缓存
     *
     * @param key zk-Key
     * @return String
     */
    public String getProperties(String key) 
        return properties.getProperty(key);
    

    /**
     * 获取zk的节点value
     * </>
     *
     * @return String
     */
    public String getZkValue(String key) 
        try 
            byte[] bytes = curatorFramework.getData().forPath(zkProperties.getProjectName() + ZkConstant.SEPARATOR + key);
            return new String(bytes);
         catch (Exception e) 
            return null;
        
    

    /**
     * 设置zk的key value
     * <p>
     * 如果项目启动,会同步更新本地配置缓存
     *
     * @param key
     * @param value
     * @return
     */
    public Boolean setZkValue(String key, String value) 
        try 
            String propertiesKey = zkProperties.getProjectName() + ZkConstant.SEPARATOR + key;
            Stat stat = curatorFramework.checkExists().forPath(propertiesKey);
            if (stat == null) 
                curatorFramework.create().forPath(propertiesKey);
            
            curatorFramework.setData().forPath(propertiesKey, value.getBytes());
            return Boolean.TRUE;
         catch (Exception e) 
            return Boolean.FALSE;
        
    

分布式锁

   /**
     * 分布式锁 对象
     * <p/>
     * 互斥锁
     *
     * @param path
     * @return
     */
    public InterProcessMutex getLock(String path) 
        InterProcessMutex lock = null;
        try 
            lock = new InterProcessMutex(curatorFramework, zkProperties.getProjectName() + ZkConstant.SEPARATOR + path);
            return lock;
         catch (Exception e) 
            log.error("获取锁失败", e);
        
        return null;
    

实现结果

  • 项目启动初始化
  • 在zk中改个配置
  • 结果
  • 在线获取配置
  @RequestMapping("/get/key")
    public String get(@PathVariable("key") String key) 
        log.info("请求参数:", key);
        return "结果:" + zkApi.getProperties(key);
    

以上是关于CuratorFramework实现zk同步本地配置的主要内容,如果未能解决你的问题,请参考以下文章

CuratorFramework实现zk同步本地配置

CuratorFramework实现zk同步本地配置

分布式协调服务中间件ZooKeeper 入门-ZK的介绍与特性

ZK节点间数据同步以及API实践

Zookeeper的基本原理(zk架构zk存储结构watch机制独立安装zk集群间同步复制)

基于CuratorFramework实现zookeeper分布式锁,实现任务争抢,程序高可用方案,单进程去争抢多个任务。在生产环境稳定运行。