Zookeeper--06---Curator客户端的使⽤zk的watch机制

Posted 高高for 循环

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Zookeeper--06---Curator客户端的使⽤zk的watch机制相关的知识,希望对你有一定的参考价值。

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档


Zookeeper客户端(zkCli)的使⽤

Zookeeper–03–常用Shell命令




Curator客户端的使⽤

Curator介绍

  • Curator是Netflix公司开源的⼀套zookeeper客户端框架,Curator是对Zookeeper⽀持最好 的客户端框架。
  • Curator封装了⼤部分Zookeeper的功能,⽐如Leader选举、分布式锁等,减少了技术⼈员在使⽤Zookeeper时的底层细节开发⼯作。

1.引⼊Curator

<!--Curator-->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>2.12.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.12.0</version>
</dependency>

<!--Zookeeper-->
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.7.14</version>
</dependency>

application.properties配置⽂件

curator.retryCount=5
curator.elapsedTimeMs=5000
curator.connectString=172.16.253.35:2181
curator.sessionTimeoutMs=60000
curator.connectionTimeoutMs=5000

注⼊配置Bean


import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@Data
@Component
@ConfigurationProperties(prefix = "curator")
public class WrapperZK 

  private int retryCount;

  private int elapsedTimeMs;

  private String connectString;

  private int sessionTimeoutMs;

  private int connectionTimeoutMs;


注⼊CuratorFramework

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class CuratorConfig 

    @Autowired
    WrapperZK wrapperZk;

    @Bean(initMethod = "start")
    public CuratorFramework curatorFramework() 
      return CuratorFrameworkFactory.newClient(
        wrapperZk.getConnectString(),
        wrapperZk.getSessionTimeoutMs(),
        wrapperZk.getConnectionTimeoutMs(),
        new RetryNTimes(wrapperZk.getRetryCount(), wrapperZk.getElapsedTimeMs()));
    

2.创建节点

import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.zookeeper.CreateMode;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.logging.Logger;

@Slf4j
@SpringBootTest
class BootZkClientApplicationTests 

  @Autowired
  CuratorFramework curatorFramework;


  @Test
  void createNode() throws Exception 

    //添加持久节点
    String path = curatorFramework.create().forPath("/curator-node");
    //添加临时序号节点
    String path1 = curatorFramework.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/curator-node", "some-data".getBytes());
   
    System.out.println(String.format("curator create node :%s  successfully.",path));
    System.in.read();

  



3.获得节点数据


  @Test
  public void testGetData() throws Exception 
    byte[] bytes = curatorFramework.getData().forPath("/curator-node");
  
    System.out.println(new String(bytes));
  
  

4.修改节点数据


  @Test
  public void testSetData() throws Exception 
    curatorFramework.setData().forPath("/curator-node","changed!".getBytes());
    byte[] bytes = curatorFramework.getData().forPath("/curator-node");
   
    System.out.println(new String(bytes));
  

5.创建节点同时创建⽗节点

  @Test
  public void testCreateWithParent() throws Exception 
    String pathWithParent="/node-parent/sub-node-1";
    String path = curatorFramework.create().creatingParentsIfNeeded().forPath(pathWithParent);
    
    System.out.println(String.format("curator create node :%s  successfully.",path));
  

6.删除节点


  @Test
  public void testDelete() throws Exception 
    String pathWithParent="/node-parent";
    curatorFramework.delete().guaranteed().deletingChildrenIfNeeded().forPath(pathWithParent);
  
  

zk的watch机制

1.Watch机制介绍

  • 我们可以把 Watch 理解成是注册在特定 Znode 上的触发器。
  • 当这个 Znode 发⽣改变,也就是调⽤了 create , delete , setData ⽅法的时候,将会触发 Znode上注册的对应事件,请求 Watch 的客户端会接收到异步通知

具体交互过程如下:

  • 客户端调⽤ getData ⽅法, watch 参数是 true 。服务端接到请求,返回节点数据,并且在对应的哈希表⾥插⼊被 Watch 的 Znode 路径,以及 Watcher 列表。
  • 当被 Watch 的 Znode 已删除,服务端会查找哈希表,找到该 Znode 对应的所有Watcher,异步通知客户端,并且删除哈希表中对应的 Key-Value。

    客户端使⽤了NIO通信模式监听服务端的调⽤

2.监听器原理

3.zkCli客户端使⽤watch

get -w /test ⼀次性监听节点

4.curator客户端使⽤watch

  @Test
  public void addNodeListener() throws Exception 

    NodeCache nodeCache = new NodeCache(curatorFramework, "/curator-node");
    nodeCache.getListenable().addListener(new NodeCacheListener() 
      @Override
      public void nodeChanged() throws Exception 
        log.info(" path nodeChanged: ","/curator-node");
        printNodeData();
      
    );

    nodeCache.start();

    System.in.read();

  

  public void printNodeData() throws Exception 
    byte[] bytes = curatorFramework.getData().forPath("/curator-node");
    log.info("data: ",new String(bytes));
  

以上是关于Zookeeper--06---Curator客户端的使⽤zk的watch机制的主要内容,如果未能解决你的问题,请参考以下文章

科普:客户关系管理系统对客户信息管理的6大帮助

java socket如何实现客户端与客户端的交互?

MQTT客户端设置

客户管理软件有效解决客户流转的问

使用旧客户数据定​​位新客户

客户端-服务器多线程聊天应用程序。客户端到客户端的通信