Zookeeper -- Zookeeper JavaAPI相关操作(Curator介绍Curator API 常用操作(节点的CRUD,Watch事件监听)分布式锁模拟12306售票案例)

Posted CodeJiao

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Zookeeper -- Zookeeper JavaAPI相关操作(Curator介绍Curator API 常用操作(节点的CRUD,Watch事件监听)分布式锁模拟12306售票案例)相关的知识,希望对你有一定的参考价值。

文章目录

本节案例承接上节案例

1. Zookeeper – 初识Zookeeper、Zookeeper的安装和配置、Zookeeper命令操作(Zookeeper数据模型 Zookeeper服务端 / 客户端常用命令)

2. Zookeeper – Zookeeper JavaAPI相关操作(Curator介绍、Curator API 常用操作(节点的CRUD,Watch事件监听)、分布式锁、模拟12306售票案例)

3. Zookeeper – Zookeeper 集群搭建 集群角色说明

1. Curator介绍

  • Curator 是 Apache ZooKeeper 的Java客户端库。
  • 常见的ZooKeeper Java API :
    • 原生Java API
    • ZkClient
    • Curator
  • Curator 项目的目标是简化 ZooKeeper 客户端的使用。
  • Curator 最初是 Netfix 研发的,后来捐献了 Apache 基金会,目前是 Apache 的顶级项目。
  • 官网:http://curator.apache.org/

2. Curator API 常用操作

我们先创建一个名为curator-zk的普通maven工程。

然后导入相关的依赖:pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.tian</groupId>
    <artifactId>curator-zk</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.10</version>
            <scope>test</scope>
        </dependency>

        <!--curator-->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>4.0.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>4.0.0</version>
        </dependency>
        <!--日志-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.21</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.21</version>
        </dependency>
    </dependencies>

</project>


配置日志信息打印:log4j.properties

log4j.rootLogger=off,stdout

log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = [%dyyyy-MM-dd HH/:mm/:ss]%-5p %c(line/:%L) %x-%m%n

新建一个Curator测试类:

package com.tian;

public class CuratorTest 

下面的这些方法都是写在这个测试类里面的。


2.1 建立连接


2.1.1 连接方式1

    @Test
    public void testConnect1() 
        CuratorFrameworkFactory.newClient("192.168.135.130:2181", 60 * 1000, 15 * 1000, new ExponentialBackoffRetry(300, 10)).start();
    

参数说明:

运行结果:


2.1.2 连接方式2(链式编程)

    @Test
    public void testConnect2() 
        CuratorFrameworkFactory
                .builder()
                .connectString("192.168.135.130:2181")
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(new ExponentialBackoffRetry(300, 10))
                .build()
                .start();
    

运行结果:


2.2 添加节点

在此之前,我们先做一点点改造:

package com.tian;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class CuratorTest 
    private CuratorFramework client;

    @Test
    public void testConnect1() 
        CuratorFrameworkFactory.newClient("192.168.135.130:2181", 60 * 1000, 15 * 1000, new ExponentialBackoffRetry(300, 10)).start();
    

    @Before
    public void testConnect2() 
        client = CuratorFrameworkFactory
                .builder()
                .connectString("192.168.135.130:2181")
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(new ExponentialBackoffRetry(300, 10))
                .build();
                
        client.start();
    

    @After
    public void closeConnection() 
        client.close();
    



2.2.1 基本创建

    @Test
    public void basicCreate() throws Exception 
        // 1. 基本创建
        // 如果创建节点没有指定数据, 则默认将当前客户端的ip作为数据存储
        client.create().forPath("/app1");
    

运行结果:


2.2.1 创建节点:带有数据

    @Test
    public void createWithData() throws Exception 
    // 2. 创建节点:带有数据
        client.create().forPath("/app2", "CodeJiao".getBytes(StandardCharsets.UTF_8));
    

运行结果:



2.2.3 创建节点:设置节点类型

    @Test
    public void createWithType() throws Exception 
        // 3. 设置类型, 默认类型, 持久化
        // 现在来创建一个临时节点 /app3
        // CreateMode是一个枚举类型
        client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3");
    

运行结果:


但是这里却看不见/app3节点,因为临时节点只在当前会话有效。Java方法执行完毕后,当前会话就结束了。


2.2.4 创建多级节点

    @Test
    public void createMultiLevelNode() throws Exception 
        // 3. 创建多级节点 creatingParentsIfNeeded() 表示如果没有父节点 则自动创建父节点.
        client.create().creatingParentsIfNeeded().forPath("/app4/p1");
    

运行结果:



2.2.5 小结


2.3 删除节点


2.3.1 删除单个节点(无子节点)

    @Test
    public void testDelEmptyNode() throws Exception 
        client.delete().forPath("/app1");
    

2.3.2 删除带有子节点的节点(任何情况都可以删除:包括有子结点也可以删除成功)

    @Test
    public void testDelNode() throws Exception 
        client.delete().deletingChildrenIfNeeded().forPath("/app1");
    

2.3.3 必须成功的删除

    @Test
    public void testDelNode() throws Exception 
        client.delete().guaranteed().forPath("/app1");
    

2.3.4 回调(删完之后需要做的事情)

    @Test
    public void testWithCallback() throws Exception 
        client.delete().guaranteed().inBackground(new BackgroundCallback() 
            @Override
            public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception 
                System.out.println("/app1节点被删除了");
                System.out.println(curatorFramework.toString());
                System.out.println(curatorEvent);
            
        ).forPath("/app2");
    

运行结果:


2.4 修改节点


2.4.1 修改数据

    @Test
    public void modifyData() throws Exception 
        client.setData().forPath("/app1", "数据被修改了".getBytes(StandardCharsets.UTF_8));
    

运行结果:


运行结果:


2.4.2 根据版本去修改数据(保证不被其他线程或操作干扰)

    @Test
    public void modifyDataByVersion() throws Exception 
        // 查询版本信息
        Stat stat = new Stat();
        client.getData().storingStatIn(stat).forPath("/app1");
        int version = stat.getVersion();
        System.out.println(version);

        // 根据版本修改数据:如果版本匹配, 则会报错
        client.setData().withVersion(version).forPath("/app1", "根据版本修改数据".getBytes(StandardCharsets.UTF_8));
    

2.5 查询节点


2.5.1 查询数据

    @Test
    public void queryData() throws Exception 
        // 1. 查询数据
        byte[] bytes = client.getData().forPath("/app1");
        System.out.println(new String(bytes));
    

运行结果:


2.5.2 查询子节点

    @Test
    public void queryChildNode() throws Exception 
        // 2. 查询子结点
        List<String> paths = client.getChildren().forPath("/");
        
        for (String path : paths) 
            System.out.println(path);
        
    

运行结果:


2.5.3 查询节点状态信息

    @Test
    public void queryNodeStatusInfo() throws Exception 
        Stat stat = new Stat();
        System.out.println(stat);

        // 3.查询节点状态信息
        byte[] bytes = client.getData().storingStatIn(stat).forPath("/app1");
        
        System.out.println(stat);
    

运行结果:


2.6 Watch事件监听

  • ZooKeeper 允许用户在指定节点上注册一些Watcher,并且在一些特定事件触发的时候,ZooKeeper 服务端会将事件通知到感兴趣的客户端上去,该机制是 ZooKeeper 实现分布式协调服务的重要特性。
  • ZooKeeper 中引入了Watcher机制来实现了发布/订阅功能能,能够让多个订阅者同时监听某一个对象,当一个对象自身状态变化时,会通知所有订阅者。
  • ZooKeeper 原生支持通过注册Watcher来进行事件监听,但是其使用并不是特别方便 需要开发人员自己反复注册Watcher,比较繁琐。
  • Curator引入了 Cache 来实现对 ZooKeeper 服务端事件的监听。 ZooKeeper提供了三种Watcher:
    • NodeCache : 只是监听某一个特定的节点
    • PathChildrenCache : 监控一个ZNode的子节点.
    • TreeCache : 可以监控整个树上的所有节点,类似于PathChildrenCache和NodeCache的组合

2.6.1 NodeCache (监听某一个特定的节点 )

 @Test
    public void testNodeCache() throws Exception 
        // 1. 创建NodeCache对象
        NodeCache nodeCache = new NodeCache(client, "/app1");

        // 2. 注册监听
        nodeCache.getListenable().addListener(new NodeCacheListener() 
            @Override
            public void nodeChanged() throws Exception 
                System.out.println("节点信息发生变化");
                System.out.println("更新后的数据为: " + new String(nodeCache.getCurrentData().getData()));

            
        );

        // 3. 开启监听。true代表一次性把缓存的节点数据加载过来.
        nodeCache.start(true);


        while (true) 
            // 避免一直空转
            Thread.sleep(500);
        
    

运行结果:




2.6.2 PathChildrenCache (监听某一节点的子节点的变化 但是不可以监听自己)

    @Test
    public void testPathChildCache() throws Exception 
        // true 代表加载缓存数据
        PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/app1", true);
        // 绑定监听器
        pathChildrenCache.getListenable(以上是关于Zookeeper -- Zookeeper JavaAPI相关操作(Curator介绍Curator API 常用操作(节点的CRUD,Watch事件监听)分布式锁模拟12306售票案例)的主要内容,如果未能解决你的问题,请参考以下文章

ZooKeeper的使用---Java程序

zookeeper支持c++吗

Zookeeper集群

Zookeeper -- 初识ZookeeperZookeeper的安装和配置Zookeeper命令操作(Zookeeper数据模型 Zookeeper服务端 / 客户端常用命令)

ZooKeeper集群

ZooKeeper集群