Apache ZooKeeper Watcher机制源码解释

Posted 麦克叔叔每晚10点说

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Apache ZooKeeper Watcher机制源码解释相关的知识,希望对你有一定的参考价值。

分布式系统从根本上来说就是不同节点上的进程并发执行,并且相互之间对进程的行为进行协调处理的过程。不同节点上的进程互相协调行为的过程叫做分布式同步。许多分布式系统需要一个进程作为任务的协调者,执行一些其他进程并不执行的特殊的操作,一般情况下哪个进程担当任务的协调者都无所谓,但是必须有一个进程作为协调者,自动选举出一个协调者的过程就是分布式选举。ZooKeeper正是为了解决这一系列问题而生的。上一篇我们介绍了ZooKeeper服务启动原理和源代码剖析,这一讲我们来谈谈Watcher机制,首先介绍一个监控示例,然后我们再来聊聊Watcher机制原理。

ZooKeeper Watcher机制

集群状态监控示例

为了确保集群能够正常运行,ZooKeeper可以被用来监视集群状态,这样就可以提供集群高可用性。使用ZooKeeper的瞬时(ephemeral)节点概念可以设计一个集群机器状态检测机制:



  1. 每一个运行了ZooKeeper客户端的生产环境机器都是一个终端进程,我们可以在它们连接到ZooKeeper服务端后在ZooKeeper服务端创建一系列对应的瞬时节点,可以用/hostname来进行区分。



  2. 这里还是采用监听(Watcher)方式来完成对节点状态的监视,通过对/hostname节点的NodeChildrenChanged事件的监听来完成这一目标。监听进程是作为一个独立的服务或者进程运行的,它覆盖了process方法来实现应急措施。


  3. 由于是一个瞬时节点,所以每次客户端断开时znode会立即消失,这样我们就可以监听到集群节点异常。


  4. NodeChildrenChanged事件触发后我们可以调用getChildren方法来知道哪台机器发生了异常。

01

清单1 ClusterMonitor类

import java.io.IOException;

import java.util.List;

import org.apache.zookeeper.CreateMode;

import org.apache.zookeeper.KeeperException;

import org.apache.zookeeper.WatchedEvent;

import org.apache.zookeeper.Watcher;

import org.apache.zookeeper.ZooDefs.Ids;

import org.apache.zookeeper.ZooKeeper;

 

public class ClusterMonitor implements Runnable{

       private static String membershipRoot = "/Members";

       private final Watcher connectionWatcher;

       private final Watcher childrenWatcher;

       private ZooKeeper zk;

       boolean alive = true;

       public ClusterMonitor(String HostPort) throws IOException,InterruptedException,KeeperException{

        connectionWatcher = new Watcher(){

           @Override

           public void process(WatchedEvent event) {

               // TODO Auto-generated method stub

               if(event.getType() == Watcher.Event.EventType.None && event.getState() == Watcher.Event.KeeperState.SyncConnected){

                   System.out.println(" connectionWatcher Event Received:%s"+event.toString());

               }

           }  

        };

        

        childrenWatcher = new Watcher(){

           @Override

           public void process(WatchedEvent event) {

               // TODO Auto-generated method stub

               System.out.println(" childrenWatcher Event Received:%s"+event.toString());

               if(event.getType()==Event.EventType.NodeChildrenChanged){

                   try{

                       //Get current list of child znode and reset the watch

                       List<String> children = zk.getChildren(membershipRoot, this);

                       System.out.println("Cluster Membership change,Members: "+children);

                   }catch(KeeperException ex){

                       throw new RuntimeException(ex);

                   }catch(InterruptedException ex){

                       Thread.currentThread().interrupt();

                       alive = false;

                       throw new RuntimeException(ex);

                   }

               }

           }

        };

        

        zk = new ZooKeeper(HostPort,2000,connectionWatcher);

        //Ensure the parent znode exists

        if(zk.exists(membershipRoot, false) == null){

            zk.create(membershipRoot, "ClusterMonitorRoot".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        }

        //Set a watch on the parent znode

        List<String> children = zk.getChildren(membershipRoot, childrenWatcher);

        System.err.println("Members:"+children);

       }

      

       public synchronized void close(){

        try{

            zk.close();

        }catch(InterruptedException ex){

            ex.printStackTrace();

        }

       }

      

   @Override

   public void run() {

       // TODO Auto-generated method stub

       try{

           synchronized(this){

               while(alive){

                   wait();

               }

           }

       }catch(InterruptedException ex){

           ex.printStackTrace();

           Thread.currentThread().interrupt();

       }finally{

           this.close();

       }

   }

   public static void main(String[] args) throws IOException,InterruptedException,KeeperException{

       if(args.length != 1){

           System.err.println("Usage:ClusterMonitor<Host:Port>");

           System.exit(0);

       }

       String hostPort = args[0];

       new ClusterMonitor(hostPort).run();

   }

}


02

清单2 ClusterClient类

 import java.io.IOException;

import java.lang.management.ManagementFactory;

import org.apache.zookeeper.CreateMode;

import org.apache.zookeeper.KeeperException;

import org.apache.zookeeper.WatchedEvent;

import org.apache.zookeeper.Watcher;

import org.apache.zookeeper.ZooDefs.Ids;

import org.apache.zookeeper.ZooKeeper;

 

public class ClusterClient implements Watcher,Runnable{

    private static String membershipRoot = "/Members";

    ZooKeeper zk;

    public ClusterClient(String hostPort,Long pid){

     String processId = pid.toString();

     try{

         zk = new ZooKeeper(hostPort,2000,this);

     }catch(IOException ex){

         ex.printStackTrace();

     }

     if(zk!=null){

         try{

             zk.create(membershipRoot+'/'+processId, processId.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

         }catch(KeeperException | InterruptedException ex){

             ex.printStackTrace();

         }

     }

    }

   

    public synchronized void close(){

     try{

         zk.close();

     }catch(InterruptedException ex){

         ex.printStackTrace();

     }

    }

   

    @Override

public void process(WatchedEvent event) {

    // TODO Auto-generated method stub

    System.out.println(" Event Received:%s"+event.toString());

}

@Override

public void run() {

    // TODO Auto-generated method stub

    try{

        synchronized(this){

            while(true){

                wait();

            }

        }

    }catch(InterruptedException ex){

        ex.printStackTrace();

        Thread.currentThread().interrupt();

    }finally{

        this.close();

    }

}

 

public static void main(String[] args){

    if(args.length!=1){

        System.err.println("Usage:ClusterClient<Host:Port>");

        System.exit(0);

    }

   

    String hostPort=args[0];

    //Get the process id

    String name = ManagementFactory.getRuntimeMXBean().getName();

    int index = name.indexOf('@');

    Long processId = Long.parseLong(name.substring(0,index));

    new ClusterClient(hostPort,processId).run();

}

03

Eclipse运行输出

childrenWatcher Event Received:%sWatchedEvent state:SyncConnected type:NodeChildrenChanged path:/Members

Cluster Membership change,Members: [dweref0000000009, test100000000003, dsdawqeqw0000000008, test111110000000004, test22220000000005, dsda32130000000007, dsda0000000006, test10000000002]


我们通过zkCli方式对被监听的/Members这个ZNODE操作,增加一个子节点,你会在zkCli里看到如清单4所示输出。

04

ZKCli创建ZNode子节点

[zk: localhost:2181(CONNECTED) 0] create -s /Members/dweref rew23rf

Created /Members/dweref0000000009 [zk: localhost:2181(CONNECTED) 4]


上面的示例我们演示了如何发起对于一个ZNODE的监听,当该ZNODE被改变后,我们会触发对应的方法进行处理,这类方式可以被用在数据监听、集群状态监听等用途。

发个小广告!!!走过路过,不要错过

Apache ZooKeeper Watcher机制源码解释(1)
Apache ZooKeeper Watcher机制源码解释(1)
Apache ZooKeeper Watcher机制源码解释(1)

这里有你想买的书!

http://product.dangdang.com/23949549.html#ddclick_reco_reco_relate

麦克叔叔每晚十点说


以上是关于Apache ZooKeeper Watcher机制源码解释的主要内容,如果未能解决你的问题,请参考以下文章

java.lang.NoClassDefFoundError: org/apache/zookeeper/Watcher$Event$KeeperState

zookeeper(10)源码分析-事件监听Watcher

zookeeper--基于watcher原理实现带注册中心的RPC框架

Zookeeper 在Java中的操作

Zookeeper连接eclipse

Java API操作ZooKeeper