大数据讲课笔记6.6 ZooKeeper的Java API操作
Posted howard2005
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据讲课笔记6.6 ZooKeeper的Java API操作相关的知识,希望对你有一定的参考价值。
文章目录
- 零、学习目标
- 一、导入新课
- 二、新课讲解
- (一)ZooKeeper Java API概述
- (二)通过Java API操作ZooKeeper
- 1、创建Maven项目
- 2、添加相关依赖
- 3、创建类实现功能
- (1)测试连接zk服务器,编写testConnect()方法
- (2)测试创建节点,编写testCreate()方法
- (3)测试获取节点数据和信息,编写testGet()方法
- (4)测试修改节点数据,编写testSet()方法
- (5)测试获取子结点,编写testGetChildren()方法
- (6)测试删除节点,编写testDelete()方法
- (7)监测节点数据是否发生变化,编写testGetDataWatcher()方法
- (8)监测子节点是否发生变化,编写testGetChildrenWatcher()方法
- (9)监测节点是否被删除,编写testDeleteWatcher()方法
- (10)测试节点创建回调,编写testCreateCallback()方法
- 4、ZkDemo完整代码
- 三、归纳总结
- 四、上机操作
零、学习目标
- 了解ZooKeeper Java API
- 能利用Java API操作ZooKeeper
一、导入新课
- 通过上节课的学习,我们掌握了ZooKeeper的Shell操作。本节课将针对ZooKeeper的Java API操作进行详细讲解。
二、新课讲解
(一)ZooKeeper Java API概述
1、ZooKeeper Java API包含五个包
org.apache.ZooKeeper
org.apache.ZooKeeper.data
org.apache.ZooKeeper.server
org.apache.ZooKeeper.server.quorum
org.apache.ZooKeeper.server.upgrade
2、ZooKeeper类常用方法
- org.apache.ZooKeeper包含ZooKeeper类,这也是编程时最常用的类文件,ZooKeeper类提供的常用Java API方法。
方法名称 | 方法描述 |
---|---|
create | 创建节点 |
delete | 删除节点 |
exists | 判断节点是否存在 |
get/setData | 获取/修改节点数据 |
getChildren | 获取指定节点下的所有子节点列表 |
(二)通过Java API操作ZooKeeper
1、创建Maven项目
- 创建Maven项目 -
ZooKeeperDemo
2、添加相关依赖
- 在
pom.xml
文件里添加ZooKeeper和junit依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>ZooKeeperDemo</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.6</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
</dependency>
</dependencies>
</project>
3、创建类实现功能
- 在
net.hw.zk
包里创建ZkDemo
类
(1)测试连接zk服务器,编写testConnect()方法
ZooKeeper类的构造方法:
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
@Test
public void testConnect() throws Exception
// 创建闭锁
final CountDownLatch cdl = new CountDownLatch(1);
// 创建ZooKeeper对象
ZooKeeper zk = new ZooKeeper(
"192.168.225.100:2181", // 参数1:连接字符串(IP地址:端口号)
3000, // 参数2:会话超时
new Watcher()
@Override
public void process(WatchedEvent event)
System.out.println("成功连接zk服务器!");
cdl.countDown();
// 参数3:监听器
);
// 利用闭锁的await()方法产生阻塞,等待连接zk的线程执行完毕
cdl.await();
说明:ZooKeeper构造方法第一个参数是zookeeper服务器地址及端口号,可以用IP地址:192.168.225.100,也可用主机名,比如tiger,但是必须在Windows的hosts文件里定义域名映射:
运行程序,结果如下:
为了去掉log4j的警告信息,在resources目录里创建log4j.properties文件:
log4j.rootLogger=WARN, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d - %m%n
将IP地址改成主机名tiger,运行结果是一样的。
(2)测试创建节点,编写testCreate()方法
ZooKeeper类的create方法:
public String create(final String path, byte data[], List<ACL> acl, CreateMode createMode)
- 参数1:节点路径
- 参数2:节点数据(字节数组)
- 参数3:访问控制列表
- 参数4:创建模式(四种:临时普通、临时顺序、持久普通、持久顺序)
@Test
public void testCreate() throws Exception
// 创建闭锁
final CountDownLatch cdl = new CountDownLatch(1);
// 创建ZooKeeper对象
ZooKeeper zk = new ZooKeeper(
"tiger:2181", // 参数1:连接字符串(主机名:端口号)
3000, // 参数2:会话超时
new Watcher()
@Override
public void process(WatchedEvent event)
System.out.println("成功连接zk服务器!");
cdl.countDown();
// 参数3:监听器
);
// 利用闭锁的await()方法产生阻塞,等待连接zk的线程执行完毕
cdl.await();
// 创建节点
try
zk.create("/zk01", // 参数1:节点路径
"flower".getBytes(), // 参数2:节点数据
ZooDefs.Ids.OPEN_ACL_UNSAFE,// 参数3:访问控制列表
CreateMode.EPHEMERAL // 参数4:创建模式(普通临时)
);
System.out.println("创建节点/zk01成功~");
catch (Exception e)
System.out.println("创建节点/zk01失败~");
运行程序,结果如下:
我们打开zk客户端,查看是否有/zk01
节点:
咦,怎么没有我们刚才创建的/zk01
节点呢?
看见没,我们在程序里创建的是临时节点,一旦程序与zk服务器断开连接,临时节点就灰飞烟灭了,利用这个特性,zk服务器可以了解一个服务器集群里哪些节点宕机了。
我们不妨修改一下程序,让线程睡眠30秒:
此时,运行程序,在zk客户端查看:
看见没,节点/zk01
出现了。
但是,等到线程结束,再次查看:
节点/zk01
又被自动被删除掉。
当然,可以创建其它类型的节点,比如持久节点:
运行程序,结果如下:
等待线程睡眠结束:
此时,进程已经结束了,但是我们在zk客户端看看/zk01节点是否还存在。
(3)测试获取节点数据和信息,编写testGet()方法
zk服务器上有一个我们刚才创建的普通持久节点:/zk01
ZooKeeper类的getData()方法:
public byte[] getData(final String path, Watcher watcher, Stat stat)
- 参数1:节点路径
- 参数2:监听器
- 参数3:状态(封装了节点相关信息)
返回值是节点数据,是一个字节数组。
@Test
public void testGet() throws Exception
// 创建闭锁
final CountDownLatch cdl = new CountDownLatch(1);
// 创建ZooKeeper对象
ZooKeeper zk = new ZooKeeper(
"tiger:2181", // 参数1:连接字符串(主机名:端口号)
3000, // 参数2:会话超时
new Watcher()
@Override
public void process(WatchedEvent event)
System.out.println("成功连接zk服务器!");
cdl.countDown();
// 参数3:监听器
);
// 利用闭锁的await()方法产生阻塞,等待连接zk的线程执行完毕
cdl.await();
// 获取节点数据以及相关信息
Stat stat = new Stat();
byte[] data = zk.getData("/zk01", // 参数1:节点路径
new Watcher()
@Override
public void process(WatchedEvent event)
System.out.println("获取了节点/zk01的数据!");
, // 参数2:监听器
stat // 参数3:状态(封装了节点相关信息)
);
// 输出节点/zk01的数据
System.out.println("节点/zk01的数据:" + new String(data));
// 输出节点/zk01的相关信息
System.out.println("节点/zk01的状态:" + stat);
运行程序,结果如下:
(4)测试修改节点数据,编写testSet()方法
ZooKeeper类的setData()方法:
public Stat setData(final String path, byte data[], int version)
- 参数1:节点路径
- 参数2:节点数据
- 参数3:节点数据版本号
@Test
public void testSet() throws Exception
// 创建闭锁
final CountDownLatch cdl = new CountDownLatch(1);
// 创建ZooKeeper对象
ZooKeeper zk = new ZooKeeper(
"tiger:2181", // 参数1:连接字符串(主机名:端口号)
3000, // 参数2:会话超时
new Watcher()
@Override
public void process(WatchedEvent event)
System.out.println("成功连接zk服务器!");
cdl.countDown();
// 参数3:监听器
);
// 利用闭锁的await()方法产生阻塞,等待连接zk的线程执行完毕
cdl.await();
// 设置节点数据
try
zk.setData("/zk01", "plant".getBytes(), -1);
System.out.println("修改节点/zk01数据成功~");
catch (Exception e)
System.out.println("修改节点/zk01数据失败~");
在zk客户端去看看,是不是将/zk01的数据修改成了“plant”呢?
(5)测试获取子结点,编写testGetChildren()方法
在zk客户端创建/zk01节点的两个子节点:zk01/zk11
、zk01/zk12
。
ZooKeeper类的getChildren()方法:
public List<String> getChildren(final String path, Watcher watcher)
- 参数1:节点路径
- 参数2:监听器
@Test
public void testGetChildren() throws Exception
// 创建闭锁
final CountDownLatch cdl = new CountDownLatch(1);
// 创建ZooKeeper对象
ZooKeeper zk = new ZooKeeper(
"tiger:2181", // 参数1:连接字符串(主机名:端口号)
3000, // 参数2:会话超时
new Watcher()
@Override
public void process(WatchedEvent event)
System.out.println("成功连接zk服务器!");
cdl.countDown();
// 参数3:监听器
);
// 利用闭锁的await()方法产生阻塞,等待连接zk的线程执行完毕
cdl.await();
// 获取/zk01的子节点
List<String> paths = zk.getChildren("/zk01", new Watcher()
@Override
public void process(WatchedEvent event)
System.out.println("成功获取节点/zk01的子节点~");
);
// 遍历子节点
for (String path : paths)
System.out.println("节点/zk01/" + path + "数据:"
+ new String(zk.getData("/zk01/" + path, null, null)));
运行程序,结果如下:
(6)测试删除节点,编写testDelete()方法
ZooKeeper类的delete()方法:
public void delete(final String path, int version)
- 参数1:节点路径
- 参数2:节点数据版本号
@Test
public void testDelete() throws Exception
// 创建闭锁
final CountDownLatch cdl = new CountDownLatch(1);
// 创建ZooKeeper对象
ZooKeeper zk = new ZooKeeper(
"tiger:2181", // 参数1:连接字符串(主机名:端口号)
3000, // 参数2:会话超时
new Watcher()
@Override
public void process(WatchedEvent event)
System.out.println("成功连接zk服务器!");
cdl.countDown();
// 参数3:监听器
);
// 利用闭锁的await()方法产生阻塞,等待连接zk的线程执行完毕
cdl.await();
// 删除节点/zk01
try
zk.delete("/zk01", -1);
System.out.println("节点/zk01删除成功~");
catch (Exception e)
e.printStackTrace();
运行程序,结果如下:
注意:节点/zk01
非空,删除不了,所以要报异常。
在zk客户端将/zk01的两个字节点zk11和zk12删除:
在zk客户端创建节点/zk01
:
(7)监测节点数据是否发生变化,编写testGetDataWatcher()方法
@Test
public void testGetDataWatcher() throws Exception
final CountDownLatch cdl = new CountDownLatch(1);
ZooKeeper zk = new ZooKeeper("tiger:2181",
3000, new Watcher()
@Override
public void process(WatchedEvent watchedEvent)
System.out.println("成功连接zk服务器!");
cdl.countDown();
);
cdl.await();
while (true)
final CountDownLatch cdl2 = new CountDownLatch(1);
zk.getData("/zk01", new Watcher()
@Override
public void process(WatchedEvent watchedEvent)
System.out.println("节点/zk01数据发生变化!");
cdl2.countDown();
, null);
cdl2.await();
Thread.sleep(1000);
运行程序,结果如下:
程序在循环中,监测着节点/zk01数据是否有变化。
好了,我们到zk客户端,修改zk01的数据:
切换到控制台查看:
确实,能够利用Watcher对象监测到节点数据是否发生变化。、
(8)监测子节点是否发生变化,编写testGetChildrenWatcher()方法
监听事件类型:
- NodeChildrenChanged
- NodeCreated
- NodeDataChanged
- NodeDeleted
@Test
public void testGetChildrenWatcher() throws Exception
final CountDownLatch cdl 大数据讲课笔记6.1 初识ZooKeeper