Curator简介
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Curator简介相关的知识,希望对你有一定的参考价值。
参考技术A 一、Curator是NetFlix公司开源的一套Zookeeper客户端框架,其特点如下:1、连接重连
2、反复注册Watcher
3、NodeExistsException处理
二、常用API
API文档对应地址:http://curator.apache.org/curator-framework/index.html
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);//Zookeeper每1秒检查一次session状态,如果断掉的话,重新连接,如果连续3次均没连上,则session失效
CuratorFramework client = CuratorFrameworkFactory.newClient(connectingString,sessionTimeout,connectionTimeOut,retryPolicy);
//创建节点
client.create().forPath(path);
//创建节点并设置值
client.create().forPath(path,"init".getBytes());
//创建临时节点
client.create().withMode(CreateMode.EPHEMERAL).forPath(path);
//创建临时节点并递归创建父节点
client.create().creatingParentContainersIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path);
//删除节点
client.delete().forPath(path);
//删除节点并递归删除其所有子节点
client.delete().deletingChildrenIfNeeded().forPath(path);
//删除节点,强制指定版本进行删除
client.delete().withVersion(version).forPath(path);
//强制保证删除
client.delete().guaranteed().forPath(path);
//获取某个节点数据
client.getData().forPath(path);
//读取一个节点的数据内容,同时按获取到该节点的stat
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath(path);
//更新一个节点的数据内容
client.setData().withVersion(version).forPath(path);
//异步
client.create().creatingParentsIfNeeded().inBackground(new BackgroundCallback()
@Override
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception
).forPath(path,"init".getBytes());
三、监听器
1、NodeCache:监听节点变化
NodeCache cache = new NodeCache(client,path,false);
cache.start(true); //第一次启动的时候就会从Zookeeper上同步数据
cache.getListenable().addListener(new NodeCacheListener()
@Override
public void nodeChanged() throws Exception
System.out.println(cache.getCurrentData().getData());
);
2、PathChildrenCache:监控某个节点子节点的变化【无法对父节点以及二级节点数据进行监控】
PathChildrenCache childrenCache = new PathChildrenCache(client,path,true);
childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
childrenCache.getListenable().addListener(new PathChildrenCacheListener()
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception
System.out.println("节点变更类型:"+pathChildrenCacheEvent.getType()+"对应节点:"+pathChildrenCacheEvent.getData().getPath());
);
四、LeaderLatch
LeaderLatch⽤于实现Leader的选举
– 触发方法isLeader(),表示成为leader
– 触发notLeader(),表示失去leader权限
– 场景:参考xxx-callback
public void afterPropertiesSet() throws Exception
if (leaderLatch != null)
return;
leaderLatch = new LeaderLatch(curatorClient,//zk客户端实例
PATH,//选举根节点路径
OSUtils.getServerIp() + "_" + UUID.randomUUID().toString()); //客户端ID,用来标记客户端,即客户端编号,名称
leaderLatch.addListener(new LeaderLatchListener()
@Override
public void isLeader() //抢主成功时触发
ContextHolder.setLeader(true);
LOGGER.info("im leader");
@Override
public void notLeader() //抢主失败时触发
ContextHolder.setLeader(false);
LOGGER.info("im not leader");
);
leaderLatch.start();
五、分布式锁
1、模拟并发
CountDownLatch countDownLatch = new CountDownLatch(1);
for (int i=0;i<10;i++)
new Thread(new Runnable()
@Override
public void run()
try
countDownLatch.await();
catch (Exception e)
System.out.println(System.currentTimeMillis());
).start();
countDownLatch.countDown();
2、分布式锁控制
client.start();
String lock_path = "/lock";
final InterProcessMutex lock = new InterProcessMutex(client,lock_path);
CountDownLatch countDownLatch = new CountDownLatch(1);
for (int i=0;i<10;i++)
new Thread(new Runnable()
@Override
public void run()
try
countDownLatch.await();
lock.acquire();
catch (Exception e)
System.out.println(System.currentTimeMillis());
try
lock.release();
catch (Exception e)
e.printStackTrace();
).start();
countDownLatch.countDown();
六、分布式计数器
DistributedAtomicInteger atomicInteger = new DistributedAtomicInteger(client,path,new RetryNTimes(1000,3));
atomicInteger.add(8);
思考:
1、开关同步采用Zookeeper是否可以 ?
2、ABTest?
3、分批任务计算?
4、配置中心
七curator recipes之阻塞队列SimpleDistributedQueue
简介
Java在单机环境实现了BlockQueue阻塞队列,与之类似的curator实现了分布式场景下的阻塞队列,SimpleDistributedQueue
官方文档:http://curator.apache.org/curator-recipes/simple-distributed-queue.html
注意:zookeeper虽然可以实现队列,但是官方并不推荐使用zookeeper来做队列,具体请参考官网
代码示例
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.queue.SimpleDistributedQueue; import org.apache.curator.retry.ExponentialBackoffRetry; public class SimpleQueueDemo { private static CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", new ExponentialBackoffRetry(3000, 2)); private static String path = "/queue/path001"; public static void main(String[] args) throws InterruptedException { client.start(); System.out.println("started"); SimpleDistributedQueue queue = new SimpleDistributedQueue(client, path); new Thread(() -> { try { System.out.println("sleeping"); Thread.sleep(3000); System.out.println("sleep end"); new SimpleDistributedQueue(client, path).offer("lay".getBytes("utf-8")); System.out.println("offered"); } catch (Exception e) { System.out.println("exception"); e.printStackTrace(); } }).start(); System.out.println("polling"); String data = null; try { data = new String(queue.take()); } catch (Exception e) { e.printStackTrace(); } System.out.println("data=" + data); client.close(); } }
输出结果
started
polling
sleeping
sleep end
offered
data=lay
主线程会阻塞直到offer了数据
以上是关于Curator简介的主要内容,如果未能解决你的问题,请参考以下文章
flink hadoop 从0~1分布式计算与大数据项目实战zookeeper内部原理流程简介以及java curator client操作集群注册,读取
SpringBoot基于Zookeeper和Curator生成唯一ID