ZooKeeper 之Apache Curator 客户端使用

Posted 在奋斗的大道

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ZooKeeper 之Apache Curator 客户端使用相关的知识,希望对你有一定的参考价值。

ZooKeeper 原生不足之处:

  • 超时重连,不支持自动,需要手动操作
  • Watch注册一次后会失效
  • 不支持递归创建节点

 Apache Curator 

apache的开源项目,解决watcher注册一次就失效的问题,api更加简单易用,提供更多解决方案并且实现简单:如分布式锁

Maven 添加 Apache Curator 依赖

        <!--zookeeper相关-->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>4.0.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>4.0.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.6</version>
        </dependency>

zk基本操作

zk命名空间以及创建节点,节点的增删改查

import java.util.List;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;

public class CuratorOperator {

	public CuratorFramework client = null;
	public static final String zkServerPath = "192.168.254.130:2181";
	/**
	 * 实例化zk客户端
	 */
	public CuratorOperator() {
		/**
		 * curator链接zookeeper的策略:RetryNTimes
		 * n:重试的次数
		 * sleepMsBetweenRetries:每次重试间隔的时间
		 */
		RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
		
		client = CuratorFrameworkFactory.builder()
				.connectString(zkServerPath)
				.sessionTimeoutMs(10000).retryPolicy(retryPolicy)
				//命名空间之后创建的节点都会在workspace工作空间里面
				.namespace("workspace").build();
		client.start();
	}
	/**
	 *
	 * @Description: 关闭zk客户端连接
	 */
	public void closeZKClient() {
		if (client != null) {
			this.client.close();
		}
	}
	
	public static void main(String[] args) throws Exception {
		// 实例化
		CuratorOperator cto = new CuratorOperator();
		boolean isZkCuratorStarted = cto.client.isStarted();
		System.out.println("当前客户的状态:" + (isZkCuratorStarted ? "连接中" : "已关闭"));
		
		// 创建节点
		String nodePath = "/bushro/demo";
		byte[] data = "data".getBytes();
		cto.client.create().creatingParentsIfNeeded()
			.withMode(CreateMode.PERSISTENT)
			.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)//默认的权限"world", "anyone"
			.forPath(nodePath, data);
		
		 更新节点数据
		byte[] newData = "newdata".getBytes();
		cto.client.setData().withVersion(0).forPath(nodePath, newData);

		// 删除节点
		cto.client.delete()
				  .guaranteed()					// 如果删除失败,那么在后端还是继续会删除,直到成功
				  .deletingChildrenIfNeeded()	// 如果有子节点,就删除
				  .withVersion(0)
				  .forPath(nodePath);
		
		// 读取节点数据
		Stat stat = new Stat();
		byte[] data = cto.client.getData().storingStatIn(stat).forPath(nodePath);
		System.out.println("节点" + nodePath + "的数据为: " + new String(data));
		System.out.println("该节点的版本号为: " + stat.getVersion());
		
		// 查询子节点
		List<String> childNodes = cto.client.getChildren()
											.forPath(nodePath);
		System.out.println("开始打印子节点:");
		for (String s : childNodes) {
			System.out.println(s);
		}
			
		// 判断节点是否存在,如果不存在则为空
		Stat statExist = cto.client.checkExists().forPath(nodePath + "/abc");
		System.out.println(statExist);
		
		Thread.sleep(100000);
		
		cto.closeZKClient();
		boolean isZkCuratorStarted2 = cto.client.isStarted();
		System.out.println("当前客户的状态:" + (isZkCuratorStarted2 ? "连接中" : "已关闭"));
	}
	
	public final static String ADD_PATH = "/super/imooc/d";
	
}

watch与acl的操作

一次注册多次监听

为节点添加watch事件

		// 为节点添加watcher
		//NodeCache: 监听数据节点的变更,会触发事件
		final NodeCache nodeCache = new NodeCache(cto.client, nodePath);
		// buildInitial : 初始化的时候获取node的值并且缓存
		nodeCache.start(true);
		if (nodeCache.getCurrentData() != null) {
			System.out.println("节点初始化数据为:" + new String(nodeCache.getCurrentData().getData()));
		} else {
			System.out.println("节点初始化数据为空...");
		}
		nodeCache.getListenable().addListener(new NodeCacheListener() {
			public void nodeChanged() throws Exception {
				if (nodeCache.getCurrentData() == null) {
					System.out.println("空");
					return;
				}
				String data = new String(nodeCache.getCurrentData().getData());
				System.out.println("节点路径:" + nodeCache.getCurrentData().getPath() + "数据:" + data);
			}
		});

为子节点添加watch事件

		// 为子节点添加watcher
		// PathChildrenCache: 监听数据节点的增删改,会触发事件
		String childNodePathCache =  nodePath;
		// cacheData: 设置缓存节点的数据状态
		final PathChildrenCache childrenCache = new PathChildrenCache(cto.client, childNodePathCache, true);
		/**
		 * StartMode: 初始化方式
		 * POST_INITIALIZED_EVENT:异步初始化,初始化之后会触发事件(比较好)
		 * NORMAL:异步初始化
		 * BUILD_INITIAL_CACHE:同步初始化
		 */
		childrenCache.start(StartMode.POST_INITIALIZED_EVENT);

		List<ChildData> childDataList = childrenCache.getCurrentData();
		System.out.println("当前数据节点的子节点数据列表:");
		for (ChildData cd : childDataList) {
			String childData = new String(cd.getData());
			System.out.println(childData);
		}

		childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
			public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
				if(event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)){
					System.out.println("子节点初始化ok...");
				}

				else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)){
					String path = event.getData().getPath();
					System.out.println("添加子节点:" + event.getData().getPath());
					System.out.println("子节点数据:" + new String(event.getData().getData()));

				}else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)){
					System.out.println("删除子节点:" + event.getData().getPath());
				}else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
					System.out.println("修改子节点路径:" + event.getData().getPath());
					System.out.println("修改子节点数据:" + new String(event.getData().getData()));
				}
			}
		});

watcher统一配置

例如创建一个存放redis的配置文件节点,里面放入相关操作的json数据如
{“type”:“add”,“url”:“ftp://192.168.254.130/config/redis.xml”,“remark”:“add”}
{“type”:“update”,“url”:“ftp://192.168.254.130/config/redis.xml”,“remark”:“update”}
{“type”:“delete”,“url”:"",“remark”:“delete”}
在程序中我们可以把节点数据转成实体类,根据type的类型来判断进行相应的操作,当zookeeper集群中监听到变化,每台机子就去下载最新的配置文件这样就不用一台一台机子的修改过去,节省时间。
 

public class Client1 {

	public CuratorFramework client = null;
	public static final String zkServerPath = "192.168.254.130:2181";

	public Client1() {
		RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
		client = CuratorFrameworkFactory.builder()
				.connectString(zkServerPath)
				.sessionTimeoutMs(10000).retryPolicy(retryPolicy)
				.namespace("workspace").build();
		client.start();
	}
	
	public void closeZKClient() {
		if (client != null) {
			this.client.close();
		}
	}
	
//	public final static String CONFIG_NODE = "/bushro/demo/redis-config";
	public final static String CONFIG_NODE_PATH = "/bushro/demo";
	public final static String SUB_PATH = "/redis-config";
	public static CountDownLatch countDown = new CountDownLatch(1);
	
	public static void main(String[] args) throws Exception {
		Client1 cto = new Client1();
		System.out.println("client1 启动成功...");
		
		final PathChildrenCache childrenCache = new PathChildrenCache(cto.client, CONFIG_NODE_PATH, true);
		childrenCache.start(StartMode.BUILD_INITIAL_CACHE);
		
		// 添加监听事件
		childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
			public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
				// 监听节点变化
				if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
					String configNodePath = event.getData().getPath();
					if (configNodePath.equals(CONFIG_NODE_PATH + SUB_PATH)) {
						System.out.println("监听到配置发生变化,节点路径为:" + configNodePath);
						
						// 读取节点数据
						String jsonConfig = new String(event.getData().getData());
						System.out.println("节点" + CONFIG_NODE_PATH + "的数据为: " + jsonConfig);
						
						// 从json转换配置(转实体类)
						RedisConfig redisConfig = null;
						if (StringUtils.isNotBlank(jsonConfig)) {
							redisConfig = JsonUtils.jsonToPojo(jsonConfig, RedisConfig.class);
						}
						
						// 配置不为空则进行相应操作
						if (redisConfig != null) {
							String type = redisConfig.getType();
							String url = redisConfig.getUrl();
							String remark = redisConfig.getRemark();
							// 判断事件
							if (type.equals("add")) {
								System.out.println("监听到新增的配置,准备下载...");
								// ... 连接ftp服务器,根据url找到相应的配置
								Thread.sleep(500);
								System.out.println("开始下载新的配置文件,下载路径为<" + url + ">");
								// ... 下载配置到你指定的目录
								Thread.sleep(1000);
								System.out.println("下载成功,已经添加到项目中");
								// ... 拷贝文件到项目目录
							} else if (type.equals("update")) {
								System.out.println("监听到更新的配置,准备下载...");
								// ... 连接ftp服务器,根据url找到相应的配置
								Thread.sleep(500);
								System.out.println("开始下载配置文件,下载路径为<" + url + ">");
								// ... 下载配置到你指定的目录
								Thread.sleep(1000);
								System.out.println("下载成功...");
								System.out.println("删除项目中原配置文件...");
								Thread.sleep(100);
								// ... 删除原文件
								System.out.println("拷贝配置文件到项目目录...");
								// ... 拷贝文件到项目目录
							} else if (type.equals("delete")) {
								System.out.println("监听到需要删除配置");
								System.out.println("删除项目中原配置文件...");
							}

						}
					}
				}
			}
		});
		countDown.await();
		cto.closeZKClient();
	}
	
}

acl权限操作与认证授权

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooDefs.Perms;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;

import com.imooc.utils.AclUtils;

public class CuratorAcl {

	public CuratorFramework client = null;
	public static final String zkServerPath = "192.168.254.130:2181";

	public CuratorAcl() {
		RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
		client = CuratorFrameworkFactory.builder().authorization("digest", "bushro1:123456".getBytes())//使用账号密码认证,可以认证多个用户
				.connectString(zkServerPath)
				.sessionTimeoutMs(10000).retryPolicy(retryPolicy)
				.namespace("workspace").build();
		client.start();
	}
	
	public void closeZKClient() {
		if (client != null) {
			this.client.close();
		}
	}
	
	public static void main(String[] args) throws Exception {
		// 实例化
		CuratorAcl cto = new CuratorAcl();
		boolean isZkCuratorStarted = cto.client.isStarted();
		System.out.println("当前客户的状态:" + (isZkCuratorStarted ? "连接中" : "已关闭"));
		
		String nodePath = "/acl/father/child/sub";
		
		List<ACL> acls = new ArrayList<ACL>();
		Id bushro1 = new Id("digest", AclUtils.getDigestUserPwd("bushro1:123456"));
		Id bushro2 = new Id("digest", AclUtils.getDigestUserPwd("bushro2:123456"));
		acls.add(new ACL(Perms.ALL, bushro1));
		acls.add(new ACL(Perms.READ, bushro2));
		acls.add(new ACL(Perms.DELETE | Perms.CREATE, bushro2));
		
		// 创建节点
		byte[] data = "spiderman".getBytes();
		cto.client.create().creatingParentsIfNeeded()//递归方式创建
				.withMode(CreateMode.PERSISTENT)
				.withACL(acls)//该方式只有对最后一个节点设置权限,相应的父节点默认都是"world", "anyone",
				              // 如果后面添加true那么所有创建的节点都是设置的权限
				.forPath(nodePath, data);

		//设置权限
		cto.client.setACL().withACL(acls).forPath("/curatorNode");
		
		// 更新节点数据
		byte[] newData = "batman".getBytes();
		cto.client.setData().withVersion(0).forPath(nodePath, newData);
		
		// 删除节点
		cto.client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(0).forPath(nodePath);
		
		// 读取节点数据
		Stat stat = new Stat();
		byte[] data = cto.client.getData().storingStatIn(stat).forPath(nodePath);
		System.out.println("节点" + nodePath + "的数据为: " + new String(data));
		System.out.println("该节点的版本号为: " + stat.getVersion());
		
		
		cto.closeZKClient();
		boolean isZkCuratorStarted2 = cto.client.isStarted();
		System.out.println("当前客户的状态:" + (isZkCuratorStarted2 ? "连接中" : "已关闭"));
	}
	
}

 

以上是关于ZooKeeper 之Apache Curator 客户端使用的主要内容,如果未能解决你的问题,请参考以下文章

Apache Curator操作zookeeper的API使用

Zookeeper开源客户端Curator之基本功能讲解

使用Apache Curator连接Zookeeper版本问题

ZooKeeper系列—— Java 客户端 Apache Curator

Apache Curator入门实战

Apache Curator ZooKeeper - KeeperErrorCode =未实现,错误