ZooKeeper官方文档学习笔记04-ZooKeeper的Java实例
Posted 墨_浅-
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ZooKeeper官方文档学习笔记04-ZooKeeper的Java实例相关的知识,希望对你有一定的参考价值。
碎碎念:启动成功了一半。可以启动,可以debug,但是有些方法无法访问,而且create在哪里,我还不清楚。那个DataMonitor,不能完全按照官网写,要像我一样改一下,不然会报werror,因为有些过时了
ZooKeeper的Java实例
一个简单的watch客户端
作用:监视ZooKeeper节点的更改,对程序的启动或停止做出响应。
要求
- 1 它需要四个参数:
zk服务器的地址
被监视节点的名字
输出要写入的文件名
带参数的可执行文件
是这样理解吗?
- 2 获取与znode关联的数据并启动可执行文件
- 3 若被监视的znode发生更改,客户机将重新获取内容并重启启动可执行程序
- 4 若被监视的znode消失,客户端将杀死可执行程序
程序设计
ZooKeeper应用程序分为两部分:维护与服务器连接和监视节点数据。Executor
类负责维护连接部分,DataMonitor
负责监视zookeeper树中的数据。Executor包含主线程和执行逻辑。它负责少量的用户交互,以及与可执行程序(根据被监视的znode节点的状态停止或重启)的交互。
Executor
Executor对象是这个简单示例中的基本容器。它包含了ZooKeeper对象和DataMonitor。
// from the Executor class...
public static void main(String[] args) {
if (args.length < 4) {
System.err
.println("USAGE: Executor hostPort znode filename program [args ...]");
System.exit(2);
}
String hostPort = args[0];
String znode = args[1];
String filename = args[2];
String exec[] = new String[args.length - 3];
System.arraycopy(args, 3, exec, 0, exec.length);
try {
//Executor的任务是根据命令行中输入去启动和停止的可执行程序,以响应zk对象触发的时间。(前面的args)
new Executor(hostPort, znode, filename, exec).run();
} catch (Exception e) {
e.printStackTrace();
}
}
public Executor(String hostPort, String znode, String filename,
String exec[]) throws KeeperException, IOException {
this.filename = filename;
this.exec = exec;
zk = new ZooKeeper(hostPort, 3000, this);
dm = new DataMonitor(zk, znode, null, this);
}
public void run() {
try {
synchronized (this) {
while (!dm.dead) {
wait();
}
}
} catch (InterruptedException e) {
}
}
Executor实现了这些
public class Executor implements Watcher, Runnable, DataMonitor.DataMonitorListener {
...
//DataMonitor.DataMonitorListener这个是啥?
ZooKeeper的java api定义了watcher 接口,zk用watcher与容器(如Executor)进行通信。watcher仅包含process()方法。zk利用它去传递主线程感兴趣的时间,例如zk连接或会话的状态。本例中的Executor仅将事件下发给DataMonitor ,然后由DataMonitor 决定如何处理。
> 就说Executor接收了,但不想做就交给DataMonitor 处理了?是不是也可以交由其他的呢?
```java
public void process(WatchedEvent event) {
dm.process(event);
}
下面的DataMonitorListener接口是本案例设计的,不属于zkAPI。DataMonitor 对象使用它来与其容器(如Executor)进行通信(DataMonitor.DataMonitorListener)。
public interface DataMonitorListener {
void exists(byte data[]);
/**
*
* @param rc
* the ZooKeeper reason code
*/
void closing(int rc);
DataMonitor.DataMonitorListener这个是啥?
就是DataMonitor里面定义了DataMonitorListener接口,并由Executor实现了。
package example;
import org.apache.zookeeper.WatchedEvent;
public class DataMonitor {
DataMonitor dm;
public void process(WatchedEvent event) {
dm.process(event);
}
public interface DataMonitorListener {
/**
* 节点存活与否判断
*/
void exists(byte data[]);
/**
* ZooKeeper会话失效
* @param src
* ZooKeeper的原因码(reason code)
*/
void closing(int src);
}
}
下面是Executor 对 DataMonitorListener.exists ()和 DataMonitorListener.closing 的实现:
```java
public void exists( byte[] data ) {
if (data == null) {
if (child != null) {
System.out.println("Killing process");
child.destroy();
try {
child.waitFor();
} catch (InterruptedException e) {
}
}
child = null;
} else {
if (child != null) {
System.out.println("Stopping child");
child.destroy();
try {
child.waitFor();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
FileOutputStream fos = new FileOutputStream(filename);
fos.write(data);
fos.close();
} catch (IOException e) {
e.printStackTrace();
}
try {
System.out.println("Starting child");
child = Runtime.getRuntime().exec(exec);
new StreamWriter(child.getInputStream(), System.out);
new StreamWriter(child.getErrorStream(), System.err);
} catch (IOException e) {
e.printStackTrace();
}
}
}
public void closing(int rc) {
synchronized (this) {
notifyAll();
}
}
DataMonitor
DataMonitor是本程序ZooKeeper逻辑的核心,它是异步和事件驱动的。
public DataMonitor(ZooKeeper zk, String znode, Watcher chainedWatcher,
DataMonitorListener listener) {
this.zk = zk;
this.znode = znode;
this.chainedWatcher = chainedWatcher;
this.listener = listener;
//检查znode是否存在,并设置监视。
//传递自身作为回调对象,watcher触发时就会引起真实的处理流程
//exists在服务器端完成,但其回调在客户端完成
zk.exists(znode, true, this, null);
Note
- 1 不要将完成回调和watch的回调搞混。exists()的完成回调——(客户机端)processResult ()是在服务器上的watch(ZooKeeper.exists()的)操作之后执行。
- 2 Executor注册为了zk对象的一个watcher,所以watch触发时会向Executor对象发送一个事件
- 3 zk3.0后DataMonitor也可以注册为特定事件的watcher,本例不支持。
public void processResult(int rc, String path, Object ctx, Stat stat) {
boolean exists;
//1 检查节点是否存在
switch (rc) {
//节点存在
case Code.OK:
exists = true;
break;
//节点不存在
case Code.NoNode:
exists = false;
break;
//会话被服务器终止(致命错误)
case Code.SESSIONEXPIRED:
//未认证(致命错误)
case Code.NoAuth:
dead = true;
listener.closing(rc);
return;
default:
//可恢复的服务
zk.exists(znode,true,this,null);
return;
}
byte b[] = null;
// 2 存在则从znode获取数据
if (exists) {
try{
//这两句不是太理解欸
//如果节点在调用zookeeper.getData之前被删除,
//zookeeper.exists()设置的watch将会触发一个回调
//如果由通信错误,连接的watch事件会在连接恢复时触发
b = zk.getData(znode, false,null);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
return;
}
}
//如果状态发生变化,调用Executor 的 exists() 回调函数
//???这里不是太理解
if ((b == null && b!= prevData)
|| b != null && !Arrays.equals(prevData, b)) {
listener.exists(b);
prevData = b;
}
}
完整代码
Executor
package example;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
public class Executor implements Watcher, Runnable, DataMonitor.DataMonitorListener {
String filename;
String exec[];
ZooKeeper zk;
DataMonitor dm;
Process child;
public static void main(String[] args) {
if (args.length < 4) {
//(标准错误输出流)实时输出错误,显示为红色。out要累计到一定程度才输出
//https://blog.csdn.net/weixin_42153410/article/details/94618943
System.err.
println("USAGE: Executor hostPort znode filename program [args ...]");
System.exit(2);
}
String hostPort = args[0];
String znode = args[1];
String filename = args[2];
String exec[] = new String[args.length - 3];
/**
* Object src : 原数组
* int srcPos : 从元数据的起始位置开始
* Object dest : 目标数组
* int destPos : 目标数组的开始起始位置
* int length : 要copy的数组的长度
*/
System.arraycopy(args, 3, exec, 0, exec.length);
try {
new Executor(hostPort, znode, filename, exec).run();
} catch (Exception e) {
e.printStackTrace();
}
}
public Executor(String hostPort, String znode, String filename,
String exec[]) throws KeeperException, IOException {
this.filename = filename;
this.exec = exec;
//public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)这个?
//this是Executor对自身的引用
zk = new ZooKeeper(hostPort, 3000, this);
dm = new DataMonitor(zk, znode, null, this);
}
public void run() {
try {
//同一时间只能有一个线程访问
synchronized (this) {
while (!dm.dead) {
wait();
}
}
}catch (InterruptedException e) {
}
}
public void exists(byte[] data) {
//没有传数据
if (data == null) {
//如果进程不为空
if (child != null) {
System.out.println("Killing process");
//杀死子进程
child.destroy();
try {
//让线程等待到终止为止
child.waitFor();
} catch (InterruptedException e) {
//线程在等待时中断
e.printStackTrace();
}
}
//进程置空
child = null;
} else {
//data有数据(znode存在,或发生变化?)
if (child != null) {
//但是进程不为空
System.out.println("Stopping child");
//先终止进程
child.destroy();
try {
child.waitFor();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
//将znode数据存入文件
FileOutputStream fos = new FileOutputStream(filename);
fos.write(data);
fos.close();
} catch (IOException e) {
e.printStackTrace();
}
try {
//启动进程
System.out.println("Starting child");
//getRuntime 返回与当前Java应用程序关联的运行时对象(Runtime)
//exec 在单独的进程中执行指定的字符串命令
// 即,线程执行用户的命令
child = Runtime.getRuntime().exec(exec);
//两个线程输出执行结果及日志
new StreamWriter(child.getInputStream(), System.out);
new StreamWriter(child.getErrorStream(), System.err);
} catch (IOException e) {
e.printStackTrace();
}
}
}
public void closing(int rc) {
synchronized (this) {
//唤醒对象的等待池中的所有线程,进入锁池
// 和会话失效有啥关系?
notifyAll();
}
}
static class StreamWriter extends Thread {
OutputStream os;
InputStream is;
StreamWriter(InputStream is,OutputStream os) {
this.is = is;
this.os = os;
start();
}
public void run() {
byte b[] = new byte[80];
int rc;
try {
while ((rc = is.read(b)) > 0) {
os.write(b,0,rc);
}
} catch (IOException e) {}
}
}
public void process(WatchedEvent event) {
dm.process(event);
}
}
DataMonitor
package example;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.KeeperException.Code;
import java.util.Arrays;
public class DataMonitor implements Watcher以上是关于ZooKeeper官方文档学习笔记04-ZooKeeper的Java实例的主要内容,如果未能解决你的问题,请参考以下文章