基于Curator的Zookeeper操作实战
Posted 外星喵
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于Curator的Zookeeper操作实战相关的知识,希望对你有一定的参考价值。
前言
Zookeeper操作方式
这篇文章主要说的是利用java来操作zookeeper,就如操作mysql数据库一样,主要是实现增删改查功能,而实现这些功能的方式主要有以下三种:
- zookeeper官方提供的原生的api
- zkclient
- Apache Curator
简单说下三种方式的区别与各自的优劣:
- zookeeper自带的客户端是官方提供的,比较底层、使用起来写代码麻烦,很多功能需要自己来实现、不够直接。
- zkclient是另一个开源的ZooKeeper客户端。
- Apache Curator是Apache的开源项目,封装了zookeeper自带的客户端,使用相对简便,易于使用。
Curator介绍
这篇主要讲解使用基于Curator操作Zookeeper的操作案例。
Apache Curator是一个比较完善的ZooKeeper客户端框架,通过封装的一套高级API 简化了ZooKeeper的操作。通过查看官方文档,可以发现Curator主要解决了三类问题:
- 封装ZooKeeper client与ZooKeeper server之间的连接处理
- 提供了一套Fluent风格的操作API
- 提供ZooKeeper各种应用场景(recipe, 比如:分布式锁服务、集群领导选举、共享计数器、缓存机制、分布式队列等)的抽象封装
基于Curator使用Zookeeper
导入依赖
<!-- ZooKeeper 之 Curator-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.2.0</version>
</dependency>
Zookeeper客户端
创建客户端
使用CuratorFrameworkFactory的两个静态工厂方法来创建zookeeper客户端对象,主要需要设置以下参数:
- connectString:zookeeper服务器地址及端口号,多个zookeeper服务器地址以“,”分隔。
- sessionTimeoutMs:会话超时时间,单位毫秒,默认为60000ms。
- connectionTimeoutMs:连接超时时间,单位毫秒,默认为15000ms。
- namespace: 为了实现不同的Zookeeper业务之间的隔离,往往会为每个业务分配一个独立的命名空间,即指定一个Zookeeper根路径
- retryPolicy:重试连接策略,有四种实现,分别为:
- ExponentialBackoffRetry(重试指定的次数, 且每一次重试之间停顿的时间逐渐增加)
- RetryNtimes(指定最大重试次数的重试策略)
- RetryOneTimes(仅重试一次)、
- RetryUntilElapsed(一直重试直到达到规定的时间)
//Zookeeper客户端
private CuratorFramework client;
@Before
public void testConnect() {
client = CuratorFrameworkFactory.builder()
.connectString("192.168.124.18:2181") //连接地址和端口号
.sessionTimeoutMs(10000) //会话超时时间
.connectionTimeoutMs(1000) // 连接超时时间
.namespace("/test") //名称空间
.retryPolicy(new ExponentialBackoffRetry(1000, 10)) //重试策略
.build();
client.start();//开启连接
}
关闭客户端
@After
public void testClose() {
if (client != null) {
client.close();
}
}
节点操作
创建节点
主要有以下四种:
- 创建普通节点(默认即是持久化节点)
- 创建多集节点
- 创建指定节点类型的节点:
- PERSISTENT:持久化节点
- PERSISTENT_SEQUENTIAL:持久化且带序列号节点
- EPHEMERAL:临时节点
- EPHEMERAL_SEQUENTIAL:临时且带序列号节点
- 创建带指定数据的节点
@Test
public void testCreateNode() throws Exception {
//1.普通创建
client.create().forPath("/test1");
//2.创建多集节点
client.create().creatingParentContainersIfNeeded().forPath("/test2/demo");
//3.设置创建节点类型
client.create().withMode(CreateMode.EPHEMERAL).forPath("/test3");
//4.创建带指定数据的节点
client.create().forPath("/test4", "This is test4".getBytes(StandardCharsets.UTF_8));
}
更新节点
@Test
public void testUpdateNode() throws Exception {
//更新一个节点的数据内容
client.setData().forPath("/test4", "第一次更新".getBytes());
//更新一个节点的数据内容,强制指定版本进行更新
client.setData().withVersion(1).forPath("/test4", "第二次更新".getBytes());
}
查询节点
@Test
public void testQueryNode() throws Exception {
Stat stat1 = client.checkExists().forPath("/test1");
System.out.println("路径节点/test1是否存在:" + (stat1 != null));
byte[] bytes = client.getData().forPath("/test2");
System.out.println("路径/test2的数据是:" + new String(bytes));
Stat stat2 = new Stat();
bytes = client.getData().storingStatIn(stat2).forPath("/test4");
System.out.println("路径/test4的数据是:" + new String(bytes));
System.out.println("路径/test4的数据的版本是:" + stat2.getVersion());
}
删除节点
@Test
public void testDeleteNode() throws Exception {
//只能删除叶子节点
client.delete().forPath("/test1");
//删除一个节点,并递归删除其所有子节点
client.delete().deletingChildrenIfNeeded().forPath("/test2");
//强制指定版本进行删除
client.delete().withVersion(1).forPath("/test4");
//注意:由于一些网络原因,上述的删除操作有可能失败,使用guaranteed()
// 如果删除失败,会记录下来,只要会话有效,就会不断的重试,直到删除成功为止
client.delete().guaranteed().forPath("/test3");
}
异步接口
上面提到的创建、删除、更新、读取等方法都是同步的,Curator提供异步接口,引入了BackgroundCallback接口用于处理异步接口调用之后服务端返回的结果信息。BackgroundCallback接口中一个重要的回调值为CuratorEvent,里面包含事件类型、响应吗和节点的详细信息。
CuratorEventType
事件类型 | 对应CuratorFramework实例的方法 |
---|---|
CREATE | create() |
DELETE | delete() |
EXISTS | checkExists() |
GET_DATA | getData() |
SET_DATA | setData() |
CHILDREN | getChildren() |
SYNC | sync(String,Object) |
GET_ACL | getACL() |
SET_ACL | setACL() |
WATCHED | Watcher(Watcher) |
CLOSING | close() |
响应码(getResultCode())
响应码 | 意义 |
---|---|
0 | OK,即调用成功 |
-4 | ConnectionLoss,即客户端与服务端断开连接 |
-110 | NodeExists,即节点已经存在 |
-112 | SessionExpired,即会话过期 |
异步创建节点
Executor executor = Executors.newFixedThreadPool(2);
client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.inBackground((curatorFramework, curatorEvent) -> {
System.out.println(String.format("eventType:%s,resultCode:%s",curatorEvent.getType(),curatorEvent.getResultCode()));
},executor)
.forPath("path");
监听器
Curator提供了三种Watcher(Cache)来监听结点的变化:
-
Node Cache:监视一个结点的创建、更新、删除,并将结点的数据缓存在本地。
-
Path Cache:监视一个路径下1)孩子结点的创建、2)删除,3)以及结点数据的更新。产生的事件会传递给注册的PathChildrenCacheListener。
-
Tree Cache:Path Cache和Node Cache的“合体”,监视路径下的创建、更新、删除事件,并缓存路径下所有孩子结点的数据。
Node Cache
@Test
public void testNodeCache() throws Exception {
//最后一个参数表示是否进行压缩
NodeCache cache = new NodeCache(client, "/super", false);
cache.start(true);
//只会监听节点的创建和修改,删除不会监听
cache.getListenable().addListener(() -> {
System.out.println("路径:" + cache.getCurrentData().getPath());
System.out.println("数据:" + new String(cache.getCurrentData().getData()));
System.out.println("状态:" + cache.getCurrentData().getStat());
});
client.create().forPath("/nodeCache", "1234".getBytes());
Thread.sleep(1000);
client.setData().forPath("/nodeCache", "5678".getBytes());
Thread.sleep(1000);
client.delete().forPath("/nodeCache");
Thread.sleep(5000);
}
Path Cache
@Test
public void testPathChildrenCache() throws Exception {
//第三个参数表示是否接收节点数据内容
PathChildrenCache childrenCache = new PathChildrenCache(client, "/super", true);
/**
* 如果不填写这个参数,则无法监听到子节点的数据更新
如果参数为PathChildrenCache.StartMode.BUILD_INITIAL_CACHE,则会预先创建之前指定的/super节点
如果参数为PathChildrenCache.StartMode.POST_INITIALIZED_EVENT,效果与BUILD_INITIAL_CACHE相同,只是不会预先创建/super节点
参数为PathChildrenCache.StartMode.NORMAL时,与不填写参数是同样的效果,不会监听子节点的数据更新操作
*/
childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
childrenCache.getListenable().addListener((framework, event) -> {
switch (event.getType()) {
case CHILD_ADDED:
System.out.println("CHILD_ADDED,类型:" + event.getType() + ",路径:" + event.getData().getPath() + ",数据:" +
new String(event.getData().getData()) + ",状态:" + event.getData().getStat());
break;
case CHILD_UPDATED:
System.out.println("CHILD_UPDATED,类型:" + event.getType() + ",路径:" + event.getData().getPath() + ",数据:" +
new String(event.getData().getData()) + ",状态:" + event.getData().getStat());
break;
case CHILD_REMOVED:
System.out.println("CHILD_REMOVED,类型:" + event.getType() + ",路径:" + event.getData().getPath() + ",数据:" +
new String(event.getData().getData()) + ",状态:" + event.getData().getStat());
break;
default:
break;
}
});
client.create().forPath("/pathChildrenCache", "123".getBytes());
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super/c1", "c1内容".getBytes());
//经测试,不会监听到本节点的数据变更,只会监听到指定节点下子节点数据的变更
client.setData().forPath("/pathChildrenCache", "456".getBytes());
client.setData().forPath("/pathChildrenCache/c1", "c1新内容".getBytes());
client.delete().guaranteed().deletingChildrenIfNeeded().forPath("/super");
Thread.sleep(5000);
}
Tree Cache
@Test
public void testTreeCache() throws Exception {
TreeCache treeCache = new TreeCache(client, "/treeCache");
treeCache.start();
treeCache.getListenable().addListener((curatorFramework, treeCacheEvent) -> {
switch (treeCacheEvent.getType()) {
case NODE_ADDED:
System.out.println("NODE_ADDED:路径:" + treeCacheEvent.getData().getPath() + ",数据:" + new String(treeCacheEvent.getData().getData())
+ ",状态:" + treeCacheEvent.getData().getStat());
break;
case NODE_UPDATED:
System.out.println("NODE_UPDATED:路径:" + treeCacheEvent.getData().getPath() + ",数据:" + new String(treeCacheEvent.getData().getData())
+ ",状态:" + treeCacheEvent.getData().getStat());
break;
case NODE_REMOVED:
System.out.println("NODE_REMOVED:路径:" + treeCacheEvent.getData().getPath() + ",数据:" + new String(treeCacheEvent.getData().getData())
+ ",状态:" + treeCacheEvent.getData().getStat());
break;
default:
break;
}
});
client.create().forPath("/treeCache", "123".getBytes());
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/treeCache/c1", "456".getBytes());
client.setData().forPath("/treeCache", "789".getBytes());
client.setData().forPath("/treeCache/c1", "910".getBytes());
client.delete().forPath("/treeCache/c1");
client.delete().forPath("/treeCache");
Thread.sleep(5000);
}
事务管理
碰到异常,事务会回滚
@Test
public void testTransaction() throws Exception{
//定义几个基本操作
CuratorOp createOp = client.transactionOp().create()
.forPath("/curator/one_path","some data".getBytes());
CuratorOp setDataOp = client.transactionOp().setData()
.forPath("/curator","other data".getBytes());
CuratorOp deleteOp = client.transactionOp().delete()
.forPath("/curator");
//事务执行结果
List<CuratorTransactionResult> results = client.transaction()
.forOperations(createOp,setDataOp,deleteOp);
//遍历输出结果
//因为节点“/curator”存在子节点,所以在删除的时候将会报错,事务回滚
for(CuratorTransactionResult result : results){
System.out.println("执行结果是: " + result.getForPath() + "--" + result.getType());
}
}
Leader选举
在分布式计算中, leader elections是很重要的一个功能, 这个选举过程是这样子的: 指派一个进程作为组织者,将任务分发给各节点。 在任务开始前, 哪个节点都不知道谁是leader(领导者)或者coordinator(协调者). 当选举算法开始执行后, 每个节点最终会得到一个唯一的节点作为任务leader. 除此之外, 选举还经常会发生在leader意外宕机的情况下,新的leader要被选举出来。
在zookeeper集群中,leader负责写操作,然后通过Zab协议实现follower的同步,leader或者follower都可以处理读操作。
Curator 有两种leader选举的recipe,分别是
- LeaderSelector:所有存活的客户端不间断的轮流做Leader
- LeaderLatch:一旦选举出Leader,除非有客户端挂掉重新触发选举,否则不会交出领导权
public class CuratorLeaderTest {
/** Zookeeper info */
private static final String ZK_ADDRESS = "192.168.1.100:2181";
private static final String ZK_PATH = "/zktest";
public static void main(String[] args) throws InterruptedException {
LeaderSelectorListener listener = new LeaderSelectorListener() {
@Override
public void takeLeadership(CuratorFramework client) throws Exception {
System.out.println(Thread.currentThread().getName() + " take leadership!");
// takeLeadership() method should only return when leadership is being relinquished.
Thread.sleep(5000L);
System.out.println(Thread.currentThread().getName() + " relinquish leadership!");
}
@Override
public void stateChanged(CuratorFramework client, ConnectionState state) {
}
};
new Thread(() -> {
registerListener(listener);
}).start();
new Thread(() -> {
registerListener(listener);
}).start();
new Thread(() -> {
registerListener(listener);
}).start();
Thread.sleep(Integer.MAX_VALUE);
}
private static void registerListener(LeaderSelectorListener listener) {
// 1.Connect to zk
CuratorFramework client = CuratorFrameworkFactory.newClient(
ZK_ADDRESS,
new RetryNTimes(10, 5000)
);
client.start();
// 2.Ensure path
try {
new EnsurePath(ZK_PATH).ensure(client.getZookeeperClient());
} catch (Exception e) {
e.printStackTrace();
}
// 3.Register listener
LeaderSelector selector = new LeaderSelector(client, ZK_PATH, listener);
selector.autoRequeue();
selector.start();
}
}
分布式锁
分布式编程时,比如最容易碰到的情况就是应用程序在线上多机部署,于是当多个应用同时访问某一资源时,就需要某种机制去协调它们。
具体实现请参考我的上一篇文章:Zookeeper实现分布式锁
以上是关于基于Curator的Zookeeper操作实战的主要内容,如果未能解决你的问题,请参考以下文章
flink hadoop 从0~1分布式计算与大数据项目实战zookeeper内部原理流程简介以及java curator client操作集群注册,读取
Zookeeper客户端zkClient和curator的操作