zookeeper学习
Posted 红豆和绿豆
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了zookeeper学习相关的知识,希望对你有一定的参考价值。
下载安装单机zookeeper的server
zookeeper的操作
zookeeper的数据结构
服务端常用命令
客户端常用命令
临时节点,客户端关闭,服务端创建的节点则会删除
不能重复创建节点
不能删除带子节点的ZNode
javaAPI操作Zookeeper的节点操作
curator的增删改查连接基本操作
package curator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.List;
public class CuratorTest {
private CuratorFramework curatorFramework;
/**
* 建立连接
*/
@Before
public void testConnect(){
//第一种方式
/**
* String connectString, 连接字符串 zkServer的地址和端口
* int sessionTimeoutMs, 会话超时时间 单位为毫秒 跟zkServer会话
* int connectionTimeoutMs, 建立连接的超时时间
* RetryPolicy retryPolicy 重试策略 客户端连接失败的策略
*
*/
RetryPolicy retryPolicy=new ExponentialBackoffRetry(3000,10);
curatorFramework = CuratorFrameworkFactory.newClient("127.0.0.1:2181,27.0.0.2:2181", 60 * 1000, 15 * 1000, retryPolicy);
//开启客户端
curatorFramework.start();
CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181,27.0.0.2:2181").
connectionTimeoutMs(15 * 1000).sessionTimeoutMs(60 * 1000).namespace("heihi")//默认是一个根节点
.retryPolicy(retryPolicy).build();
client.start();
}
//------------------------------------创建--------------------
/**
* 创建一个ZNode节点
* create 节点名字 类型(永久,临时,顺序) 内容
*
* 基本创建
* 设置节点内容
* 设置节点类型
* 创建多级节点
*/
@Test
public void testCreate() throws Exception {
/**
* 默认创建的目录的父目录都是heihi 如果不放任何数据,会默认放一个客户端的ip地址
*/
String forPath = curatorFramework.create().forPath("/app1");
System.out.println(forPath);
}
@Test
public void testCreate2() throws Exception {
String forPath = curatorFramework.create().forPath("/app2","data".getBytes());
System.out.println(forPath);
}
/**
* 创建节点的模式
* @throws Exception
*/
@Test
public void testCreate3() throws Exception {
/**
* 默认类型为持久化的节点 withMode
* CreateMode 使用这个节点进行设置
*/
String forPath = curatorFramework.create().withMode(CreateMode.EPHEMERAL).forPath("/app3","data".getBytes());
System.out.println(forPath);
}
/**
* 创建多级节点
* @throws Exception
*/
@Test
public void testCreate4() throws Exception {
/**
* 默认类型为持久化的节点 withMode
* CreateMode 使用这个节点进行设置
* creatingParentsIfNeeded 允许父节点不存在创建父节点
*/
String forPath = curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/app4/p1","data".getBytes());
System.out.println(forPath);
}
//------------------------------------查询---------------------------
/**
* 查询节点数据getData().forPath
* 查询子节点 curatorFramework.getChildren().forPath
* 查询节点的状态信息 ls -sgetData().storingStatIn(stat).forPath
*
* @throws Exception
*/
@Test
public void testGet() throws Exception {
//查询/app1 这个节点的数据
byte[] bytes = curatorFramework.getData().forPath("/app1");
System.out.println(new String(bytes));
}
/**
* 查询子节点
* @throws Exception
*/
@Test
public void testGet2() throws Exception {
//查询一个节点的所有子节点
List<String> list = curatorFramework.getChildren().forPath("/app4");
System.out.println(list);
}
/**
* 查询节点的状态
* @throws Exception
*/
@Test
public void testGet3() throws Exception {
Stat stat=new Stat();
System.out.println(stat);
byte[] bytes = curatorFramework.getData().storingStatIn(stat).forPath("/app1");
System.out.println(stat);
}
//------------------------------------修改---------------------------
/**
* 修改节点数据 setData().forPath
* @throws Exception
*/
@Test
public void testSet() throws Exception {
curatorFramework.setData().forPath("/app1","heihei".getBytes());
}
/**
* 根据版本修改 setData().withVersion
* @throws Exception
*/
@Test
public void testSetForVersion() throws Exception {
Stat stat=new Stat();
byte[] bytes = curatorFramework.getData().storingStatIn(stat).forPath("/app1");
int version=stat.getVersion();
//解决并发的问题 保证其它客户端不要干扰当前客户端
curatorFramework.setData().withVersion(version).forPath("/app1","heihei".getBytes());
}
//------------------------------------删除---------------------------
/**
* 删除单个节点
* delete().forPath
* @throws Exception
*/
@Test
public void testDelete() throws Exception {
curatorFramework.delete().forPath("/app1");
}
/**
* 删除带有子节点的节点
* delete().deletingChildrenIfNeeded().forPath
* @throws Exception
*/
@Test
public void testDelete2() throws Exception {
curatorFramework.delete().deletingChildrenIfNeeded().forPath("/app4");
}
/**
* 必须成功删除节点 防止网络抖动
* delete().guaranteed().forPath
* @throws Exception
*/
@Test
public void testDelete3() throws Exception {
curatorFramework.delete().guaranteed().forPath("/app2");
}
/**
* 删除回调
* delete().guaranteed().inBackground
* @throws Exception
*/
@Test
public void testDelete4() throws Exception {
//删除回调
curatorFramework.delete().guaranteed().inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
System.out.println("我被删除了client:"+client+"--"+event);
}
}).forPath("/app1");
}
@After
public void testClose(){
if(curatorFramework!=null){
curatorFramework.close();
}
}
}
curator的watch的监控
具体的代码实现
package curator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.After;
import org.junit.Before;
public class CuratorWatchTest {
private CuratorFramework curatorFramework;
/**
* 建立连接
*/
@Before
public void testConnect(){
//第一种方式
/**
* String connectString, 连接字符串 zkServer的地址和端口
* int sessionTimeoutMs, 会话超时时间 单位为毫秒 跟zkServer会话
* int connectionTimeoutMs, 建立连接的超时时间
* RetryPolicy retryPolicy 重试策略 客户端连接失败的策略
*
*/
RetryPolicy retryPolicy=new ExponentialBackoffRetry(3000,10);
curatorFramework = CuratorFrameworkFactory.newClient("127.0.0.1:2181,27.0.0.2:2181", 60 * 1000, 15 * 1000, retryPolicy);
//开启客户端
curatorFramework.start();
CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181,27.0.0.2:2181").
connectionTimeoutMs(15 * 1000).sessionTimeoutMs(60 * 1000).namespace("heihi")//默认是一个根节点
.retryPolicy(retryPolicy).build();
client.start();
}
/**
* 监控单个节点变化
*/
@Before
public void testNodeCache() throws Exception {
//创建NodeCache
/**
* @param client curztor client
* @param path the full path to the node to cache
* @param dataIsCompressed if true, data in the path is compressed
*/
NodeCache nodeCache=new NodeCache(curatorFramework,"/app1");
//注册监控
/**
* 增删改都会回调
*/
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println(" nodeChanged");
//获取修改节点后的数据
byte[] data = nodeCache.getCurrentData().getData();
System.out.println(new String(data));
}
});
//开始监听
/**
* 如果设置为true 开启监听 加载缓冲数据
*/
nodeCache.start(true);
}
/**
* 监控单个节点子节点变化
*/
@Before
public void testPathChildernCache() throws Exception {
//创建PathChildrenCache
/**
* @param client the client
* @param path path to watch
* @param cacheData if true, node contents are cached in addition to the stat
* @param dataIsCompressed if true, data in the path is compressed
* @param executorService Closeable ExecutorService to use for the PathChildrenCache's background thread. This service should be single threaded, otherwise the cache may see inconsistent results.
*/
PathChildrenCache pathChildrenCache=new PathChildrenCache(curatorFramework,"/app2",true);
//注册监控
/**
* 增删改都会回调
*/
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
/**
* /**
* * A child was added to the path
* *
* CHILD_ADDED,
*
* /**
* * A child's data was changed
* *
*CHILD_UPDATED,
*
* /**
* * A child was removed from the path
* *
*CHILD_REMOVED,
* @param client
* @param event
* @throws Exception
*/
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
System.out.println("children nodeChanged");
//监听子节点数据变更才进行处理
if(event.getType()== PathChildrenCacheEvent.Type.CHILD_UPDATED){
System.out.println(event.getData().getData());
}
}
});
//开始监听
/**
* 如果设置为true 开启监听 加载缓冲数据
*/
pathChildrenCache.start();
}
/**
* 监控某个节点和所有 子节点
*/
@Before
public void testTreeCache() throws Exception {
//创建TreeCache
/**
* @param client curztor client
* @param path the full path to the node to cache
* @param dataIsCompressed if true, data in the path is compressed
*/
TreeCache treeCache=new TreeCache(curatorFramework,"/app1");
//注册监控
/**
* 增删改都会回调
*/
treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
System.out.println("nodechange");
System.out.println("client:"+client+"event"+event);
}
});
//开始监听
/**
* 如果设置为true 开启监听 加载缓冲数据
*/
treeCache.start();
}
@After
public void testClose(){
if(curatorFramework!=null){
curatorFramework.close();
}
}
}
zookeeper实现分布式锁
基于redis的分布式锁:https://blog.csdn.net/u011955252/article/details/114174257
zookeeper的分布式锁的原理
package curator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class Ticket12306 implements Runnable{
private int tickets=10;
private InterProcessMutex lock;
public Ticket12306(){
RetryPolicy retryPolicy=new ExponentialBackoffRetry(3000,10);
CuratorFramework curatorFramework = CuratorFrameworkFactory.
newClient("127.0.0.1:2181,27.0.0.2:2181", 60 * 1000, 15 * 1000, retryPolicy);
//开启客户端
curatorFramework.start();
lock=new InterProcessMutex(curatorFramework,"/lock");
}
@Override
public void run() {
while (true){
//加锁
try{
lock.acquire();
if(tickets>0){
System.out.println(Thread.currentThread().getName()+" 买票了"+tickets);
tickets--;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
package curator;
public class LockTest {
public static void main(String[] args) {
Ticket12306 ticket12306=new Ticket12306();
new Thread(ticket12306,"肥猪").start();
new Thread(ticket12306,"携程").start();
}
}
zookeeper 集群搭建
以上是关于zookeeper学习的主要内容,如果未能解决你的问题,请参考以下文章