zookeeper 实战操作
Posted myseries
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了zookeeper 实战操作相关的知识,希望对你有一定的参考价值。
一:监听服务端zookeeper节点数据改变
import java.io.IOException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; public class ConfigApp1 private static CountDownLatch connectedSemaphore = new CountDownLatch(1); public static void main(String[] args) throws IOException, KeeperException, InterruptedException //连接zookeeper服务器 ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 5000, new Watcher() public void process(WatchedEvent event) if (KeeperState.SyncConnected == event.getState()) //zk连接成功通知事件 if ( EventType.None == event.getType() && null == event.getPath() ) connectedSemaphore.countDown(); System.out.println("==========="); ); connectedSemaphore.await(); //创建节点app1,不进行ACL权限控制,EPHEMERAL:临时节点 zk.create("/app1", "app1Date".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); //注册对该节点的监听 zk.exists("/app1", new WatcherClass(zk)); //延迟2秒后 开始改变数据 TimeUnit.SECONDS.sleep(2); for(int i = 0; i < 10; i++) TimeUnit.SECONDS.sleep(1); String s = ("app" + i * 10); zk.setData("/app1", s.getBytes(), -1); System.out.println("数据改变了:"+s); System.in.read(); static class WatcherClass implements Watcher private ZooKeeper zk; public WatcherClass(ZooKeeper zk) this.zk = zk; @Override public void process(WatchedEvent arg0) try byte[] b = zk.getData("/app1", false, null); System.out.println("改变数据通知:" + new String(b)); //获取数据后,再次对节点进行监听 zk.exists("/app1", new WatcherClass(zk)); catch (KeeperException e) e.printStackTrace(); catch (InterruptedException e) e.printStackTrace();
console结果截图:
二:集群管理
在应用集群中,我们常常需要让每一台机器知道集群中(或所依赖的其他集群中)哪些机器是活着的,并且在集群机器因为宕机、网络断连等原因下线时,能够在无需人工介入的情况下迅速通知到每一台机器。
思路:用三个类模拟三台服务器去连接zookeeper。每台服务器上线时都会在zookeeper的root节点下创建一个临时节点,并对root节点的子节点列表注册watcher;这样,任何一台服务器上线或下线(其临时节点被创建或删除)时,其余服务器都能收到通知,从而动态感知集群中服务器的上下线情况。
import java.io.IOException; import java.util.List; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.ZooKeeper; public class Cluster1 private static final int zkSessionTimeOut = 5000; private static CountDownLatch connectedSemaphore = new CountDownLatch(1); public static void main(String[] args) throws IOException, InterruptedException, KeeperException //连接zookeeper服务器 ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", zkSessionTimeOut, new Watcher() public void process(WatchedEvent event) if (KeeperState.SyncConnected == event.getState()) //zk连接成功通知事件 if ( EventType.None == event.getType() && null == event.getPath() ) connectedSemaphore.countDown(); System.out.println("==========="); ); connectedSemaphore.await(); Stat stat = zk.exists("/root", true); if(stat == null) System.out.println("/root" + "路径不存在,请先创建该节点"); //创建节点root,不进行ACL权限控制,PERSISTENTAL:永久节点 只有永久节点才可以创建子节点的临时节点 zk.create("/root", "rootDate".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); String clusterPath = zk.create("/root/cluster1", "cluster1Date".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println(clusterPath); ZkWatcher zkWatcher = new ZkWatcher(zk); List<String> clusterList = zk.getChildren("/root", zkWatcher); System.out.println("****************"); for(String str : clusterList) System.out.println("cluster:" + str); System.out.println("****************"); while(true)
import java.io.IOException; import java.util.List; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.data.Stat; public class Cluster2 private static final int zkSessionTimeOut = 5000; private static CountDownLatch connectedSemaphore = new CountDownLatch(1); public static void main(String[] args) throws IOException, InterruptedException, KeeperException //连接zookeeper服务器 ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", zkSessionTimeOut, new Watcher() public void process(WatchedEvent event) if (KeeperState.SyncConnected == event.getState()) //zk连接成功通知事件 if ( EventType.None == event.getType() && null == event.getPath() ) connectedSemaphore.countDown(); System.out.println("==========="); ); connectedSemaphore.await(); Stat stat = zk.exists("/root", true); if(stat == null) System.out.println("/root" + "路径不存在,请先创建该节点"); zk.create("/root", "rootDate".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); String clusterPath = zk.create("/root/cluster2", "cluster2Date".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println(clusterPath); ZkWatcher zkWatcher = new ZkWatcher(zk); List<String> clusterList = zk.getChildren("/root", zkWatcher); System.out.println("****************"); for(String str : clusterList) System.out.println("cluster:" + str); System.out.println("****************"); while(true)
import java.io.IOException; import java.util.List; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; public class Cluster3 private static final int zkSessionTimeOut = 5000; private static CountDownLatch connectedSemaphore = new CountDownLatch(1); public static void main(String[] args) throws IOException, InterruptedException, KeeperException //连接zookeeper服务器 ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", zkSessionTimeOut, new Watcher() public void process(WatchedEvent event) if (KeeperState.SyncConnected == event.getState()) //zk连接成功通知事件 if ( EventType.None == event.getType() && null == event.getPath() ) connectedSemaphore.countDown(); System.out.println("==========="); ); connectedSemaphore.await(); Stat stat = zk.exists("/root", true); if(stat == null) System.out.println("/root" + "路径不存在,请先创建该节点"); zk.create("/root", "rootDate".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); String clusterPath = zk.create("/root/cluster3", "cluster3Date".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println(clusterPath); ZkWatcher zkWatcher = new ZkWatcher(zk); List<String> clusterList = zk.getChildren("/root", zkWatcher); System.out.println("****************"); for(String str : clusterList) System.out.println("cluster:" + str); System.out.println("****************"); while(true)
import java.util.List; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.ZooKeeper; public class ZkWatcher implements Watcher private ZooKeeper zk; public ZkWatcher(ZooKeeper zk) this.zk = zk; @Override public void process(WatchedEvent event) if(EventType.NodeChildrenChanged.equals(event.getType())) List<String> clusterList = null; try clusterList = zk.getChildren("/root", this); catch (KeeperException e) e.printStackTrace(); catch (InterruptedException e) e.printStackTrace(); System.out.println("****************"); System.out.println("changed"); for(String str : clusterList) System.out.println("cluster:" + str); System.out.println("****************");
以上是关于zookeeper 实战操作的主要内容,如果未能解决你的问题,请参考以下文章
Zookeeper实战分布式安装部署&客户端命令行操作(开发重点)