Zookeeper客户端之 Curator
Posted 熙熙
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Zookeeper客户端之 Curator相关的知识,希望对你有一定的参考价值。
Apache Curator是一个比较完善的ZooKeeper客户端框架,通过封装的一套高级API 简化了ZooKeeper的操作。Curator主要解决了三类问题:
- 封装ZooKeeper client与ZooKeeper server之间的连接处理
- 提供了一套Fluent风格的操作API
- 提供ZooKeeper各种应用场景(recipe, 比如:分布式锁服务、集群领导选举、共享计数器、缓存机制、分布式队列等)的抽象封装
Curator主要从以下几个方面降低了zk使用的复杂性:
- 重试机制:提供可插拔的重试机制, 它将给捕获所有可恢复的异常配置一个重试策略,并且内部也提供了几种标准的重试策略(比如指数补偿)
- 连接状态监控: Curator初始化之后会一直对zk连接进行监听,一旦发现连接状态发生变化将会作出相应的处理
- zk客户端实例管理:Curator会对zk客户端到server集群的连接进行管理,并在需要的时候重建zk实例,保证与zk集群连接的可靠性
- 各种使用场景支持:Curator实现了zk支持的大部分使用场景(甚至包括zk自身不支持的场景),这些实现都遵循了zk的最佳实践,并考虑了各种极端情况
(一):客户端连接的创建:
curator的操作客户端是:CuratorFramework。其连接的建立方式如下:
@Bean
public CuratorFramework curatorFramework() {
RetryForever forever = new RetryForever(500);
CuratorFramework framework= CuratorFrameworkFactory.builder().connectString(zkUrl)
.connectionTimeoutMs(60000)
.sessionTimeoutMs(120000)
.retryPolicy(forever).build();
framework.start();
return framework;
}
其参数除了连接字符串之外,还有如下是三个参数:
1:连接超时时间:如果配置了curator-default-connection-timeout参数,则取该参数值。 默认值是15秒
2:会话超时时间:如果配置了curator-default-session-timeout参数,则取该参数值。 默认值是60秒
3:重试策略。curator提供了如下重试策略:
3.1:RetryForever。一直重试,参数是重试间隔时间,单位毫秒
3.2:RetryOneTime。重试一次,参数是重试之间的间隔时间,单位毫秒。
3.3:RetryNTimes。重试N次, 参数是重试次数和重试之间的间隔时间,单位毫秒。
3.4:ExponentialBackoffRetry。参数为最大重试次数(默认值29),最大休眠时间,基本休眠时间。其休眠时间不确定。
3.5:BoundedExponentialBackoffRetry。该类继承了ExponentialBackoffRetry。
(二):Watch机制
默认情况下,在操作zookeeper的命令中,使用 usingWatcher() 方法调用的监听器是一次性的。Curator提供了Cache机制,一次注册监听器即可。
Curator提供了如下三种watch。(carator版本不一样,可能会有所不同)
1:NodeCache。提供的Listener是NodeCacheListener,其监听了节点数据的变化。
2:PathChildrenCache。提供的Listener是PathChildrenCacheListener。其监听子节点的变化。
3:TreeCache。提供的Listener是TreeCacheListener。其监听了节点数据和子节点的变化。
使用例子如下:
NodeCache nodeCache = new NodeCache(cutator, "/file/cache");
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println("path: /file/cache changed!!!!!!");
}
});
nodeCache.start();
TreeCache treeCache = new TreeCache(cutator, "/file/cache");
treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
System.out.println("TREE CACHE: " + event.toString());
}
});
treeCache.start();
(三):分布式锁
curator提供了几种分布式锁。其有如下几种:
InterProcessMultiLock
InterProcessMutex
InterProcessReadWriteLock
InterProcessSemaphoreMutex
InterProcessSemaphoreV2。
curator提供的基于zookeeper的分布式锁和redis提供的分布式锁有如下不一样:
1:curator的分布式锁无过期时间,redis的分布式锁一般会设置过期时间。
2:curator的分布式锁在服务停止或者重启后,会释放,如果死锁可以使用这个办法解锁。而redis锁则不行。
3:curator分布式锁会避免羊群效应。
4:curator分布式锁是可重入锁。
curator锁的使用流程大概如下:
1:获取锁时,创建临时顺序节点。
2:判断当前创建的节点是不是首节点,如果是首节点,则认为锁获取成功。
3:如果不是首节点,则对上一个节点添加监听器(watcher),然后当前线程wait。
4:当上一个节点删除时,监听器触发,获取锁成功
5:锁释放的时候,会删除出现的临时顺序节点
// 构造函数,参数是连接客户端和锁路径
public InterProcessMutex(CuratorFramework client, String path)
{
this(client, path, new StandardLockInternalsDriver());
}
// 获取锁
private boolean internalLock(long time, TimeUnit unit) throws Exception
{
/*
Note on concurrency: a given lockData instance can be only acted on by a single thread so locking isn\'t necessary */
// 判断是否当前线程获取到锁,如果是当前线程,则返回获取成功
Thread currentThread = Thread.currentThread();
LockData lockData = threadData.get(currentThread);
if ( lockData != null )
{
lockData.lockCount.incrementAndGet();
return true;
}
// 尝试获取锁,如果获取到,就缓存在内存中。
String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
if ( lockPath != null )
{
LockData newLockData = new LockData(currentThread, lockPath);
threadData.put(currentThread, newLockData);
return true;
}
return false;
}
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
{
final long startMillis = System.currentTimeMillis();
final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
int retryCount = 0;
String ourPath = null;
boolean hasTheLock = false;
boolean isDone = false;
while ( !isDone )
{
isDone = true;
try {
// 在该路径下创建临时顺序节点。
ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
// 判断当前创建的节点是否为序号最小的节点,如果不是,则对上一个节点创建监听器watcher,然后等待
hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
}
catch ( KeeperException.NoNodeException e ) {
// gets thrown by StandardLockInternalsDriver when it can\'t find the lock node
// this can happen when the session expires, etc. So, if the retry allows, just try it all again
// 如果抛出异常提示节点不存在,则认为是会话超时,则进行重试。
if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) ){
isDone = false;
} else {
throw e;
}
}
}
if ( hasTheLock ){
return ourPath;
}
return null;
}
使用例子如下:
public void run() {
try {
InterProcessMutex lock = new InterProcessMutex(client, path);
lock.acquire(); //获取锁,可以设置超时时间
System.out.println("thread-" + idx + " get the lock!!!");
Random random = new Random();
int time = random.nextInt(10);
TimeUnit.MILLISECONDS.sleep(time * 1000);
System.out.println("thread-" + idx + " release the lock!!!");
lock.release(); //释放锁
} catch (Exception e) {
e.printStackTrace();
}
}
(四):选举用法
zookeeper是提供了2个类用于选择,分别如下:
LeaderLatch:这里提供的选举是同步的
LeaderSelector:这里提供的选举是异步的,提供一个LeaderSelectorListener用于接收选举结果。
用法例子如下:
@Override
public void run() {
LeaderSelector leader = new LeaderSelector(client, path, new LeaderSelectorListener() {
@Override
public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
}
@Override
public void takeLeadership(CuratorFramework client) throws Exception {
System.out.println("LeaderSelector Thread-" + idx + " is the leader!!!!!!");
}
});
leader.autoRequeue(); //重新加入抢的序列
leader.start();
}
public void run() {
try {
LeaderLatch leader = new LeaderLatch(client, path);
leader.addListener(new LeaderLatchListener() {
@Override
public void isLeader() {
System.out.println("(Listener)Thread-" + idx + " is the leader!!!!!!");
}
@Override
public void notLeader() {
System.out.println("(Listener)Thread-" + idx + " is not the leader!!!!!!");
}
});
leader.start();
leader.await(); //阻塞直到获取到Leader身份
if (leader.hasLeadership()) {
System.out.println("Thread-" + idx + " is the leader!!!!!!");
} else {
System.out.println("Thread-" + idx + " is not the leader!!!!!!");
}
} catch (Exception e) {
e.printStackTrace();
}
}
以上是关于Zookeeper客户端之 Curator的主要内容,如果未能解决你的问题,请参考以下文章
ZooKeeper 之Apache Curator 客户端使用