Java连接Zookeeper,创建监听通知机制

Posted 永旗狍子

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java连接Zookeeper,创建监听通知机制相关的知识,希望对你有一定的参考价值。

目录

一.Java连接Zookeeper

1.创建Springboot工程,导入依赖

1.1排除log4j和桥接包

2.配置application.yml

3.编写连接Zookeeper的工具类

4.监听通知机制

4.1 在zookeeper容器内部新增节点


一.Java连接Zookeeper

1.创建Springboot工程,导入依赖

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.6.0</version>
            <exclusions>
                <exclusion>
                    <artifactId>log4j</artifactId>
                    <groupId>log4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
</dependencies>

1.1排除log4j和桥接包

2.配置application.yml

zk:
  host: 1111.com
  port: 2181

3.编写连接Zookeeper的工具类

@Configuration
public class ZookeeperConfig {

    @Value("${zk.port}")
    private String port;

    @Value("${zk.host}")
    private String host;

    @Bean
    public  CuratorFramework curatorFramework() {

        CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();

        builder.connectString(host+":"+port);// 连接zk的ip和端口

        // 超时时间是3s,重试的次数是2次
        builder.retryPolicy(new ExponentialBackoffRetry(3000,2));

        // 构建操作ZK的客户但
        CuratorFramework curatorFramework = builder.build();

        // 必须要调用启动方法才能使用
        curatorFramework.start();

        return curatorFramework;
    }

}

4.监听通知机制

我这里是为了方便,在测试类中写的。

@SpringBootTest
class Zk02NodeListenerApplicationTests {

    @Autowired
    private CuratorFramework zkClient;
    @Test
    void testCreateNode() throws Exception {
        zkClient.create().forPath("/listener-node","测试监听机制".getBytes("utf-8"));
        System.out.println("节点创建成功。。。");
    }

    // 写一个客户端去监听节点
    @Test
    public void listenerNode()throws Exception{
        // 1.设置被监听的节点
        TreeCache treeCache = new TreeCache(zkClient, "/listener-node");

        // 2.设置节点发送改变后的回调函数
        treeCache.getListenable().addListener(new TreeCacheListener() {
            // 节点发送改变后该方法被调用。。。
            @Override
            public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {

                // 可以根据event事件类型来判断节点发送了什么样的变化
                TreeCacheEvent.Type type = event.getType();

                // 节点的最新的数据
                ChildData data = event.getData();

                String msgStr="";
                if (data!=null) {
                    byte[] msg = data.getData();
                   msgStr = new String(msg, 0, msg.length, "utf-8");
                }

                // 根据节点不同变化,做出不同的响应
                switch(type){
                    case NODE_ADDED:
                        System.out.println("在【listener-node】节点新增了一个子节点," +
                                "子节点名称【" + data.getPath() + "】,子节点数据【" + msgStr + "】");
                        break;
                    case NODE_REMOVED:
                        System.out.println("在【listener-node】节点删除了一个子节点," +
                                "删除节点名称【" + data.getPath() + "】,子节点数据【" + msgStr + "】");
                        break;
                    case NODE_UPDATED:
                        System.out.println("在【listener-node】节点本身的数据发送改变," +
                                "更新节点名称【" + data.getPath() + "】,子节点数据【" + msgStr + "】");
                        break;

                }


            }
        });

        System.out.println("java客户端持续监听中。。。。");
        treeCache.start();

        // 既然是监听,固然这个线程不能结束的。。
        System.in.read();

    }

}

4.1 在zookeeper容器内部新增节点

 

以上是关于Java连接Zookeeper,创建监听通知机制的主要内容,如果未能解决你的问题,请参考以下文章

Java连接Zookeeper,创建监听通知机制

zookeeper学习-5Java API操作 - Watcher监听机制

Zookeeper

ZooKeeper Watcher 机制

2021年大数据ZooKeeper:ZooKeeper的shell操作

8.7.ZooKeeper Watcher监听