Zookeeper客户端Curator使用详解
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Zookeeper客户端Curator使用详解相关的知识,希望对你有一定的参考价值。
参考技术A[TOC]
维护多个博客比较麻烦,和博客园放弃维护,后续在个人博客持续更新:
Curator是Netflix公司开源的一套zookeeper客户端框架,解决了很多Zookeeper客户端非常底层的细节开发工作,包括连接重连、反复注册Watcher和NodeExistsException异常等等。Patrixck Hunt(Zookeeper)以一句“Guava is to Java that Curator to Zookeeper”给Curator予高度评价。
引子和趣闻:
Zookeeper名字的由来是比较有趣的,下面的片段摘抄自《从PAXOS到ZOOKEEPER分布式一致性原理与实践》一书:
Zookeeper最早起源于雅虎的研究院的一个研究小组。在当时,研究人员发现,在雅虎内部很多大型的系统需要依赖一个类似的系统进行分布式协调,但是这些系统往往存在分布式单点问题。所以雅虎的开发人员就试图开发一个通用的无单点问题的分布式协调框架。在立项初期,考虑到很多项目都是用动物的名字来命名的(例如著名的Pig项目),雅虎的工程师希望给这个项目也取一个动物的名字。时任研究院的首席科学家Raghu Ramakrishnan开玩笑说:再这样下去,我们这儿就变成动物园了。此话一出,大家纷纷表示就叫动物园管理员吧——因为各个以动物命名的分布式组件放在一起,雅虎的整个分布式系统看上去就像一个大型的动物园了,而Zookeeper正好用来进行分布式环境的协调——于是,Zookeeper的名字由此诞生了。
Curator无疑是Zookeeper客户端中的瑞士军刀,它译作"馆长"或者\'\'管理者\'\',不知道是不是开发小组有意而为之,笔者猜测有可能这样命名的原因是说明Curator就是Zookeeper的馆长(脑洞有点大:Curator就是动物园的园长)。
Curator包含了几个包:
curator-framework: 对zookeeper的底层api的一些封装
curator-client: 提供一些客户端的操作,例如重试策略等
curator-recipes: 封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等
Maven依赖(使用curator的版本:2.12.0,对应Zookeeper的版本为:3.4.x, 如果跨版本会有兼容性问题,很有可能导致节点操作失败 ):
一个例子如下:
newClient静态工厂方法包含四个主要参数:
核心参数变为流式设置,一个列子如下:
为了实现不同的Zookeeper业务之间的隔离,需要为每个业务分配一个独立的命名空间( NameSpace ),即指定一个Zookeeper的根路径(官方术语: 为Zookeeper添加“Chroot”特性 )。例如(下面的例子)当客户端指定了独立命名空间为“/base”,那么该客户端对Zookeeper上的数据节点的操作都是基于该目录进行的。通过设置Chroot可以将客户端应用与Zookeeper服务端的一课子树相对应,在多个应用共用一个Zookeeper集群的场景下,这对于实现不同应用之间的相互隔离十分有意义。
当创建会话成功,得到client的实例然后可以直接调用其start( )方法:
Zookeeper的节点创建模式:
**创建一个节点,初始内容为空 **
注意:如果没有设置节点属性,节点创建模式默认为持久化节点,内容默认为空
创建一个节点,附带初始化内容
创建一个节点,指定创建模式(临时节点),内容为空
创建一个节点,指定创建模式(临时节点),附带初始化内容
创建一个节点,指定创建模式(临时节点),附带初始化内容,并且自动递归创建父节点
这个creatingParentContainersIfNeeded()接口非常有用,因为一般情况开发人员在创建一个子节点必须判断它的父节点是否存在,如果不存在直接创建会抛出NoNodeException,使用creatingParentContainersIfNeeded()之后Curator能够自动递归创建所有所需的父节点。
删除一个节点
注意,此方法只能删除 叶子节点 ,否则会抛出异常。
删除一个节点,并且递归删除其所有的子节点
删除一个节点,强制指定版本进行删除
删除一个节点,强制保证删除
guaranteed()接口是一个保障措施,只要客户端会话有效,那么Curator会在后台持续进行删除操作,直到删除节点成功。
注意: 上面的多个流式接口是可以自由组合的,例如:
读取一个节点的数据内容
注意,此方法返的返回值是byte[ ];
读取一个节点的数据内容,同时获取到该节点的stat
更新一个节点的数据内容
注意:该接口会返回一个Stat实例
更新一个节点的数据内容,强制指定版本进行更新
注意:该方法返回一个Stat实例,用于检查ZNode是否存在的操作. 可以调用额外的方法(监控或者后台处理)并在最后调用forPath( )指定要操作的ZNode
注意:该方法的返回值为List<String>,获得ZNode的子节点Path列表。 可以调用额外的方法(监控、后台处理或者获取状态watch, background or get stat) 并在最后调用forPath()指定要操作的父ZNode
CuratorFramework的实例包含inTransaction( )接口方法,调用此方法开启一个ZooKeeper事务. 可以复合create, setData, check, and/or delete 等操作然后调用commit()作为一个原子操作提交。一个例子如下:
上面提到的创建、删除、更新、读取等方法都是同步的,Curator提供异步接口,引入了 BackgroundCallback 接口用于处理异步接口调用之后服务端返回的结果信息。 BackgroundCallback 接口中一个重要的回调值为CuratorEvent,里面包含事件类型、响应吗和节点的详细信息。
CuratorEventType
响应码(#getResultCode())
一个异步创建节点的例子如下:
注意:如果#inBackground()方法不指定executor,那么会默认使用Curator的EventThread去进行异步处理。
提醒:首先你必须添加curator-recipes依赖,下文仅仅对recipes一些特性的使用进行解释和举例,不打算进行源码级别的探讨
重要提醒:强烈推荐使用ConnectionStateListener监控连接的状态,当连接状态为LOST,curator-recipes下的所有Api将会失效或者过期,尽管后面所有的例子都没有使用到ConnectionStateListener。
Zookeeper原生支持通过注册Watcher来进行事件监听,但是开发者需要反复注册(Watcher只能单次注册单次使用)。Cache是Curator中对事件监听的包装,可以看作是对事件监听的本地缓存视图,能够自动为开发者处理反复注册监听。Curator提供了三种Watcher(Cache)来监听结点的变化。
Path Cache用来监控一个ZNode的子节点. 当一个子节点增加, 更新,删除时, Path Cache会改变它的状态, 会包含最新的子节点, 子节点的数据和状态,而状态的更变将通过PathChildrenCacheListener通知。
实际使用时会涉及到四个类:
通过下面的构造函数创建Path Cache:
想使用cache,必须调用它的 start 方法,使用完后调用 close 方法。 可以设置StartMode来实现启动的模式
......to be continue
ZooKeeper客户端Curator使用
Curator是Netflix公司开源的一套zookeeper客户端框架.解决了很多Zookeeper客户端非常底层的细节开发工作,包括连接重连、反复注册Watcher和NodeExistsException异常等等。Curator被看做是zookeeper客户端框里面的瑞士军刀(牛逼了)。Curator使得我们开发zookeeper客户端程序变的很容易。
Curator框架包含三个主要的包:
- curator-framework:对zookeeper的底层api的一些封装。
- curator-client:提供一些客户端的操作,例如重试策略等。
- curator-recipes:封装了一些高级特性,例如Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等。
Curator的引入(pom方式,版本可能有变化)。
<!-- zookeeper -->
<!-- 对zookeeper的底层api的一些封装 -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.1</version>
</dependency>
<!-- 提供一些客户端的操作,例如重试策略等 -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>4.0.1</version>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- 封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等 -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.10</version>
<!--排除这个slf4j-log4j12-->
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<artifactId>log4j</artifactId>
<groupId>log4j</groupId>
</exclusion>
</exclusions>
</dependency>
一 Curator的基本用法
1.1 创建zookeeper客户端
在Curator中CuratorFramework对象就代表一个zookeeper客户端。所以创建创建zookeeper客户端就是创建CuratorFramework对象。CuratorFramework对象又可以通过CuratorFrameworkFactory来创建。
CuratorFramework api介绍如下
public interface CuratorFramework
/**
* 启动zookeeper客户端
*/
public void start();
/**
* 关闭zookeeper客户端
*/
public void close();
/**
* 返回客户端状态:LATENT、STARTED、STOPPED
*/
public CuratorFrameworkState getState();
/**
* 客户端是否已经启动
*/
@Deprecated
public boolean isStarted();
/**
* 创建节点的建造器
*/
public CreateBuilder create();
/**
* 删除节点的建造器
*/
public DeleteBuilder delete();
/**
* 检查节点是否存在的建造器
*/
public ExistsBuilder checkExists();
/**
* 获取接连数据的建造器
*/
public GetDataBuilder getData();
/**
* 设置节点数据的建造器
*/
public SetDataBuilder setData();
/**
* 获取子节点的建造器
*/
public GetChildrenBuilder getChildren();
/**
* 获取权限的构造器
*/
public GetACLBuilder getACL();
/**
* 设置权限的构造器
*/
public SetACLBuilder setACL();
/**
* 重新配置的建造器
*/
public ReconfigBuilder reconfig();
/**
* 获取配置的建造器
*/
public GetConfigBuilder getConfig();
/**
* 事务构造器
* @deprecated use @link #transaction() instead
*/
public CuratorTransaction inTransaction();
/**
* 事务构造器
*/
public CuratorMultiTransaction transaction();
/**
* 分配可与transaction()一起使用的操作
*/
public TransactionOp transactionOp();
/**
* 如果路径不存在,则创建路径对应的节点
*/
public void createContainers(String path) throws Exception;
/**
* 启动同步构建器。注意:即使您不使用其中一种background()方法,同步也始终在后台
*/
public SyncBuilder sync();
/**
* 启动remove watch builder,有节点删除的时候会调用
*/
public RemoveWatchesBuilder watches();
/**
*
* 返回Connect State的可侦听接口
*/
public Listenable<ConnectionStateListener> getConnectionStateListenable();
/**
* 返回事件的可侦听接口
*/
public Listenable<CuratorListener> getCuratorListenable();
/**
* 返回未处理错误的可侦听接口
*/
public Listenable<UnhandledErrorListener> getUnhandledErrorListenable();
/**
* 返回一个新的CuratorFramework,该CuratorFramework指定了一个新的命名空间
*/
public CuratorFramework usingNamespace(String newNamespace);
/**
* 获取命名空间
*/
public String getNamespace();
/**
* 返回托管的zookeeper客户端
*/
public CuratorZookeeperClient getZookeeperClient();
/**
* 阻塞,直到与ZooKeeper的连接可用或已超过maxWaitTime
*/
public boolean blockUntilConnected(int maxWaitTime, TimeUnit units) throws InterruptedException;
/**
* 阻塞,直到与ZooKeeper的连接可用。在连接可用或中断之前,此方法不会返回,在这种情况下,将抛出InterruptedException
*/
public void blockUntilConnected() throws InterruptedException;
/**
* 返回跟踪观察者创建的当前实例的外观,并允许一次性删除所有观察者
*/
public WatcherRemoveCuratorFramework newWatcherRemoveCuratorFramework();
/**
* 返回配置的错误策略
*/
public ConnectionStateErrorPolicy getConnectionStateErrorPolicy();
/**
*
* Current维护Zookeeper仲裁配置的缓存视图。
*/
public QuorumVerifier getCurrentConfig();
/**
* 获取SchemaSet
*/
SchemaSet getSchemaSet();
/**
* 如果此实例在ZK 3.4.x兼容模式下运行,则返回true
*/
boolean isZk34CompatibilityMode();
CuratorFrameworkFactory api介绍如下
public class CuratorFrameworkFactory
/**
* 用于通过建造者模式创建zookeeper客户端
*/
public static Builder builder();
/**
* 创建zookeeper客户端
*/
public static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy);
/**
* 创建zookeeper客户端
*/
public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy);
/**
* 将本地地址作为可用作节点有效负载的字节返回
*/
public static byte[] getLocalAddress();
public static class Builder
/**
* build CuratorFramework对象 -- zookeeper客户端
*/
public CuratorFramework build();
/**
* 创建一个临时的CuratorFramework客户端,CuratorFramework,默认3分钟不活动客户端连接就被关闭
*/
public CuratorTempFramework buildTemp();
/**
* 创建一个临时的CuratorFramework客户端,CuratorFramework,可以自己指定多长时间不活动客户端连接就被关闭
*/
public CuratorTempFramework buildTemp(long inactiveThreshold, TimeUnit unit);
/**
* 添加zookeeper 访问权限
*/
public Builder authorization(String scheme, byte[] auth);
public Builder authorization(List<AuthInfo> authInfos);
/**
* 设置zookeeper服务器列表
*/
public Builder connectString(String connectString);
/**
* zookeeper服务器地址通过EnsembleProvider(配置提供者)来提供,不能和connectString共同使用
*/
public Builder ensembleProvider(EnsembleProvider ensembleProvider);
/**
* 为每次新建的节点设置一个默认值
*/
public Builder defaultData(byte[] defaultData);
/**
* 设置命名空间,为了实现不同的Zookeeper业务之间的隔离,有的时候需要为每个业务分配一个独立的命名空间
*/
public Builder namespace(String namespace)
/**
* 会话超时时间,单位毫秒,默认60000ms
*/
public Builder sessionTimeoutMs(int sessionTimeoutMs);
/**
* 连接创建超时时间,单位毫秒,默认60000ms
*/
public Builder connectionTimeoutMs(int connectionTimeoutMs);
/**
* @param maxCloseWaitMs time to wait during close to join background threads
* @return this
*/
public Builder maxCloseWaitMs(int maxCloseWaitMs);
/**
* 设置客户端重连策略
*/
public Builder retryPolicy(RetryPolicy retryPolicy);
/**
* Executor Services的线程工厂
*/
public Builder threadFactory(ThreadFactory threadFactory);
/**
* 压缩器,用于压缩和解压数据
*/
public Builder compressionProvider(CompressionProvider compressionProvider);
/**
* ZookeeperFactory 用于创建ZooKeeper
*/
public Builder zookeeperFactory(ZookeeperFactory zookeeperFactory);
/**
* 权限控制器
*/
public Builder aclProvider(ACLProvider aclProvider);
/**
* 设置只读模式
*/
public Builder canBeReadOnly(boolean canBeReadOnly);
/**
* 不让客户端,创建节点的时候顺带创建父节点
*/
public Builder dontUseContainerParents();
/**
* 默认是StandardConnectionStateErrorPolicy,设置要使用的错误策略
*/
public Builder connectionStateErrorPolicy(ConnectionStateErrorPolicy connectionStateErrorPolicy);
/**
* 如果mode为true,则创建ZooKeeper 3.4.x兼容客户端。如果使用的客户端库是ZooKeeper 3.4.x 默认情况下已启用
*/
public Builder zk34CompatibilityMode(boolean mode);
/**
* 更改连接处理策略,默认StandardConnectionHandlingPolicy
*/
public Builder connectionHandlingPolicy(ConnectionHandlingPolicy connectionHandlingPolicy);
/**
* 添加强制架构集
*/
public Builder schemaSet(SchemaSet schemaSet);
从上面的CuratorFrameworkFactory api的介绍可以看出CuratorFrameworkFactory对象的创建有两种方式:
- 通过过构造函数创建
参数 | 类型 | 含义 |
---|---|---|
connectString | String | 服务器列表,格式host1:port1,host2:port2,… |
sessionTimeoutMs | int | 会话超时时间,单位毫秒,默认60000ms |
connectionTimeoutMs | int | 连接创建超时时间,单位毫秒,默认60000ms |
retryPolicy | RetryPolicy | 重试策略,curator已经提供了多种重试策略,也可以自行实现RetryPolicy接口 |
curator提供的重试策略有:ExponentialBackoffRetry、BoundedExponentialBackoffRetry、RetryForever、RetryNTimes、RetryOneTime、RetryUntilElapsed
- 通过build创建,关于build里面的各个参数在CuratorFrameworkFactory api里面都顺带介绍了哦。
比如如下的实例代码,连接到127.0.0.1:2181服务端。
关于zookeeper的安装大家可以自己去网上搜下。
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181")
.retryPolicy(retryPolicy)
.sessionTimeoutMs(6000)
.connectionTimeoutMs(6000)
.build();
1.2 启动客户端
调用start()方法启动客户端。这个时候zookeeper客户端才会去连接zookeeper服务端。在zookeeper客户端上做的所有动作都需要在start()之后执行。如果你不想连接服务端的时候可以调用close()方法断开连接.
1.3 节点操作
首先我们要明确zookeeper里面的节点结构类似于我们文件系统的结构(就像一棵树样的)。除此之外zookeeper的每个节点上还可以保存数据。zookeeper里面的节点有四种,不同的节点类型都有自己的特点:
- CreateMode.PERSISTENT:持久化节点。
- CreateMode.PERSISTENT_SEQUENTIAL:持久化并且带序列号节点。
- CreateMode.EPHEMERAL:临时节点(客户端断开了节点也就删除了)
- CreateMode.EPHEMERAL_SEQUENTIAL:临时并且带序列号(客户端断开了节点也就删除了)
1.3.1 创建节点
创建节点很简单,我们前面已经创建了zookeeper客户端,并且调了start()方法把客户端启动起来了。
比如我们可以用如下的代码创建一个持久化的节点。通过withMode(CreateMode.PERSISTENT)来指定节点的类型。
/**
* 同步 创建持久化节点
*
* @param path 节点路径
* @throws Exception errors
*/
public void createPersistentNodeSync(String path) throws Exception
client.create()
.creatingParentContainersIfNeeded() // 自动递归创建父节点
.withMode(CreateMode.PERSISTENT)
.forPath(path);
我们也可以在创建节点的同时,给节点设置数据。
/**
* 同步-创建持久化节点
*
* @param path 节点路径
* @param data 节点对应的值
* @throws Exception errors
*/
public void createPersistentNodeSync(String path, byte[] data) throws Exception
client.create()
.creatingParentContainersIfNeeded() // 自动递归创建父节点
.withMode(CreateMode.PERSISTENT)
.forPath(path, data);
1.3.2 删除节点
删除叶子节点。(如果不是叶子节点是会报异常的)
/**
* 同步-删除一个叶子节点(注意哦,只能删除叶子节点否则报错的)
*
* @param path 需要删除的节点对应的路径
* @throws Exception errors
*/
public void deleteNodeSync(String path) throws Exception
client.delete()
.forPath(path);
我们也可以删除整个节点(包括节点下的子节点)。
/**
* 同步-删除一个节点,并且递归删除其所有的子节点
*
* @param path 需要删除的节点对应的路基
* @throws Exception errors
*/
public void deleteNodeRecursivelySync(String path) throws Exception
client.delete()
.deletingChildrenIfNeeded()
.forPath(path);
1.3.3 判断节点是否存在
通过节点的Stat来判断节点是否存在。
/**
* 同步-检查节点是否存在
*
* @param path 节点路径
* @return 节点是否存在
* @throws Exception errors
*/
public boolean isNodeExistSync(String path) throws Exception
Stat state = client.checkExists()
.forPath(path);
return state != null;
1.3.4 节点数据操作
读取节点数据。
/**
* 同步-读取一个节点的数据内容
*
* @param path 节点路基
* @return 节点内容
* @throws Exception errors
*/
public byte[] getNodeDataSync(String path) throws Exception
return client.getData()
.forPath(path);
更新节点数据,或者设置null删除节点数据
/**
* 同步-更新一个节点的数据内容
*
* @param path 节点路径
* @param data 节点对应数据
* @throws Exception errors
*/
public void updateNodeDataSync(String path, byte[] data) throws Exception
client.setData()
.forPath(path, data);
1.3.5 获取节点的所有子节点
/**
* 同步-获取某个节点的所有子节点路径
*
* @param path 目录
* @return children
* @throws Exception errors
*/
public List<String> getChildrenSync(String path) throws Exception
return client.getChildren()
.forPath(path);
1.4 事务
事务相信大家都非常的熟悉。Curator也提供了事务的支持,一组crud操作要么都成功,要么都失败。使用起来也非常的简单。
一个事务里面肯定是有多个操作的,我们首先要把每个操作都封装成CuratorOp。比如如下的实例,我们把多个操作放到一个事务里面去执行.
@Test
public void transaction() throws Exception
CuratorOp createOp = client.transactionOp().create().forPath("/a/path", "some data".getBytes());
CuratorOp setDataOp = client.transactionOp().setData().forPath("/another/path", "other data".getBytes());
CuratorOp deleteOp = client.transactionOp().delete().forPath("/yet/another/path");
Collection<CuratorTransactionResult> results = client.transaction().forOperations(createOp, setDataOp, deleteOp);
for (CuratorTransactionResult result : results)
System.out.println(result.getForPath() + " - " + result.getType());
1.5 异步操作
因为zookeeper客户端的操作都是在和zookeeper服务端打交道的。涉及到网络的调用。所以有些操作的响应就不会那么及时了。Curator就给提供了异步操作。异步响应操作结果。
既然是异步操作,那么肯定需要BackgroundCallback来异步接收操作结果了。关于异步操作,我们也举一个简单的例子,我们以创建节点来举例(删除节点,修改节点数据,事务等等其他操作都是一样的使用)。
/**
* 异步-获取某个节点的所有子节点路径
*
* @param path 目录
* @param callback 回调
* @throws Exception errors
*/
public void getChildrenAsync(String path, BackgroundCallback callback) throws Exception
client.getChildren()
.inBackground(callback)
.forPath(path);
/**
* 异步-获取某个节点的所有子节点路径
*
* @param path 目录
* @param callback 回调
* @param executor 回调在哪里执行
* @throws Exception errors
*/
public void getChildrenAsync(String path, BackgroundCallback callback, Executor executor) throws Exception
client.getChildren()
.inBackground(callback, executor)
.forPath(path);
二 Curator高级特性
Curator里面的curator-recipes ja包封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等等。而且这些特性都是在分布式系统里面常用的功能了。
2.1 Cache事件监听
Zookeeper原生支持通过注册Watcher来进行事件监听,但是开发者需要反复注册(Watcher只能单次注册单次使用)。Cache是Curator中对事件监听的包装,可以看作是对事件监听的本地缓存视图,能够自动为开发者处理反复注册监听。Curator提供了三种Watcher(Cache)来监听结点的变化。
2.1.1 Path Cache
Path Cache用来监控子节点.当一个子节点增加, 更新,删除时, Path Cache会改变它的状态,会包含最新子节点的数据和状态,而状态的更变将通过PathChildrenCacheListener通知。
Path Cache的使用非常的简单,主要涉及到四个类:
- PathChildrenCache:Path Cache听实现类
- PathChildrenCacheEvent:子节点事件
- PathChildrenCacheListener: 子节点监听
- ChildData:子节点信息
关于Path Cache的使用,我们用一个实例来简单的说明下,实例里面也只是简单的创建了一个节点。最终监听到节点的创建.
@Test
public void pathChildrenCache() throws Exception
// 创建zookeeper客户端
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181")
.retryPolicy(retryPolicy)
.sessionTimeoutMs(6000)
.connectionTimeoutMs(6000)
.build();
// 启动客户端
client.start();
PathChildrenCache cache = new PathChildrenCache(client, "/tuacy/pathCache", true);
// 添加监听
cache.getListenable().addListener(new PathChildrenCacheListener()
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
System.out.println("事件类型:" + event.getType());
if (null != event.getData())
System.out.println("节点数据:" + event.getData().getPath() + " = " + new String(event.getData().getData()));
);
cache.start();
// 添加节点
client.create().creatingParentContainersIfNeeded().forPath("/tuacy/pathCache/001");
Uninterruptibles.sleepUninterruptibly(30, TimeUnit.SECONDS);
cache.close();
2.1.2 Node Cache
Node Cache与Path Cache类似,Node Cache只是监听某一指定的节点。子节点的变化它是不会管的。
Node Cache的使用涉及到下面的三个类:
- NodeCache - Node Cache实现类
- NodeCacheListener - 节点监听器
- ChildData - 节点数据
我们还是用一个简单的实例来说明。
@Test
public void nodeCache() throws Exception
// 创建zookeeper客户端
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181")
.retryPolicy(retryPolicy)
.sessionTimeoutMs(6000)
.connectionTimeoutMs(6000)
.build();
// 启动客户端
client.start();
final NodeCache cache = new NodeCache(client, "/tuacy/nodeCache");
cache.start();
// 添加监听
cache.getListenable().addListener(new NodeCacheListener()
@Override
public void nodeChanged() throws Exception
ChildData data = cache.getCurrentData();
if (null != data)
System.out.println("节点数据:" + new String(cache.getCurrentData().getData()));
else
System.out.println("节点被删除!");
);
// 添加节点
client.create().creatingParentsIfNeeded().forPath("/tuacy/nodeCache");
Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
client.setData().forPath("/tuacy/nodeCache", "abc".getBytes());
Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
client.delete().forPath("/tuacy/nodeCache");
Uninterruptibles.sleepUninterruptibly(30, TimeUnit.SECONDS);
cache.close();
2.1.3 Tree Cache
Tree Cache可以监控整个树上的所有节点,就是PathCache和NodeCache的组合功能。
Tree Cache的使用涉及到下面四个类。
- TreeCache - Tree Cache实现类
- TreeCacheListener - 监听器类
- TreeCacheEvent - 触发的事件类
- ChildData - 节点数据
我们还是以具体的实例来说明Tree Cache的使用。
@Test
public void nodeCache() throws Exception
// 创建zookeeper客户端
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181")
.retryPolicy(retryPolicy)
.sessionTimeoutMs(6000)
.connectionTimeoutMs(6000)
.build();
// 启动客户端
client.start();
final TreeCache cache = TreeCache.newBuilder(client, "/tuacy/treeCache")
.setCacheData(true)
.build();
// 添加监听
cache.getListenable().addListener(new TreeCacheListener()
@Override
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception
System.out.println("事件类型:" + event.getType() + " | 路径:" + (null != event.getData() ? event.getData().getPath() : null));
);
cache.start();
// 添加节点
client.create().creatingParentsIfNeeded().forPath("/tuacy/treeCache");
Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
// 给节点设置数据
client.setData().forPath("/tuacy/treeCache", "abc".getBytes());
// 创建子节点
client.create().creatingParentsIfNeeded().forPath("/tuacy/treeCache/001");
Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
// 修改子节点的数据
client.setData().forPath("/tuacy/treeCache/001", "abc".getBytes());
Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
// 删除子节点
client.delete().forPath("/tuacy/treeCache/001");
Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
// 删除节点
client.delete().forPath("/tuacy/treeCache");
Uninterruptibles.sleepUninterruptibly(30, TimeUnit.SECONDS);
cache.close();
client.close();
2.2 Leader选举
在分布式系统中,选主是一个很常见的场景(Leader,Slaver真的真的是非常的常见)。
- 主节点是唯一的。
- 各个节点获取主节点的概率是一样的,一旦某个节点被选为了主节点(Leader),其他的从节点(Slaver)也要能感知到。
- 一旦主节点断开,其他的从节点重新选出一个主节点。
2.2.1 LeaderLatch
在不同的zookeeper客户端,使用了相同latch path的LeaderLatch,当中的一个最终会被选举为leader,可以通过hasLeadership方法查看LeaderLatch实例是否leade。也可以在LeaderLatchListener里面监听当前节点是否是leader。使用LeaderLatch的时候如果不想参与选举了要调用close()方法退出选举。
LeaderLatch api介绍
public class LeaderLatch
/**
* 构造函数
*
* @param client CuratorFramework
* @param latchPath 路径,所有参与者同一个路径
*/
public LeaderLatch(CuratorFramework client, String latchPath);
public LeaderLatch(CuratorFramework client, String latchPath, String id);
public LeaderLatch(CuratorFramework client, String latchPath, String id, CloseMode closeMode);
/**
* 参与选举
*/
public void start() throws Exception;
/**
* 退出选举
*/
@Override
public void close() throws IOException;
/**
* 退出选举
* 关闭方式:SILENT : 静默关闭,不触发相关监听器、NOTIFY_LEADER :关闭时触发监听器
*/
public synchronized void close(CloseMode closeMode) throws IOException;
/**
* 添加监听器,监听是否当选为leader
*/
public void addListener(LeaderLatchListener listener);
public void addListener(LeaderLatchListener listener, Executor executor);
/**
* 移除监听器
*/
public void removeListener(LeaderLatchListener listener);
/**
* 尝试让当前LeaderLatch实例为leader
*/
public void await() throws InterruptedException, EOFException
public boolean await(long timeout, TimeUnit unit) throws InterruptedException;
/**
* 获取构造函数里面这是的id
*/
public String getId();
/**
* 获取当前LeaderLatch实例的状态
*/
public State getState();
/**
* 返回所有的参与者
*/
public Collection<Participant> getParticipants() throws Exception;
/**
* 返回当前leader节点信息
*/
public Participant getLeader() throws Exception;
/**
* 判断实例是否是leader
*/
public boolean hasLeadership();
我们用一个简单的实例来说明LeaderLatch用法,比如我们创建10个zookeeper客户端来进行选举。
@Test
public void leaderLatch() throws Exception
List<CuratorFramework> zookeeperClientList = Lists.newArrayList();
List<LeaderLatch> leaderLatchList = Lists.newArrayList();
// 启动10个zookeeper客户端
for (int index = 0; index < 10; index++)
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181")
.retryPolicy(retryPolicy)
.sessionTimeoutMs(6000)
.connectionTimeoutMs(6000)
.build();
// 启动客户端
client.start();
zookeeperClientList.add(client);
// 这里我们所有的客户端都参与leader选举
for (int index = 0; index < zookeeperClientList.size(); index++)
// 所有的客户端都参与leader选举
final LeaderLatch latch = new LeaderLatch(zookeeperClientList.get(index), LEADER_PATH, index + "");
latch.addListener(new LeaderLatchListener()
@Override
public void isLeader()
System.out.println("我是leader: " + latch.getId());
@Override
public void notLeader()
System.out.println("我不是leader: " + latch.getId());
);
latch.start();
leaderLatchList.add(latch);
// 30S之后
Uninterruptibles.sleepUninterruptibly(30, TimeUnit.SECONDS);
// 我们找到谁是leader
String leaderId = leaderLatchList.get(0).getLeader().getId();
System.out.println("当前leader id : " + leaderId);
leaderLatchList.forEach(item ->
// 这里我们吧leader退出选举,让剩下的重新选举
if (item.getId().equals(leaderId))
try
item.close();
catch (IOException e)
e.printStackTrace();
);
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MINUTES);
leaderLatchList.forEach(curatorFramework ->
// 退出选举
try
curatorFramework.close();
catch (IOException e)
e.printStackTrace();
);
zookeeperClientList.forEach(curatorFramework ->
// 关闭客户端
curatorFramework.close();
);
2.2.2 LeaderSelector
LeaderSelector也是一个用于分布式选举的类,相对于LeaderLatch来说,LeaderSelector更加的灵活点。LeaderSelector使用的时候主要涉及下面几个类:
- LeaderSelector:LeaderSelector选举实例类。
- LeaderSelectorListener:监听选举状态和连接状态
- LeaderSelectorListenerAdapter:实现了LeaderSelectorListener接口的一个抽象类,封装了客户端与zk服务器连接挂起或者断开时的处理逻辑(抛出抢主失败CancelLeadershipException),一般监听器推荐实现该类。
- CancelLeadershipException: 抢主失败异常。
LeaderSelector api 介绍
public class LeaderSelector
/**
* 构造函数
* @param client CuratorFramework
* @param leaderPath 路径
* @param listener 监听器
*/
public LeaderSelector(CuratorFramework client, String leaderPath, LeaderSelectorListener listener);
public LeaderSelector(CuratorFramework client, String leaderPath, ExecutorService executorService, LeaderSelectorListener listener);
public LeaderSelector(CuratorFramework client, String leaderPath, CloseableExecutorService executorService, LeaderSelectorListener listener);
/**
* 保证在此实例释放领导权之后还可能获得领导权
*/
public void autoRequeue();
/**
* 设置获取当前实例对应的id
*/
public void setId(String id);
public String getId();
/**
* 当前实例参与选举
*/
public void start();
/**
* 重新键入到参与者队列里面去选举,如果此实例已在参与者排队里面,则不会发生任何操作并返回false。如果实例未排队,则重新执行该操作并返回true
*/
public boolean requeue();
/**
* 退出选举
*/
public synchronized void close();
/**
* 获取所有的参与者
*/
public Collection<Participant> getParticipants() throws Exception;
/**
* 获取leader
*/
public Participant getLeader() throws Exception;
/**
* 当前节点是否是leader
*/
public boolean hasLeadership();
/**
* 如果当前实例是leader的话,尝试终断领导权
*/
public synchronized void interruptLeadership();
ConnectionStateListener、LeaderSelectorListener
public interface ConnectionStateListener
/**
* 监听网络连接问题
*/
public void stateChanged(CuratorFramework client, ConnectionState newState);
/**
* Notification for leadership
*
* @see LeaderSelector
*/
public interface LeaderSelectorListener extends ConnectionStateListener
/**
* 当前节点获取到leader权之后调用,注意:在您希望释放领导力之前,此方法不应返回
* 所以说如果你想一直占有leader权利,就在里面写个无限循环吧
*/
public void takeLeadership(CuratorFramework client) throws Exception;
LeaderSelectorListenerAdapter
/**
* 实现了LeaderSelectorListener接口的一个抽象类,封装了客户端与zk服务器连接挂起或者断开时的处理逻辑(抛出抢主失败CancelLeadershipException),一般监听器推荐实现该类
*/
public abstract class LeaderSelectorListenerAdapter implements LeaderSelectorListener
/**
* 当遇到SUSPENDED以及LOST时直接抛出CancelLeadershipException从而去引发LeaderSelector.interruptLeadership()调用
*/
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState)
if ( client.getConnectionStateErrorPolicy().isErrorState(newState) )
throw new CancelLeadershipException();
我们还是用一个简单的实例来说明LeaderSelector的用法,我们还是创建10个zookeeper客户端。并且我们创建一个LeaderSelectorAdapter类,在里面当是leader之后的一些处理,如果是leader 10s之后,释放leader权力重新选举。
public class LeaderSelectorAdapter extends LeaderSelectorListenerAdapter
private final LeaderSelector leaderSelector;
public LeaderSelectorAdapter(CuratorFramework client, String path, String id)
// 创建一个LeaderSelector对象
leaderSelector = new LeaderSelector(client, path, this);
// 设置id
leaderSelector.setId(id);
// 保证在此实例释放领导权之后还可能获得领导权
leaderSelector.autoRequeue();
/**
* 参与选举
*/
public void start()
// 参与选举
leaderSelector.start();
/**
* 退出选举
*/
public void close()
// 退出选举
leaderSelector.close();
/**
* 当获得leader的时候,这个方法会被调用。如果还想继续当leader,这个方法不能返回。如果你想要要此实例一直是leader的话可以加一个死循环
*/
@Override
public void takeLeadership(CuratorFramework client) throws Exception
System.out.println(leaderSelector.getId() + " 是leader");
try
// 当上leader 5s之后,释放leader权利
Thread.sleep(TimeUnit.SECONDS.toMillis(10));
catch (InterruptedException e)
System.err.println(leaderSelector.getId() + " 被中断.");
Thread.currentThread().interrupt();
finally
System.out.println(leaderSelector.getId() + " 释放leader的权力。");
private static final String LEADER_PATH = "/tuacy/leaderSelector";
@Test
public void leaderSelector() throws Exception
List<CuratorFramework> zookeeperClientList = Lists.newArrayList();
List<LeaderSelectorAdapter> leaderLatchList = Lists.newArrayList();
// 启动10个zookeeper客户端
for (int index = 0; index < 10; index++)
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181")
.retryPolicy(retryPolicy)
.sessionTimeoutMs(6000)
.connectionTimeoutMs(6000)
.build();
// 启动客户端
client.start();
zookeeperClientList.add(client);
// 这里我们所有的客户端都参与leader选举
for (int index = 0; index < zookeeperClientList.size(); index++)
// 所有的客户端都参与leader选举
final LeaderSelectorAdapter latch = new LeaderSelectorAdapter(zookeeperClientList.get(index), LEADER_PATH, index + "");
latch.start();
leaderLatchList.add(latch);
// 1分钟之后关掉程序
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MINUTES);
leaderLatchList.forEach(curatorFramework ->
// 退出选举
curatorFramework.close();
);
zookeeperClientList.forEach(curatorFramework ->
// 关闭客户端
curatorFramework.close();
);
2.3 分布式锁
分布式锁也是咱们分布式系统里面非常常见的功能了。Curator直接就帮我们做到了,省的我们自己去实现分布式锁。
2.3.1 InterProcessMutex
InterProcessMutex公平锁、可重入锁。和ReentrantLock类似。
InterProcessMutex api 介绍
public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex>
/**
* InterProcessMutex的构造函数,
*/
public InterProcessMutex(CuratorFramework client, String path);
public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver);
/**
* 申请获取锁
*/
@Override
public void acquire() throws Exception;
@Override
public boolean acquire(long time, TimeUnit unit) throws Exception;
/**
*
* 如果此JVM中的线程获取了互斥锁,则返回true
*/
@Override
public boolean isAcquiredInThisProcess();
/**
* 释放锁
*/
@Override
public void release() throws Exception;
/**
* 返回所有参与获取锁的所有当前节点的排序列表
*/
public Collection<String> getParticipantNodes() throws Exception;
/**
* 将锁设为可撤销的. 当别的进程或线程想让你释放锁是Listener会被调用
*/
@Override
public void makeRevocable(RevocationListener<InterProcessMutex> listener);
@Override
public void makeRevocable(final RevocationListener<InterProcessMutex> listener, Executor executor);
/**
* 如果调用线程获取互斥锁,则返回true
*/
public boolean isOwnedByCurrentThread();
2.3.2 InterProcessSemaphoreMutex
InterProcessSemaphoreMutex不可重入锁。
InterProcessSemaphoreMutex api介绍
public class InterProcessSemaphoreMutex implements InterProcessLock
/**
* 构造函数
*/
public InterProcessSemaphoreMutex(CuratorFramework client, String path);
/**
* 申请获取锁
*/
@Override
public void acquire() throws Exception;
@Override
public boolean acquire(long time, TimeUnit unit) throws Exception;
/**
* 释放锁
*/
@Override
public void release() throws Exception;
/**
* 如果此JVM中的线程获取了互斥锁,则返回true
*/
@Override
public boolean isAcquiredInThisProcess();
2.3.3 InterProcessReadWriteLock
InterProcessReadWriteLock 读写锁。和ReadWriteLock类似。
InterProcessReadWriteLock api 介绍
public class InterProcessReadWriteLock
/**
* 读锁
*/
private final InterProcessMutex readMutex;
/**
* 写锁
*/
private final InterProcessMutex writeMutex;
/**
* 构造函数
*/
public InterProcessReadWriteLock(CuratorFramework client, String basePath)
/**
* 构造函数
* lockData是存储在节点上的数据
*/
public InterProcessReadWriteLock(CuratorFramework client, String basePath, byte[] lockData);
/**
* 获取读锁
*/
public InterProcessMutex readLock();
/**
* 获取写锁
*/
public InterProcessMutex writeLock();
2.3.4 信号量(InterProcessSemaphoreV2)
InterProcessSemaphoreV2实现了一个跨jvm的信号量,主要工作原理是:acquire时创建一个临时顺序节点,如果创建成功且临时节点数小于等于maxLeases则说明信号量获取成功,否则wait等待,等待目录发生变化或计数改变时唤醒。和Semaphore的功能类似.
分布式信号量的使用。我们需要了解以下三个类。
- InterProcessSemaphoreV2:信号量实现类
- Lease:租约(单个信号)
- SharedCountReader:计数器,用于计算最大租约数量
InterProcessSemaphoreV2 api 介绍
public class InterProcessSemaphoreV2
/**
* 构造函数
* @param client CuratorFramework
* @param path 节点路径
* @param maxLeases 允许此实例的最大租约数
*/
public InterProcessSemaphoreV2(CuratorFramework client, String path, int maxLeases);
/**
* 构造函数
* @param client CuratorFramework
* @param path 节点路径
* @param count 用于最大租约的共享计数
*/
public InterProcessSemaphoreV2(CuratorFramework client, String path, SharedCountReader count);
/**
* 此信号量创建的节点放置的数据,必须在调用其中一个acquire()方法之前调用它
*/
public void setNodeData(byte[] nodeData);
/**
* 返回参与信号量的所有当前节点的列表
*/
public Collection<String> getParticipantNodes() throws Exception;
/**
* 关闭给定租约集合中的所有租约
*/
public void returnAll(Collection<Lease> leases);
/**
* 关闭租约
*/
public void returnLease(Lease lease);
/**
* 获取租约,如果没有租约获取会一直阻塞直到获取到租约
*/
public Lease acquire() throws Exception;
public Lease acquire(long time, TimeUnit unit) throws Exception
/**
* 获取指定数量的租约,如果没有获取到制定数量租约会一直阻塞
*/
public Collection<Lease> acquire(int qty) throws Exception;
public Collection<Lease> acquire(int qty, long time, TimeUnit unit) throws Exception;
2.3.5 InterProcessMultiLock(多共享锁对象
它可以把多个锁包含起来像一个锁一样进行操作,简单来说就是对多个锁进行一组操作。当acquire的时候就获得多个锁资源,否则失败。同样调用release时所有的锁都被release(失败被忽略)。
InterProcessMultiLock api 介绍
public class InterProcessMultiLock implements InterProcessLock
/**
* 构造函数
*
* @param client CuratorFramework
* @param paths 节点列表对应的路径(多个路径就是多个锁)
*/
public InterProcessMultiLock(CuratorFramework client, List<String> paths);
/**
* 构造函数
*/
public InterProcessMultiLock(List<InterProcessLock> locks);
/**
* 请求锁
*/
@Override
public void acquire() throws Exception;
@Override
public boolean acquire(long time, TimeUnit unit) throws Exception;
/**
* 释放锁
*/
@Override
public synchronized void release() throws Exception;
/**
* 如果此JVM中的线程获取了所有的锁,则返回true
*/
@Override
public synchronized boolean isAcquiredInThisProcess();
2.4 分布式计数器
计数器是用来计数的,利用ZooKeeper可以实现一个分布式计数器。只要使用相同的path就可以得到最新的计数器值,这是由ZooKeeper的一致性保证的。Curator有两个计数器,一个是用int来计数(SharedCount),一个用long来计数(DistributedAtomicLong)。
2.4.1 SharedCount(int计数器)
SharedCount使用int类型来计数。相当于多个zookeeper客户端公用一个计算器。
- SharedCount:计数器的具体实现。
- SharedCountListener:监听数据的改变。
SharedCount api 介绍
public class SharedCount implements Closeable, SharedCountReader, Listenable<SharedCountListener>
/**
* 构造函数
* @param client CuratorFramework
* @param path 计数器依赖的节点
* @param seedValue 如果当前节点对应的计数器没有值,就会用该值
*/
public SharedCount(CuratorFramework client, String path, int seedValue);
protected SharedCount(CuratorFramework client, String path, SharedValue sv);
/**
* 获取当前计数
*/
@Override
public int getCount();
/**
* 获取当前节点对应的版本信息
*/
@Override
public VersionedValue<Integer> getVersionedValue();
/**
* 设置计数器的值
*/
public void setCount(int newCount) throws Exception;
/**
* 设置计数器的值,这里要注意如果当前版本的值在这个时刻有改变则设置不成功。CAS操作
*/
public boolean trySetCount(VersionedValue<Integer> previous, int newCount) throws Exception;
/**
* 添加监听器
*/
@Override
public void addListener(SharedCountListener listener);
@Override
public void addListener(final SharedCountListener listener, Executor executor);
/**
* 移除监听器
*/
@Override
public void removeListener(SharedCountListener listener);
/**
* 启动
*/
public void start() throws Exception;
/**
* 结束
*/
@Override
public void close() throws IOException;
SharedCount使用实例。模拟了10个zookeeper客户端。每个客户端都加5次。最终结果50就对了。
public class SharedCountTest
private static final String PATH_COUNTER = "/int/counter";
class CounterThread extends Thread
private final CountDownLatch countDownLatch;
private final int threadIndex;
private final SharedCount counter;
CounterThread(SharedCount counter, int index, CountDownLatch countDownLatch)
this.counter = counter;
this.threadIndex = index;
this.countDownLatch = countDownLatch;
@Override
public void run()
try
for (int index = 0; index < 5; index++)
while (true)
Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);
boolean success = counter.trySetCount(counter.getVersionedValue(), counter.getCount() + 1);
if (success)
break;
catch (Exception e)
e.printStackTrace();
finally
try
System.out.println("当前值为:" + counter.getCount());
counter.close();
catch (Exception e)
//ignore
countDownLatch.countDown();
@Test
public void sharedCount() throws Exception
CountDownLatch countDownLatch = new CountDownLatch(10);
List<CuratorFramework> zookeeperClientList = Lists.newArrayList();
// 启动10个zookeeper客户端
for (int index = 0; index < 10; index++)
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181")
.retryPolicy(retryPolicy)
.sessionTimeoutMs(6000)
.connectionTimeoutMs(6000)
.build();
// 启动客户端
client.start();
zookeeperClientList.add(client);
// 如果节点存在,我们就删除节点
zookeeperClientList.get(0).delete().forPath(PATH_COUNTER);
for (int index = 0; index < zookeeperClientList.size(); index++)
SharedCount sharedCount = new SharedCount(zookeeperClientList.get(index), PATH_COUNTER, 0);
sharedCount.addListener(new SharedCountListener()
@Override
public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception
System.out.println("计数器值改变,现在的值为:" + newCount);
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState)
// 连接状态改变
);
sharedCount.start();
new CounterThread(sharedCount, index, countDownLatch).start();
countDownLatch.await();
zookeeperClientList.forEach(curatorFramework ->
// 关闭客户端
curatorFramework.close();
);
2.4.2 DistributedAtomicLong(long计数器)
DistributedAtomicLong使用Long类型来计数。
DistributedAtomicLong api 介绍
public class DistributedAtomicLong implements DistributedAtomicNumber<Long>
/**
* 构造函数(乐观锁模式)
*
* @param client CuratorFramework
* @param counterPath 节点路径
* @param retryPolicy 重试策略 -- 乐观加锁
*/
public DistributedAtomicLong(CuratorFramework client, String counterPath, RetryPolicy retryPolicy);
/**
* 构造函数,retryPolicy(乐观加锁)还没成功,则进行promotedToLock的方式以互斥的方式加锁重试 (排他锁模式)
*
* @param client CuratorFramework
* @param counterPath 节点路径
* @param retryPolicy 重试策略 -- 乐观加锁
* @param promotedToLock 排他锁策略
*/
public DistributedAtomicLong(CuratorFramework client, String counterPath, RetryPolicy retryPolicy, PromotedToLock promotedToLock);
/**
* 获取当前值
*/
@Override
public AtomicValue<Long> get() throws Exception
/**
* 强制设置计数值
*/
@Override
public void forceSet(Long newValue) throws Exception;
/**
* CAS更新(乐观锁模式更新)
*/
@Override
public AtomicValue<Long> compareAndSet(Long expectedValue, Long newValue) throws Exception;
/**
* 设置值
*/
@Override
public AtomicValue<Long> trySet(Long newValue) throws Exception;
/**
* 如果之前没有初始值,则把初始值设置进去
*/
@Override
public boolean initialize(Long initialize) throws Exception;
/**
* +1
*/
@Override
public AtomicValue<Long> increment() throws Exception;
/**
* -1
*/
@Override
public AtomicValue<Long> decrement() throws Exception;
/**
* 加一个指定的值
*/
@Override
public AtomicValue<Long> add(Long delta) throws Exception;
/**
* 键一个指定的值
*/
@Override
public AtomicValue<Long> subtract(Long delta) throws Exception;
DistributedAtomicLong怎么使用,直接给实例。也是模拟10个客户端,每个客户端增加5次。最终结果得到50就对了。
public class DistributedAtomicLongTest
private static final String PATH_COUNTER = "/long/counter";
class CounterThread extends Thread
private final CountDownLatch countDownLatch;
private final int threadIndex;
private final DistributedAtomicLong counter;
CounterThread(DistributedAtomicLong counter, int index, CountDownLatch countDownLatch)
this.counter = counter;
this.threadIndex = index;
this.countDownLatch = countDownLatch;
@Override
public void run()
try
for (int index = 0; index < 5; index++)
// 保证成功
while (true)
AtomicValue<Long> value = counter.increment();
if (value.succeeded())
System.out.println("succeed: " + value.succeeded() + " value:" + value.postValue());
break;
Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);
catch (Exception e)
e.printStackTrace();
finally
countDownLatch.countDown();
@Test
public void distributedAtomicLong() throws Exception
CountDownLatch countDownLatch = new CountDownLatch(10);
List<CuratorFramework> zookeeperClientList = Lists.newArrayList();
// 启动10个zookeeper客户端
for (int index = 0; index < 10; index++)
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181")
.retryPolicy(retryPolicy)
.sessionTimeoutMs(6000)
.connectionTimeoutMs(6000)
.build();
// 启动客户端
client.start();
zookeeperClientList.add(client);
// 如果节点存在,我们就删除节点
if (zookeeperClientList.get(0).checkExists().forPath(PATH_COUNTER) != null)
zookeeperClientList.get(0).delete().forPath(PATH_COUNTER);
for (int index = 0; index < zookeeperClientList.size(); index++)
// 乐观锁模式
DistributedAtomicLong count = new DistributedAtomicLong(zookeeperClientList.get(index), PATH_COUNTER, new RetryNTimes(10, 10));
boolean initializeSuccess = count.initialize(0L);
if (initializeSuccess)
System.out.println("初始化成功");
else
System.out.println("初始化失败");
new CounterThread(count, index, countDownLatch).start();
countDownLatch.await();
zookeeperClientList.forEach(curatorFramework ->
// 关闭客户端
curatorFramework.close();
);
2.5 分布式队列
2.5.4 简单队列 - SimpleDistributedQueue
SimpleDistributedQueue是一种简单队列,和jdk中队列类似,拥有offer()、take()方法。
SimpleDistributedQueue的使用是很简单的,所以我们就直接给出SimpleDistributedQueue的使用实例了。
public class SimpleDistributedQueueTest
private static final String SIMPLE_DISTRIBUTED_QUEUE_PATH = "/SimpleDistributedQueue";
class QueueActionThread extends Thread
private final SimpleDistributedQueue queue;
private final CountDownLatch countDownLatch;
private final int queueIndex;
QueueActionThread(SimpleDistributedQueue queue, int index, CountDownLatch countDownLatch)
this.queue = queue;
this.queueIndex = index;
this.countDownLatch = countDownLatch;
@Override
public void run()
try
for (int index = 0; index < 5; index++)
String message = "我是队列:" + queueIndex + " 的第-" + index + "-条消息";
this.queue.offer(message.getBytes());
Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);
for (int index = 0; index < 5; index++)
byte[] queueItem = queue.take();
System.out.println("我是队列:" + queueIndex + " 我收到了:" + new String(queueItem));
catch (Exception e)
e.printStackTrace();
finally
countDownLatch.countDown();
@Test
public void simpleDistributedQueue() throws Exception
CountDownLatch countDownLatch = new CountDownLatch(10);
List<CuratorFramework> zookeeperClientList = Lists.newArrayList();
// 启动10个zookeeper客户端
for (int index = 0; index < 10; index++)
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181")
.retryPolicy(retryPolicy)
.sessionTimeoutMs(6000)
.connectionTimeoutMs(6000)
.build();
// 启动客户端
client.start();
zookeeperClientList.add(client);
for (int index = 0; index < zookeeperClientList.size(); index++)
SimpleDistributedQueue queue = new SimpleDistributedQueue(zookeeperClientList.get(index), SIMPLE_DISTRIBUTED_QUEUE_PATH);
new QueueActionThread(queue, index, countDownLatch).start();
countDownLatch.await();
// 关闭客户端
zookeeperClientList以上是关于Zookeeper客户端Curator使用详解的主要内容,如果未能解决你的问题,请参考以下文章