ZooKeeper操作

Posted 熊老二-

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ZooKeeper操作相关的知识,希望对你有一定的参考价值。

ZooKeeper的shell操作
客户端连接
运行 zkCli.sh –server ip 进入命令行工具。

cd /export/server/zookeeper-3.4.6
bin/zkCli.sh  -server node1:2181

shell基本操作
操作命令


操作实例

1:创建普通永久节点

 create /app1 hello

2: 创建序列化的永久节点

create -s /app2 world

3:创建临时节点

create -e /tempnode world

4:创建序列化的临时节点

create -s -e /tempnode2 aaa

5:获取节点数据

   get  /app1

6:修改节点数据

   set /app1  hadoop

7:删除节点

  delete  /app1 删除的节点不能有子节点

  rmr    /app1 递归删除

节点属性
每个znode都包含了一系列的属性,通过命令get,可以获得节点的属性。

dataVersion:数据版本号,每次对节点进行set操作,dataVersion的值都会增加1(即使设置的是相同的数据),可有效避免了数据更新时出现的先后顺序问题。
cversion :子节点的版本号。当znode的子节点有变化时,cversion 的值就会增加1。
cZxid :Znode创建的事务id。
mZxid :Znode被修改的事务id,即每次对znode的修改都会更新mZxid。
对于zk来说,每次的变化都会产生一个唯一的事务id,zxid(ZooKeeper Transaction Id)。通过zxid,可以确定更新操作的先后顺序。例如,如果zxid1小于zxid2,说明zxid1操作先于zxid2发生,zxid对于整个zk都是唯一的,即使操作的是不同的znode。
ctime:节点创建时的时间戳.
mtime:节点最新一次更新发生时的时间戳.
ephemeralOwner:如果该节点为临时节点, ephemeralOwner值表示与该节点绑定的session id. 如果不是临时节点, ephemeralOwner值为0.
在client和server通信之前,首先需要建立连接,该连接称为session。连接建立后,如果发生连接超时、授权失败,或者显式关闭连接,连接便处于CLOSED状态, 此时session结束。

ZooKeeper Watcher(监听机制)
ZooKeeper提供了分布式数据发布/订阅功能,一个典型的发布/订阅模型系统定义了一种一对多的订阅关系,能让多个订阅者同时监听某一个主题对象,当这个主题对象自身状态变化时,会通知所有订阅者,使他们能够做出相应的处理。
ZooKeeper中,引入了Watcher机制来实现这种分布式的通知功能。ZooKeeper允许客户端向服务端注册一个Watcher监听,当服务端的一些事件触发了这个Watcher,那么就会向指定客户端发送一个事件通知来实现分布式的通知功能。
触发事件种类很多,如:节点创建,节点删除,节点改变,子节点改变等。
总的来说可以概括Watcher为以下三个过程:客户端向服务端注册Watcher、服务端事件发生触发Watcher、客户端回调Watcher得到触发事件情况

Watch机制特点
一次性触发
事件发生触发监听,一个watcher event就会被发送到设置监听的客户端,这种效果是一次性的,后续再次发生同样的事件,不会再次触发。
事件封装
ZooKeeper使用WatchedEvent对象来封装服务端事件并传递。
WatchedEvent包含了每一个事件的三个基本属性:
通知状态(keeperState),事件类型(EventType)和节点路径(path)
先注册再触发
Zookeeper中的watch机制,必须客户端先去服务端注册监听,这样事件发送才会触发监听,通知给客户端。

通知状态和事件类型
同一个事件类型在不同的通知状态中代表的含义有所不同,下表列举了常见的通知状态和事件类型。
事件封装: Watcher 得到的事件是被封装过的, 包括三个内容 keeperState, eventType, path


其中连接状态事件(type=None, path=null)不需要客户端注册,客户端只要有需要直接处理就行了。

Shell 客户端设置watcher
设置节点数据变动监听:

通过另一个客户端更改节点数据:

此时设置监听的节点收到通知:

ZooKeeper Java API操作
这里操作Zookeeper的JavaAPI使用的是一套zookeeper客户端框架 Curator ,解决了很多Zookeeper客户端非常底层的细节开发工作 。
Curator包含了几个包:
curator-framework:对zookeeper的底层api的一些封装
curator-recipes:封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器等
Maven依赖(使用curator的版本:2.12.0,对应Zookeeper的版本为:3.4.x,如果跨版本会有兼容性问题,很有可能导致节点操作失败):

引入maven坐标

<dependencies>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.12.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
        </dependency>

        <dependency>
            <groupId>com.google.collections</groupId>
            <artifactId>google-collections</artifactId>
            <version>1.0</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.25</version>
        </dependency>
    </dependencies>

节点的操作

/*
 创建节点
 */
@Test
public void createZnode() throws Exception {
	//1:定制一个重试策略
	/*
		param1: 重试的间隔时间
		param2:重试的最大次数
	 */
	RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,1);
	//2:获取一个客户端对象
	/*
	   param1:要连接的Zookeeper服务器列表
	   param2:重试策略
	 */
	String connectionStr = "192.168.88.161:2181,192.168.88.162:2181,192.168.88.163:2181";
	CuratorFramework client = CuratorFrameworkFactory.newClient(connectionStr, retryPolicy);

	//3:开启客户端
	client.start();
	//4:创建节点
	/*
	  节点类型:
		   CreateMode.PERSISTENT:永久节点
		   CreateMode.PERSISTENT_SEQUENTIAL:永久序列化节点
		   CreateMode.EPHEMERAL:临时节点
		   CreateMode.EPHEMERAL_SEQUENTIAL:临时序列化节点
	   /hello2 :节点路径
	   world :节点数据
	 */
	client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/hello2","world".getBytes());
	//5:关闭客户端
	client.close();
	}

/*
 修改节点
 */

 @Test
 public void nodeData() throws Exception {

        RetryPolicy retryPolicy = new  ExponentialBackoffRetry(3000, 1);

        CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.88.161:2181,192.168.88.162:2181,192.168.88.163:2181",retryPolicy);

        client.start();

        client.setData().forPath("/app1", "hdfs".getBytes());

        client.close();
}


	/**

	 * 数据查询

	 */

	@Test

	public void updateNode() throws Exception {
RetryPolicy retryPolicy = new  ExponentialBackoffRetry(3000, 1);

		CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.88.161:2181,192.168.88.162:2181,192.168.88.163:2181", 3000, 3000, retryPolicy);

		client.start();

		byte[] forPath = client.getData().forPath("/app1");

		System.out.println(new String(forPath));

		client.close();

	}

以上是关于ZooKeeper操作的主要内容,如果未能解决你的问题,请参考以下文章

Java代码操作zookeeper

Zookeeper中节点操作代码

VSCode自定义代码片段——git命令操作一个完整流程

VSCode自定义代码片段15——git命令操作一个完整流程

VSCode自定义代码片段15——git命令操作一个完整流程

在ansible模板中使用动态组名称