Apache ZooKeeper Watcher机制源码解释
Posted 麦克叔叔每晚10点说
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Apache ZooKeeper Watcher机制源码解释相关的知识,希望对你有一定的参考价值。
分布式系统从根本上来说就是不同节点上的进程并发执行,并且相互之间对进程的行为进行协调处理的过程。不同节点上的进程互相协调行为的过程叫做分布式同步。许多分布式系统需要一个进程作为任务的协调者,执行一些其他进程并不执行的特殊的操作,一般情况下哪个进程担当任务的协调者都无所谓,但是必须有一个进程作为协调者,自动选举出一个协调者的过程就是分布式选举。ZooKeeper正是为了解决这一系列问题而生的。上一篇我们介绍了ZooKeeper服务启动原理和源代码剖析,这一讲我们来谈谈Watcher机制,首先介绍一个监控示例,然后我们再来聊聊Watcher机制原理。
ZooKeeper Watcher机制
集群状态监控示例
为了确保集群能够正常运行,ZooKeeper可以被用来监视集群状态,这样就可以提供集群高可用性。使用ZooKeeper的瞬时(ephemeral)节点概念可以设计一个集群机器状态检测机制:
每一个运行了ZooKeeper客户端的生产环境机器都是一个终端进程,我们可以在它们连接到ZooKeeper服务端后在ZooKeeper服务端创建一系列对应的瞬时节点,可以用/hostname来进行区分。
这里还是采用监听(Watcher)方式来完成对节点状态的监视,通过对/hostname节点的NodeChildrenChanged事件的监听来完成这一目标。监听进程是作为一个独立的服务或者进程运行的,它覆盖了process方法来实现应急措施。
由于是一个瞬时节点,所以每次客户端断开时znode会立即消失,这样我们就可以监听到集群节点异常。
NodeChildrenChanged事件触发后我们可以调用getChildren方法来知道哪台机器发生了异常。
01
清单1 ClusterMonitor类
import java.io.IOException;
import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
public class ClusterMonitor implements Runnable{
private static String membershipRoot = "/Members";
private final Watcher connectionWatcher;
private final Watcher childrenWatcher;
private ZooKeeper zk;
boolean alive = true;
public ClusterMonitor(String HostPort) throws IOException,InterruptedException,KeeperException{
connectionWatcher = new Watcher(){
@Override
public void process(WatchedEvent event) {
// TODO Auto-generated method stub
if(event.getType() == Watcher.Event.EventType.None && event.getState() == Watcher.Event.KeeperState.SyncConnected){
System.out.println(" connectionWatcher Event Received:%s"+event.toString());
}
}
};
childrenWatcher = new Watcher(){
@Override
public void process(WatchedEvent event) {
// TODO Auto-generated method stub
System.out.println(" childrenWatcher Event Received:%s"+event.toString());
if(event.getType()==Event.EventType.NodeChildrenChanged){
try{
//Get current list of child znode and reset the watch
List<String> children = zk.getChildren(membershipRoot, this);
System.out.println("Cluster Membership change,Members: "+children);
}catch(KeeperException ex){
throw new RuntimeException(ex);
}catch(InterruptedException ex){
Thread.currentThread().interrupt();
alive = false;
throw new RuntimeException(ex);
}
}
}
};
zk = new ZooKeeper(HostPort,2000,connectionWatcher);
//Ensure the parent znode exists
if(zk.exists(membershipRoot, false) == null){
zk.create(membershipRoot, "ClusterMonitorRoot".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
//Set a watch on the parent znode
List<String> children = zk.getChildren(membershipRoot, childrenWatcher);
System.err.println("Members:"+children);
}
public synchronized void close(){
try{
zk.close();
}catch(InterruptedException ex){
ex.printStackTrace();
}
}
@Override
public void run() {
// TODO Auto-generated method stub
try{
synchronized(this){
while(alive){
wait();
}
}
}catch(InterruptedException ex){
ex.printStackTrace();
Thread.currentThread().interrupt();
}finally{
this.close();
}
}
public static void main(String[] args) throws IOException,InterruptedException,KeeperException{
if(args.length != 1){
System.err.println("Usage:ClusterMonitor<Host:Port>");
System.exit(0);
}
String hostPort = args[0];
new ClusterMonitor(hostPort).run();
}
}
02
清单2 ClusterClient类
import java.io.IOException;
import java.lang.management.ManagementFactory;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
public class ClusterClient implements Watcher,Runnable{
private static String membershipRoot = "/Members";
ZooKeeper zk;
public ClusterClient(String hostPort,Long pid){
String processId = pid.toString();
try{
zk = new ZooKeeper(hostPort,2000,this);
}catch(IOException ex){
ex.printStackTrace();
}
if(zk!=null){
try{
zk.create(membershipRoot+'/'+processId, processId.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
}catch(KeeperException | InterruptedException ex){
ex.printStackTrace();
}
}
}
public synchronized void close(){
try{
zk.close();
}catch(InterruptedException ex){
ex.printStackTrace();
}
}
@Override
public void process(WatchedEvent event) {
// TODO Auto-generated method stub
System.out.println(" Event Received:%s"+event.toString());
}
@Override
public void run() {
// TODO Auto-generated method stub
try{
synchronized(this){
while(true){
wait();
}
}
}catch(InterruptedException ex){
ex.printStackTrace();
Thread.currentThread().interrupt();
}finally{
this.close();
}
}
public static void main(String[] args){
if(args.length!=1){
System.err.println("Usage:ClusterClient<Host:Port>");
System.exit(0);
}
String hostPort=args[0];
//Get the process id
String name = ManagementFactory.getRuntimeMXBean().getName();
int index = name.indexOf('@');
Long processId = Long.parseLong(name.substring(0,index));
new ClusterClient(hostPort,processId).run();
}
}
03
Eclipse运行输出
childrenWatcher Event Received:%sWatchedEvent state:SyncConnected type:NodeChildrenChanged path:/Members
Cluster Membership change,Members: [dweref0000000009, test100000000003, dsdawqeqw0000000008, test111110000000004, test22220000000005, dsda32130000000007, dsda0000000006, test10000000002]
我们通过zkCli方式对被监听的/Members这个ZNODE操作,增加一个子节点,你会在zkCli里看到如清单4所示输出。
04
ZKCli创建ZNode子节点
[zk: localhost:2181(CONNECTED) 0] create -s /Members/dweref rew23rf
Created /Members/dweref0000000009 [zk: localhost:2181(CONNECTED) 4]
上面的示例我们演示了如何发起对于一个ZNODE的监听,当该ZNODE被改变后,我们会触发对应的方法进行处理,这类方式可以被用在数据监听、集群状态监听等用途。
发个小广告!!!走过路过,不要错过
这里有你想买的书!
http://product.dangdang.com/23949549.html#ddclick_reco_reco_relate
麦克叔叔每晚十点说
以上是关于Apache ZooKeeper Watcher机制源码解释的主要内容,如果未能解决你的问题,请参考以下文章
java.lang.NoClassDefFoundError: org/apache/zookeeper/Watcher$Event$KeeperState