大数据讲课笔记6.6 ZooKeeper的Java API操作

Posted howard2005

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据讲课笔记6.6 ZooKeeper的Java API操作相关的知识,希望对你有一定的参考价值。

文章目录

零、学习目标

  1. 了解ZooKeeper Java API
  2. 能利用Java API操作ZooKeeper

一、导入新课

  • 通过上节课的学习,我们掌握了ZooKeeper的Shell操作。本节课将针对ZooKeeper的Java API操作进行详细讲解。

二、新课讲解

(一)ZooKeeper Java API概述

1、ZooKeeper Java API包含五个包

org.apache.ZooKeeper
org.apache.ZooKeeper.data
org.apache.ZooKeeper.server
org.apache.ZooKeeper.server.quorum
org.apache.ZooKeeper.server.upgrade

2、ZooKeeper类常用方法

  • org.apache.ZooKeeper包含ZooKeeper类,这也是编程时最常用的类文件,ZooKeeper类提供的常用Java API方法。
方法名称方法描述
create创建节点
delete删除节点
exists判断节点是否存在
get/setData获取/修改节点数据
getChildren获取指定节点下的所有子节点列表

(二)通过Java API操作ZooKeeper

1、创建Maven项目

  • 创建Maven项目 - ZooKeeperDemo

2、添加相关依赖

  • pom.xml文件里添加ZooKeeper和junit依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
         http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>ZooKeeperDemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.5.6</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.10</version>
        </dependency>
    </dependencies>
</project>

3、创建类实现功能

  • net.hw.zk包里创建ZkDemo

(1)测试连接zk服务器,编写testConnect()方法

ZooKeeper类的构造方法:

 public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)

@Test
public void testConnect() throws Exception 
    // 创建闭锁
    final CountDownLatch cdl = new CountDownLatch(1);
    // 创建ZooKeeper对象
    ZooKeeper zk = new ZooKeeper(
                "192.168.225.100:2181", // 参数1:连接字符串(IP地址:端口号)
         3000, // 参数2:会话超时
         new Watcher() 
                @Override
                public void process(WatchedEvent event) 
                    System.out.println("成功连接zk服务器!");
                    cdl.countDown();
                
             // 参数3:监听器
    );
    // 利用闭锁的await()方法产生阻塞,等待连接zk的线程执行完毕
    cdl.await();

说明:ZooKeeper构造方法第一个参数是zookeeper服务器地址及端口号,可以用IP地址:192.168.225.100,也可用主机名,比如tiger,但是必须在Windows的hosts文件里定义域名映射:

运行程序,结果如下:

为了去掉log4j的警告信息,在resources目录里创建log4j.properties文件:

log4j.rootLogger=WARN, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d - %m%n

将IP地址改成主机名tiger,运行结果是一样的。

(2)测试创建节点,编写testCreate()方法

ZooKeeper类的create方法:

 public String create(final String path, byte data[], List<ACL> acl, CreateMode createMode)
  • 参数1:节点路径
  • 参数2:节点数据(字节数组)
  • 参数3:访问控制列表
  • 参数4:创建模式(四种:临时普通、临时顺序、持久普通、持久顺序)

@Test
public void testCreate() throws Exception 
    // 创建闭锁
    final CountDownLatch cdl = new CountDownLatch(1);
    // 创建ZooKeeper对象
    ZooKeeper zk = new ZooKeeper(
            "tiger:2181", // 参数1:连接字符串(主机名:端口号)
            3000, // 参数2:会话超时
            new Watcher() 
                @Override
                public void process(WatchedEvent event) 
                    System.out.println("成功连接zk服务器!");
                    cdl.countDown();
                
             // 参数3:监听器
    );
    // 利用闭锁的await()方法产生阻塞,等待连接zk的线程执行完毕
    cdl.await();

    // 创建节点
    try 
        zk.create("/zk01", // 参数1:节点路径
                "flower".getBytes(), // 参数2:节点数据 
                ZooDefs.Ids.OPEN_ACL_UNSAFE,// 参数3:访问控制列表
                CreateMode.EPHEMERAL // 参数4:创建模式(普通临时)
        );
        System.out.println("创建节点/zk01成功~");
     catch (Exception e) 
        System.out.println("创建节点/zk01失败~");
     

运行程序,结果如下:

我们打开zk客户端,查看是否有/zk01节点:

咦,怎么没有我们刚才创建的/zk01节点呢?

看见没,我们在程序里创建的是临时节点,一旦程序与zk服务器断开连接,临时节点就灰飞烟灭了,利用这个特性,zk服务器可以了解一个服务器集群里哪些节点宕机了。

我们不妨修改一下程序,让线程睡眠30秒:

此时,运行程序,在zk客户端查看:

看见没,节点/zk01出现了。
但是,等到线程结束,再次查看:

节点/zk01又被自动被删除掉。

当然,可以创建其它类型的节点,比如持久节点:

运行程序,结果如下:

等待线程睡眠结束:

此时,进程已经结束了,但是我们在zk客户端看看/zk01节点是否还存在。

(3)测试获取节点数据和信息,编写testGet()方法

zk服务器上有一个我们刚才创建的普通持久节点:/zk01

ZooKeeper类的getData()方法:

public byte[] getData(final String path, Watcher watcher, Stat stat)
  • 参数1:节点路径
  • 参数2:监听器
  • 参数3:状态(封装了节点相关信息)

返回值是节点数据,是一个字节数组。

@Test
public void testGet() throws Exception 
    // 创建闭锁
    final CountDownLatch cdl = new CountDownLatch(1);
    // 创建ZooKeeper对象
    ZooKeeper zk = new ZooKeeper(
            "tiger:2181", // 参数1:连接字符串(主机名:端口号)
            3000, // 参数2:会话超时
            new Watcher() 
                @Override
                public void process(WatchedEvent event) 
                    System.out.println("成功连接zk服务器!");
                    cdl.countDown();
                
             // 参数3:监听器
    );
    // 利用闭锁的await()方法产生阻塞,等待连接zk的线程执行完毕
    cdl.await();

    // 获取节点数据以及相关信息
    Stat stat = new Stat();
    byte[] data = zk.getData("/zk01", // 参数1:节点路径
            new Watcher() 
                @Override
                public void process(WatchedEvent event) 
                    System.out.println("获取了节点/zk01的数据!");
                
            , // 参数2:监听器
        stat // 参数3:状态(封装了节点相关信息)
    );
    // 输出节点/zk01的数据
    System.out.println("节点/zk01的数据:" + new String(data));
    // 输出节点/zk01的相关信息
    System.out.println("节点/zk01的状态:" + stat);

运行程序,结果如下:

(4)测试修改节点数据,编写testSet()方法

ZooKeeper类的setData()方法:

public Stat setData(final String path, byte data[], int version)
  • 参数1:节点路径
  • 参数2:节点数据
  • 参数3:节点数据版本号
@Test
public void testSet() throws Exception 
    // 创建闭锁
    final CountDownLatch cdl = new CountDownLatch(1);
    // 创建ZooKeeper对象
    ZooKeeper zk = new ZooKeeper(
            "tiger:2181", // 参数1:连接字符串(主机名:端口号)
            3000, // 参数2:会话超时
            new Watcher() 
                @Override
                public void process(WatchedEvent event) 
                    System.out.println("成功连接zk服务器!");
                    cdl.countDown();
                
             // 参数3:监听器
    );
    // 利用闭锁的await()方法产生阻塞,等待连接zk的线程执行完毕
    cdl.await();

    // 设置节点数据
    try 
        zk.setData("/zk01", "plant".getBytes(), -1);
        System.out.println("修改节点/zk01数据成功~");
     catch (Exception e) 
        System.out.println("修改节点/zk01数据失败~");
     


在zk客户端去看看,是不是将/zk01的数据修改成了“plant”呢?

(5)测试获取子结点,编写testGetChildren()方法

在zk客户端创建/zk01节点的两个子节点:zk01/zk11zk01/zk12

ZooKeeper类的getChildren()方法:

public List<String> getChildren(final String path, Watcher watcher)
  • 参数1:节点路径
  • 参数2:监听器
@Test
public void testGetChildren() throws Exception 
    // 创建闭锁
    final CountDownLatch cdl = new CountDownLatch(1);
    // 创建ZooKeeper对象
    ZooKeeper zk = new ZooKeeper(
            "tiger:2181", // 参数1:连接字符串(主机名:端口号)
            3000, // 参数2:会话超时
            new Watcher() 
                @Override
                public void process(WatchedEvent event) 
                    System.out.println("成功连接zk服务器!");
                    cdl.countDown();
                
             // 参数3:监听器
    );
    // 利用闭锁的await()方法产生阻塞,等待连接zk的线程执行完毕
    cdl.await();

    // 获取/zk01的子节点
    List<String> paths = zk.getChildren("/zk01", new Watcher() 
        @Override
        public void process(WatchedEvent event) 
            System.out.println("成功获取节点/zk01的子节点~");
        
    );
    // 遍历子节点
    for (String path : paths) 
        System.out.println("节点/zk01/" + path + "数据:" 
                + new String(zk.getData("/zk01/" + path, null, null)));
    

运行程序,结果如下:

(6)测试删除节点,编写testDelete()方法

ZooKeeper类的delete()方法:

public void delete(final String path, int version)
  • 参数1:节点路径
  • 参数2:节点数据版本号
@Test
public void testDelete() throws Exception 
    // 创建闭锁
    final CountDownLatch cdl = new CountDownLatch(1);
    // 创建ZooKeeper对象
    ZooKeeper zk = new ZooKeeper(
            "tiger:2181", // 参数1:连接字符串(主机名:端口号)
            3000, // 参数2:会话超时
            new Watcher() 
                @Override
                public void process(WatchedEvent event) 
                    System.out.println("成功连接zk服务器!");
                    cdl.countDown();
                
             // 参数3:监听器
    );
    // 利用闭锁的await()方法产生阻塞,等待连接zk的线程执行完毕
    cdl.await();

    // 删除节点/zk01
    try 
        zk.delete("/zk01", -1);
        System.out.println("节点/zk01删除成功~");
     catch (Exception e) 
        e.printStackTrace();
     

运行程序,结果如下:

注意:节点/zk01非空,删除不了,所以要报异常。
在zk客户端将/zk01的两个字节点zk11和zk12删除:


在zk客户端创建节点/zk01

(7)监测节点数据是否发生变化,编写testGetDataWatcher()方法

@Test
public void testGetDataWatcher() throws Exception 
    final CountDownLatch cdl = new CountDownLatch(1);
    ZooKeeper zk = new ZooKeeper("tiger:2181",
            3000, new Watcher() 
        @Override
        public void process(WatchedEvent watchedEvent) 
            System.out.println("成功连接zk服务器!");
            cdl.countDown();
        
    );
    cdl.await();

    while (true) 
        final CountDownLatch cdl2 = new CountDownLatch(1);
        zk.getData("/zk01", new Watcher() 
            @Override
            public void process(WatchedEvent watchedEvent) 
                System.out.println("节点/zk01数据发生变化!");
                cdl2.countDown();
            
        , null);
        cdl2.await();
        Thread.sleep(1000);
    

运行程序,结果如下:

程序在循环中,监测着节点/zk01数据是否有变化。

好了,我们到zk客户端,修改zk01的数据:

切换到控制台查看:

确实,能够利用Watcher对象监测到节点数据是否发生变化。、

(8)监测子节点是否发生变化,编写testGetChildrenWatcher()方法

监听事件类型:

  • NodeChildrenChanged
  • NodeCreated
  • NodeDataChanged
  • NodeDeleted

@Test
public void testGetChildrenWatcher() throws Exception 
    final CountDownLatch cdl 大数据讲课笔记6.1 初识ZooKeeper

大数据讲课笔记6.4 ZooKeeper分布式集群部署

大数据讲课笔记6.4 ZooKeeper分布式集群部署

大数据讲课笔记6.2 ZooKeeper数据模型

大数据讲课笔记6.5 ZooKeeper的Shell操作

大数据讲课笔记6.5 ZooKeeper的Shell操作