ZooKeeper 原生API

Posted 在奋斗的大道

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ZooKeeper 原生API相关的知识,希望对你有一定的参考价值。

maven 依赖添加

<!-- Apache ZooKeeper artifact (org.apache.zookeeper:zookeeper), pinned to version 3.4.6. -->
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.6</version>
</dependency>

zk客户端与服务器连接

客户端与zk服务端建立连接是一个异步的过程。
当连接成功后,客户端会收到一个watch通知。

/**
 * Demonstrates opening a ZooKeeper client connection.
 *
 * <p>An instance of this class is registered as the {@link Watcher} of the client it
 * creates; every notification it receives is written to the log.
 */
public class ZKConnect implements Watcher {

	final static Logger log = LoggerFactory.getLogger(ZKConnect.class);

	/** Address of the ZooKeeper server to connect to. */
	public static final String zkServerPath = "192.168.254.130:2181";
	// For a cluster, list every member separated by commas, for example:
	// "192.168.1.111:2181,192.168.1.111:2182,192.168.1.111:2183"

	/** Session timeout handed to the ZooKeeper constructor (milliseconds per the ZooKeeper API). */
	public static final Integer timeout = 5000;

	/**
	 * Creates a client, logs its state immediately and again two seconds later, then
	 * closes it.
	 *
	 * <p>Connecting is asynchronous: the constructor returns before the connection is
	 * established, and the watcher is notified once it succeeds.
	 *
	 * <p>ZooKeeper constructor parameters:
	 * <ul>
	 *   <li>{@code connectString}: server address list, e.g.
	 *       {@code "192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181"}. One address
	 *       means a standalone server, several mean a cluster; a path may be appended
	 *       after the addresses.</li>
	 *   <li>{@code sessionTimeout}: the session times out once heartbeats are no longer
	 *       received within this period.</li>
	 *   <li>{@code watcher}: receives a notification whenever a matching event fires;
	 *       pass {@code null} if none is needed.</li>
	 *   <li>{@code canBeReadOnly}: allows reads (but not writes) after the server has
	 *       been cut off; data read this way may be stale, so {@code false} is
	 *       recommended.</li>
	 *   <li>{@code sessionId} / {@code sessionPasswd}: identify an earlier session so
	 *       that it can be re-acquired after the connection was lost.</li>
	 * </ul>
	 *
	 * @param args unused
	 * @throws Exception if the client cannot be created or the thread is interrupted
	 */
	public static void main(String[] args) throws Exception {
		ZooKeeper zk = new ZooKeeper(zkServerPath, timeout, new ZKConnect());
		try {
			log.warn("客户端开始连接zookeeper服务器...");
			log.warn("连接状态:{}", zk.getState());
			// sleep is a static method; no Thread object is needed to call it.
			Thread.sleep(2000);
			log.warn("连接状态:{}", zk.getState());
		} finally {
			// Release the session instead of leaving it to time out on the server.
			zk.close();
		}
	}

	/**
	 * Logs every notification delivered to this watcher.
	 *
	 * @param event the notification received from the client
	 */
	@Override
	public void process(WatchedEvent event) {
		log.warn("接受到watch通知:{}", event);
	}
}

zk会话重连机制

获取到连接zk的会话id和会话密码,我们就可以再次回到会话中

/**
 * Demonstrates re-attaching to an existing ZooKeeper session.
 *
 * <p>A first client is created, its session id and session password are read, and a
 * second client is then created with those two values.
 */
public class ZKConnectSessionWatcher implements Watcher {

	final static Logger log = LoggerFactory.getLogger(ZKConnectSessionWatcher.class);

	/** Address of the ZooKeeper server to connect to. */
	public static final String zkServerPath = "192.168.254.130:2181";

	/** Session timeout handed to the ZooKeeper constructor. */
	public static final Integer timeout = 5000;

	/**
	 * Opens a session, prints its id in hexadecimal, then opens a second client with
	 * the same session id and password and logs that client's state twice.
	 *
	 * @param args unused
	 * @throws Exception if a client cannot be created or the thread is interrupted
	 */
	public static void main(String[] args) throws Exception {

		ZooKeeper zk = new ZooKeeper(zkServerPath, timeout, new ZKConnectSessionWatcher());
		// Give the asynchronous connect time to finish before reading the session id.
		// sleep is a static method; no Thread object is needed to call it.
		Thread.sleep(1000);
		long sessionId = zk.getSessionId();
		String ssid = "0x" + Long.toHexString(sessionId);
		System.out.println(ssid);
		byte[] sessionPassword = zk.getSessionPasswd();

		log.warn("客户端开始连接zookeeper服务器...");
		log.warn("连接状态:{}", zk.getState());

		log.warn("连接状态:{}", zk.getState());

		Thread.sleep(200);

		// Start re-attaching to the session.
		log.warn("开始会话重连...");

		ZooKeeper zkSession = new ZooKeeper(zkServerPath,
											timeout,
											new ZKConnectSessionWatcher(),
											sessionId,
											sessionPassword);
		log.warn("重新连接状态zkSession:{}", zkSession.getState());
		Thread.sleep(1000);
		log.warn("重新连接状态zkSession:{}", zkSession.getState());
		// NOTE(review): neither handle is closed here. Both use the same session id, so
		// confirm the intended shutdown order before adding close() calls.
	}

	/**
	 * Logs every notification delivered to this watcher.
	 *
	 * @param event the notification received from the client
	 */
	@Override
	public void process(WatchedEvent event) {
		log.warn("接受到watch通知:{}", event);
	}
}

java操作zk,节点的创建,删除,设置

/**
 * 
 * @Title: ZKConnectDemo.java
 * @Description: zookeeper 操作demo演示
 */
public class ZKNodeOperator implements Watcher {

	private ZooKeeper zookeeper = null;
	
	public static final String zkServerPath = "192.168.254.130:2181";
	public static final Integer timeout = 5000;
	
	public ZKNodeOperator() {}
	
	public ZKNodeOperator(String connectString) {
		try {
			zookeeper = new ZooKeeper(connectString, timeout, new ZKNodeOperator());
		} catch (IOException e) {
			e.printStackTrace();
			if (zookeeper != null) {
				try {
					zookeeper.close();
				} catch (InterruptedException e1) {
					e1.printStackTrace();
				}
			}
		}
	}
	
	/**
	 * 
	 * @Title: ZKOperatorDemo.java
	 * @Description: 创建zk节点
	 */
	public void createZKNode(String path, byte[] data, List<ACL> acls) {
		
		String result = "";
		try {
			/**
			 * 同步或者异步创建节点,都不支持子节点的递归创建,异步有一个callback函数
			 * 参数:
			 * path:创建的路径
			 * data:存储的数据的byte[]
			 * acl:控制权限策略
			 * 			Ids.OPEN_ACL_UNSAFE --> world:anyone:cdrwa
			 * 			CREATOR_ALL_ACL --> auth:user:password:cdrwa
			 * createMode:节点类型, 是一个枚举
			 * 			PERSISTENT:持久节点
			 * 			PERSISTENT_SEQUENTIAL:持久顺序节点
			 * 			EPHEMERAL:临时节点
			 * 			EPHEMERAL_SEQUENTIAL:临时顺序节点
			 */
			result = zookeeper.create(path, data, acls, CreateMode.PERSISTENT);
			
			//异步创建,CreateCallBack是一个回调函数
/*			String ctx = "{'create':'success'}";
			zookeeper.create(path, data, acls, CreateMode.PERSISTENT, new CreateCallBack(), ctx);*/
			
			System.out.println("创建节点:\\t" + result + "\\t成功...");
			new Thread().sleep(2000);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
	public static void main(String[] args) throws Exception {
		ZKNodeOperator zkServer = new ZKNodeOperator(zkServerPath);
		
		// 创建zk节点
		zkServer.createZKNode("/testnode", "testnode".getBytes(), Ids.OPEN_ACL_UNSAFE);
		
		/**
		 * 设置节点数据
		 * 参数:
		 * path:节点路径
		 * data:数据
		 * version:数据状态
		 */
//		Stat status  = zkServer.getZookeeper().setData("/testnode", "xyz".getBytes(), 2);
//		System.out.println(status.getVersion());
		
		/**
		 * 同步删除数据,没有返回数据
		 * 参数:
		 * path:节点路径
		 * version:数据状态
		 */
/*		zkServer.createZKNode("/test-delete-node", "123".getBytes(), Ids.OPEN_ACL_UNSAFE);
		zkServer.getZookeeper().delete("/test-delete-node", 2);
		/**
		 * 异步删除数据,在回调函数里面返回
		 * 参数:
		 * path:节点路径
		 * version:数据状态
		 */
		String ctx = "{'delete':'success'}";
		zkServer.getZookeeper().delete("/test-delete-node", 0, new DeleteCallBack(), ctx);
		Thread.sleep(2000);*/
	}

	public ZooKeeper getZookeeper() {
		return zookeeper;
	}
	public void setZookeeper(ZooKeeper zookeeper) {
		this.zookeeper = zookeeper;
	}

	@Override
	public void process(WatchedEvent event) {
	}
}

删除回调函数

/**
 * Callback intended for the asynchronous delete call: prints the node path and the
 * context object supplied by the caller.
 */
public class DeleteCallBack implements VoidCallback {

	/**
	 * Prints the path followed by the context.
	 *
	 * @param rc   result code of the request (not inspected)
	 * @param path path the request was issued for
	 * @param ctx  context object passed to the request; cast to {@code String}
	 */
	@Override
	public void processResult(int rc, String path, Object ctx) {
		StringBuilder line = new StringBuilder("删除节点").append(path);
		System.out.println(line);

		String context = (String) ctx;
		System.out.println(context);
	}
}

获取zk节点数据

使用CountDownLatch来监听

/**
 * Demonstrates reading a node's data with a watch and reacting to the data-changed
 * notification. A {@link CountDownLatch} keeps {@code main} alive until the
 * notification has been handled.
 */
public class ZKGetNodeData implements Watcher {

	private ZooKeeper zookeeper = null;

	/** Address of the ZooKeeper server to connect to. */
	public static final String zkServerPath = "192.168.254.130:2181";

	/** Session timeout handed to the ZooKeeper constructor. */
	public static final Integer timeout = 5000;

	/** Receives the node metadata filled in by each getData call. */
	private static Stat stat = new Stat();

	/** Released once a data-changed notification has been processed. */
	private static CountDownLatch countDown = new CountDownLatch(1);

	/** Creates an instance without a client; used as the watcher of another instance's client. */
	public ZKGetNodeData() {}

	/**
	 * Creates an instance backed by a new ZooKeeper client.
	 *
	 * <p>If the client cannot be created the stack trace is printed and the instance is
	 * left without a client.
	 *
	 * @param connectString server address list passed to the ZooKeeper constructor
	 */
	public ZKGetNodeData(String connectString) {
		try {
			zookeeper = new ZooKeeper(connectString, timeout, new ZKGetNodeData());
		} catch (IOException e) {
			// The field has not been assigned when the constructor throws, so there is
			// no client to close here.
			e.printStackTrace();
		}
	}

	/**
	 * Reads {@code /bushro} with a watch, prints the value, waits for the latch and
	 * finally closes the client.
	 *
	 * <p>getData parameters: node path, whether to register a watch, and the
	 * {@code Stat} object to fill in.
	 *
	 * @param args unused
	 * @throws Exception if the read fails or the thread is interrupted
	 */
	public static void main(String[] args) throws Exception {

		ZKGetNodeData zkServer = new ZKGetNodeData(zkServerPath);
		try {
			byte[] resByte = zkServer.getZookeeper().getData("/bushro", true, stat);
			String result = new String(resByte);
			System.out.println("当前值:" + result);
			countDown.await();
		} finally {
			closeQuietly(zkServer.getZookeeper());
		}
	}

	/**
	 * On a data-changed notification, re-reads {@code /bushro}, prints the new value and
	 * data version, and releases the latch. Other notification types are ignored.
	 *
	 * @param event the notification received from the client
	 */
	@Override
	public void process(WatchedEvent event) {
		if (event.getType() != EventType.NodeDataChanged) {
			// NodeCreated, NodeChildrenChanged, NodeDeleted and the rest are not handled.
			return;
		}
		// The watcher registered in the constructor is built with the no-arg constructor
		// and has no client of its own, so a short-lived one is opened for the read and
		// closed afterwards (it was previously left open on every notification).
		ZooKeeper reader = new ZKGetNodeData(zkServerPath).getZookeeper();
		try {
			byte[] resByte = reader.getData("/bushro", false, stat);
			String result = new String(resByte);
			System.out.println("更改后的值:" + result);
			System.out.println("版本号变化dversion:" + stat.getVersion());
			countDown.countDown();
		} catch (KeeperException e) {
			e.printStackTrace();
		} catch (InterruptedException e) {
			// Keep the interrupt visible to the calling thread.
			Thread.currentThread().interrupt();
			e.printStackTrace();
		} finally {
			closeQuietly(reader);
		}
	}

	/** Closes the given client if there is one; an interrupt is recorded and printed. */
	private static void closeQuietly(ZooKeeper client) {
		if (client == null) {
			return;
		}
		try {
			client.close();
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			e.printStackTrace();
		}
	}

	/** @return the client held by this instance, or {@code null} if none was created */
	public ZooKeeper getZookeeper() {
		return zookeeper;
	}

	/** @param zookeeper the client this instance should use */
	public void setZookeeper(ZooKeeper zookeeper) {
		this.zookeeper = zookeeper;
	}
}

获取zk子节点列表

/**
 * Demonstrates listing a node's children with a watch and reacting to the
 * children-changed notification. A {@link CountDownLatch} keeps {@code main} alive
 * until the notification has been handled.
 */
public class ZKGetChildrenList implements Watcher {

	private ZooKeeper zookeeper = null;

	/** Address of the ZooKeeper server to connect to. */
	public static final String zkServerPath = "192.168.254.130:2181";

	/** Session timeout handed to the ZooKeeper constructor. */
	public static final Integer timeout = 5000;

	/** Released once a children-changed notification has been processed. */
	private static CountDownLatch countDown = new CountDownLatch(1);

	/** Creates an instance without a client; used as the watcher of another instance's client. */
	public ZKGetChildrenList() {}

	/**
	 * Creates an instance backed by a new ZooKeeper client.
	 *
	 * <p>If the client cannot be created the stack trace is printed and the instance is
	 * left without a client.
	 *
	 * @param connectString server address list passed to the ZooKeeper constructor
	 */
	public ZKGetChildrenList(String connectString) {
		try {
			zookeeper = new ZooKeeper(connectString, timeout, new ZKGetChildrenList());
		} catch (IOException e) {
			// The field has not been assigned when the constructor throws, so there is
			// no client to close here.
			e.printStackTrace();
		}
	}

	/**
	 * Requests the children of {@code /bushro} asynchronously with a watch, waits for
	 * the latch and finally closes the client.
	 *
	 * @param args unused
	 * @throws Exception if the thread is interrupted while waiting or closing
	 */
	public static void main(String[] args) throws Exception {

		ZKGetChildrenList zkServer = new ZKGetChildrenList(zkServerPath);
		try {
			// Synchronous variant. Parameters: parent path, whether to register a watch.
			//   List<String> strChildList = zkServer.getZookeeper().getChildren("/bushro", true);
			//   for (String s : strChildList) {
			//       System.out.println(s);
			//   }

			// Asynchronous call. Two callback flavours exist: Children2CallBack also
			// receives the node's Stat, ChildrenCallBack does not.
			String ctx = "{'callback':'ChildrenCallback'}";
			//   zkServer.getZookeeper().getChildren("/imooc", true, new ChildrenCallBack(), ctx);
			zkServer.getZookeeper().getChildren("/bushro", true, new Children2CallBack(), ctx);

			countDown.await();
		} finally {
			closeQuietly(zkServer.getZookeeper());
		}
	}

	/**
	 * Prints the notification type. On a children-changed notification it also lists
	 * the children of the affected path and releases the latch.
	 *
	 * @param event the notification received from the client
	 */
	@Override
	public void process(WatchedEvent event) {
		EventType type = event.getType();
		if (type == EventType.NodeChildrenChanged) {
			System.out.println("NodeChildrenChanged");
			// The watcher registered in the constructor is built with the no-arg
			// constructor and has no client of its own, so a short-lived one is opened
			// for the read and closed afterwards (it was previously left open).
			ZooKeeper reader = new ZKGetChildrenList(zkServerPath).getZookeeper();
			try {
				List<String> strChildList = reader.getChildren(event.getPath(), false);
				for (String s : strChildList) {
					System.out.println("子节点: "+s);
				}
				countDown.countDown();
			} catch (KeeperException e) {
				e.printStackTrace();
			} catch (InterruptedException e) {
				// Keep the interrupt visible to the calling thread.
				Thread.currentThread().interrupt();
				e.printStackTrace();
			} finally {
				closeQuietly(reader);
			}
		} else if (type == EventType.NodeCreated) {
			System.out.println("NodeCreated");
		} else if (type == EventType.NodeDataChanged) {
			System.out.println("NodeDataChanged");
		} else if (type == EventType.NodeDeleted) {
			System.out.println("NodeDeleted");
		}
	}

	/** Closes the given client if there is one; an interrupt is recorded and printed. */
	private static void closeQuietly(ZooKeeper client) {
		if (client == null) {
			return;
		}
		try {
			client.close();
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			e.printStackTrace();
		}
	}

	/** @return the client held by this instance, or {@code null} if none was created */
	public ZooKeeper getZookeeper() {
		return zookeeper;
	}

	/** @param zookeeper the client this instance should use */
	public void setZookeeper(ZooKeeper zookeeper) {
		this.zookeeper = zookeeper;
	}

}

回调函数

/**
 * Callback for the asynchronous getChildren call that also delivers the node's
 * {@code Stat}; prints the children, the path, the context and every Stat field.
 */
public class Children2CallBack implements Children2Callback {

	/**
	 * Prints the result of an asynchronous getChildren request.
	 *
	 * @param rc       result code of the request
	 * @param path     path the request was issued for
	 * @param ctx      context object passed to the request; cast to {@code String}
	 * @param children names of the child nodes; may be {@code null} if the request failed
	 * @param stat     metadata of the node; may be {@code null} if the request failed
	 */
	@Override
	public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
		// Without a result there is nothing to iterate or read; report the result code
		// instead of failing with a NullPointerException inside the callback.
		if (children == null || stat == null) {
			System.out.println("ChildrenCallback failed, rc=" + rc + ", path=" + path);
			return;
		}
		for (String s : children) {
			System.out.println(s);
		}
		System.out.println("ChildrenCallback:" + path);
		System.out.println((String)ctx);
		System.out.println("czxid:"+stat.getCzxid());
		System.out.println("ctime:"+stat.getCtime());
		System.out.println("mzxid:"+stat.getMzxid());
		System.out.println("mtime:"+stat.getMtime());
		System.out.println("pzxid:"+stat.getPzxid());
		System.out.println("cversion:"+stat.getCversion());
		System.out.println("version:"+stat.getVersion());
		System.out.println("ephemeralOwner:"+stat.getEphemeralOwner());
		System.out.println("numChildren:"+stat.getNumChildren());
	}
}

删除子节点后触发的事件

判断节点是否存在

/**
 * Demonstrates checking whether a node exists, with a watch on the result. A
 * {@link CountDownLatch} keeps {@code main} alive until a notification arrives.
 */
public class ZKNodeExist implements Watcher {

	private ZooKeeper zookeeper = null;

	/** Address of the ZooKeeper server to connect to. */
	public static final String zkServerPath = "192.168.254.130:2181";

	/** Session timeout handed to the ZooKeeper constructor. */
	public static final Integer timeout = 5000;

	/** Released when a created, data-changed or deleted notification arrives. */
	private static CountDownLatch countDown = new CountDownLatch(1);

	/** Creates an instance without a client; used as the watcher of another instance's client. */
	public ZKNodeExist() {}

	/**
	 * Creates an instance backed by a new ZooKeeper client.
	 *
	 * <p>If the client cannot be created the stack trace is printed and the instance is
	 * left without a client.
	 *
	 * @param connectString server address list passed to the ZooKeeper constructor
	 */
	public ZKNodeExist(String connectString) {
		try {
			zookeeper = new ZooKeeper(connectString, timeout, new ZKNodeExist());
		} catch (IOException e) {
			// The field has not been assigned when the constructor throws, so there is
			// no client to close here.
			e.printStackTrace();
		}
	}

	/**
	 * Checks {@code /bushro} with a watch, prints its data version (or that it does not
	 * exist), waits for the latch and finally closes the client.
	 *
	 * <p>exists parameters: node path and whether to register a watch.
	 *
	 * @param args unused
	 * @throws Exception if the check fails or the thread is interrupted
	 */
	public static void main(String[] args) throws Exception {

		ZKNodeExist zkServer = new ZKNodeExist(zkServerPath);
		try {
			Stat stat = zkServer.getZookeeper().exists("/bushro", true);
			if (stat != null) {
				System.out.println("查询的节点版本为dataVersion:" + stat.getVersion());
			} else {
				System.out.println("该节点不存在...");
			}

			countDown.await();
		} finally {
			// Release the session once the demo is over.
			ZooKeeper client = zkServer.getZookeeper();
			if (client != null) {
				client.close();
			}
		}
	}

	/**
	 * Prints a message for created, data-changed and deleted notifications and releases
	 * the latch; other notification types are ignored.
	 *
	 * @param event the notification received from the client
	 */
	@Override
	public void process(WatchedEvent event) {
		EventType type = event.getType();
		String message = null;
		if (type == EventType.NodeCreated) {
			message = "节点创建";
		} else if (type == EventType.NodeDataChanged) {
			message = "节点数据改变";
		} else if (type == EventType.NodeDeleted) {
			message = "节点删除";
		}
		if (message != null) {
			System.out.println(message);
			countDown.countDown();
		}
	}

	/** @return the client held by this instance, or {@code null} if none was created */
	public ZooKeeper getZookeeper() {
		return zookeeper;
	}

	/** @param zookeeper the client this instance should use */
	public void setZookeeper(ZooKeeper zookeeper) {
		this.zookeeper = zookeeper;
	}
}

Acl相关操作

/**
 * Demonstrates ACL handling: nodes open to everyone, nodes protected by digest
 * (user:password) credentials, and nodes restricted to an IP address.
 */
public class ZKNodeAcl implements Watcher {

	private ZooKeeper zookeeper = null;

	/** Address of the ZooKeeper server to connect to. */
	public static final String zkServerPath = "192.168.254.130:2181";

	/** Session timeout handed to the ZooKeeper constructor. */
	public static final Integer timeout = 5000;

	/** Creates an instance without a client; used as the watcher of another instance's client. */
	public ZKNodeAcl() {}

	/**
	 * Creates an instance backed by a new ZooKeeper client.
	 *
	 * <p>If the client cannot be created the stack trace is printed and the instance is
	 * left without a client.
	 *
	 * @param connectString server address list passed to the ZooKeeper constructor
	 */
	public ZKNodeAcl(String connectString) {
		try {
			zookeeper = new ZooKeeper(connectString, timeout, new ZKNodeAcl());
		} catch (IOException e) {
			// The field has not been assigned when the constructor throws, so there is
			// no client to close here.
			e.printStackTrace();
		}
	}

	/**
	 * Creates a persistent node synchronously and prints the created path.
	 *
	 * <p>Neither the synchronous nor the asynchronous create call creates missing
	 * parent nodes recursively. ZooKeeper errors and interrupts are printed, not
	 * rethrown.
	 *
	 * <p>Node types available through {@code CreateMode}: {@code PERSISTENT},
	 * {@code PERSISTENT_SEQUENTIAL}, {@code EPHEMERAL}, {@code EPHEMERAL_SEQUENTIAL}.
	 *
	 * @param path path of the node to create
	 * @param data payload stored in the node
	 * @param acls access control list, e.g. {@code Ids.OPEN_ACL_UNSAFE}
	 *             (world:anyone:cdrwa) or {@code Ids.CREATOR_ALL_ACL}
	 *             (auth:user:password:cdrwa)
	 */
	public void createZKNode(String path, byte[] data, List<ACL> acls) {
		try {
			String result = zookeeper.create(path, data, acls, CreateMode.PERSISTENT);
			// "\t" is a real tab; the doubled backslash printed the two characters '\' and 't'.
			System.out.println("创建节点:\t" + result + "\t成功...");
		} catch (KeeperException e) {
			e.printStackTrace();
		} catch (InterruptedException e) {
			// Keep the interrupt visible to the caller.
			Thread.currentThread().interrupt();
			e.printStackTrace();
		}
	}

	/**
	 * Runs the ACL scenarios in order: an open node, a digest-protected node accessed
	 * after authenticating, and an IP-restricted node. The client is closed at the end.
	 *
	 * @param args unused
	 * @throws Exception if a read or write fails, e.g. for lack of permission
	 */
	public static void main(String[] args) throws Exception {

		ZKNodeAcl zkServer = new ZKNodeAcl(zkServerPath);
		try {
			// ---- node creation ----
			// ACL that lets anyone access the node.
			zkServer.createZKNode("/aclbushro", "test".getBytes(), Ids.OPEN_ACL_UNSAFE);

			// (Test 1) access restricted to custom digest users.
			List<ACL> acls = new ArrayList<ACL>();
			Id bushro1 = new Id("digest", AclUtils.getDigestUserPwd("bushro1:123456"));
			Id bushro2 = new Id("digest", AclUtils.getDigestUserPwd("bushro2:123456"));
			// Assign each user its permissions.
			acls.add(new ACL(Perms.ALL, bushro1));
			acls.add(new ACL(Perms.READ, bushro2));
			acls.add(new ACL(Perms.DELETE | Perms.CREATE, bushro2));
			zkServer.createZKNode("/aclbushro/testdigest", "testdigest".getBytes(), acls);

			// A registered user has to authenticate through addAuthInfo before touching
			// the node (compare the command-line "addauth").
			zkServer.getZookeeper().addAuthInfo("digest", "bushro1:123456".getBytes());
			zkServer.createZKNode("/aclbushro/testdigest/childtest", "childtest".getBytes(), Ids.CREATOR_ALL_ACL);
			Stat digestStat = new Stat();
			byte[] digestData = zkServer.getZookeeper().getData("/aclbushro/testdigest", false, digestStat);
			System.out.println(new String(digestData));
			// Write against the version that was just read; a hard-coded version does not
			// match a node that has only just been created.
			zkServer.getZookeeper().setData("/aclbushro/testdigest", "now".getBytes(), digestStat.getVersion());

			// (Test 2) ACL based on the client's IP address.
			List<ACL> aclsIP = new ArrayList<ACL>();
			Id ipId1 = new Id("ip", "192.168.1.6");
			aclsIP.add(new ACL(Perms.ALL, ipId1));
			zkServer.createZKNode("/aclbushro/iptest6", "iptest".getBytes(), aclsIP);

			// Verify whether this client's IP is allowed to write. Version -1 skips the
			// version check, so only the permission decides the outcome.
			zkServer.getZookeeper().setData("/aclbushro/iptest6", "now".getBytes(), -1);
			// Separate locals: redeclaring stat/data in the same scope does not compile.
			Stat ipStat = new Stat();
			byte[] ipData = zkServer.getZookeeper().getData("/aclbushro/iptest6", false, ipStat);
			System.out.println(new String(ipData));
			System.out.println(ipStat.getVersion());
		} finally {
			// Release the session once the demo is over.
			ZooKeeper client = zkServer.getZookeeper();
			if (client != null) {
				client.close();
			}
		}
	}

	/** @return the client held by this instance, or {@code null} if none was created */
	public ZooKeeper getZookeeper() {
		return zookeeper;
	}

	/** @param zookeeper the client this instance should use */
	public void setZookeeper(ZooKeeper zookeeper) {
		this.zookeeper = zookeeper;
	}

	/**
	 * Ignores all notifications.
	 *
	 * @param event the notification received from the client
	 */
	@Override
	public void process(WatchedEvent event) {

	}
}

以上是关于ZooKeeper 原生API的主要内容,如果未能解决你的问题,请参考以下文章

(原) 2.1 Zookeeper原生API使用

Zookeeper学习 客户端和原生API

ZooKeeper 原生API

ZooKeeper客户端原生API的使用以及ZkClient第三方API的使用

Zookeeper的原生api操作

06_zookeeper原生Java API使用