ZooKeeper : Curator框架之共享计数器DistributedAtomicLong
Posted ITKaven
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ZooKeeper : Curator框架之共享计数器DistributedAtomicLong相关的知识,希望对你有一定的参考价值。
DistributedAtomicLong
原子增量操作的计数器,首先尝试使用乐观锁进行增量操作,如果失败,则采用可选的InterProcessMutex
(悲观锁)进行增量操作。 对于乐观锁和悲观锁,重试策略都用于重试增量操作。
各种增量方法都会返回一个AtomicValue
实例,通过调用AtomicValue
实例的succeeded()
可以查询增量操作是否执行成功,除了get()
外,其他任何方法都不保证一定成功。
AtomicValue
接口源码(原子操作返回值的抽象):
public interface AtomicValue<T>
/**
* 如果操作成功,则返回true
* 如果返回false,则操作失败
*/
public boolean succeeded();
/**
* 返回操作前计数器的值
*/
public T preValue();
/**
* 返回操作后计数器的值
*/
public T postValue();
/**
* 返回操作的调试统计信息,比如乐观锁、悲观锁尝试的次数与时间
*/
public AtomicStats getStats();
DistributedAtomicLong
类中的内部类AtomicLong
实现了AtomicValue
接口,但实际上只是起到封装的作用,所有的调用都委托给了bytes
属性(其他实现类的实例)。
private class AtomicLong implements AtomicValue<Long>
private AtomicValue<byte[]> bytes;
private AtomicLong(AtomicValue<byte[]> bytes)
this.bytes = bytes;
@Override
public boolean succeeded()
return bytes.succeeded();
@Override
public Long preValue()
return bytesToValue(bytes.preValue());
@Override
public Long postValue()
return bytesToValue(bytes.postValue());
@Override
public AtomicStats getStats()
return bytes.getStats();
DistributedAtomicLong
类实现了DistributedAtomicNumber
接口,并且DistributedAtomicLong
将各种原子操作的执行委托给了DistributedAtomicValue
。
public class DistributedAtomicLong implements DistributedAtomicNumber<Long>
private final DistributedAtomicValue value;
...
DistributedAtomicNumber
接口是分布式原子数值类型的抽象,定义了分布式原子数值类型需要提供的方法。
public interface DistributedAtomicNumber<T>
public AtomicValue<T> get() throws Exception;
public AtomicValue<T> compareAndSet(T expectedValue, T newValue) throws Exception;
public AtomicValue<T> trySet(T newValue) throws Exception;
public boolean initialize(T value) throws Exception;
public void forceSet(T newValue) throws Exception;
public AtomicValue<T> increment() throws Exception;
public AtomicValue<T> decrement() throws Exception;
public AtomicValue<T> add(T delta) throws Exception;
public AtomicValue<T> subtract(T delta) throws Exception;
目前DistributedAtomicNumber
接口有两种实现,除了DistributedAtomicLong
类,还有DistributedAtomicInteger
类。
并且DistributedAtomicInteger
也是将各种原子操作的执行委托给了DistributedAtomicValue
,所以这两种实现是类似的,只不过表示的数值类型不同而已。
public class DistributedAtomicInteger implements DistributedAtomicNumber<Integer>
private final DistributedAtomicValue value;
...
DistributedAtomicValue
是原子操作真正的执行者,因此可以知道内部类AtomicLong
的bytes
属性是MutableAtomicValue
实例。
public AtomicValue<byte[]> get() throws Exception
MutableAtomicValue<byte[]> result = new MutableAtomicValue<byte[]>(null, null, false);
...
return result;
测试
pom.xml
:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.kaven</groupId>
<artifactId>zookeeper</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.2.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.22</version>
</dependency>
</dependencies>
</project>
CuratorFrameworkProperties
类(提供CuratorFramework
需要的一些配置信息,以及创建CuratorFramework
实例的方法):
package com.kaven.zookeeper;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class CuratorFrameworkProperties
// 连接地址
public static final String CONNECT_ADDRESS = "192.168.1.3:9000";
// 连接超时时间
public static final int CONNECTION_TIMEOUT_MS = 40000;
// Session超时时间
public static final int SESSION_TIMEOUT_MS = 10000;
// 命名空间
public static final String NAMESPACE = "MyNamespace";
// 重试策略
public static final RetryPolicy RETRY_POLICY = new ExponentialBackoffRetry(1000, 3);
public static CuratorFramework getCuratorFramework()
// 创建CuratorFramework实例
CuratorFramework curator = CuratorFrameworkFactory.builder()
.connectString(CuratorFrameworkProperties.CONNECT_ADDRESS)
.retryPolicy(CuratorFrameworkProperties.RETRY_POLICY)
.connectionTimeoutMs(CuratorFrameworkProperties.CONNECTION_TIMEOUT_MS)
.sessionTimeoutMs(CuratorFrameworkProperties.SESSION_TIMEOUT_MS)
.namespace(CuratorFrameworkProperties.NAMESPACE)
.build();
curator.start();
assert curator.getState().equals(CuratorFrameworkState.STARTED);
return curator;
DistributedAtomicLongRunnable
类(实现了Runnable
接口,模拟分布式节点操作分布式原子长整型):
package com.kaven.zookeeper;
import lombok.SneakyThrows;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.atomic.AtomicValue;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
import org.apache.curator.retry.RetryNTimes;
public class DistributedAtomicLongRunnable implements Runnable
@SneakyThrows
@Override
public void run()
// 使用不同的CuratorFramework实例,表示不同的分布式节点
CuratorFramework curator = CuratorFrameworkProperties.getCuratorFramework();
// 共享计数器的路径
String counterPath = "/kaven";
// 创建DistributedAtomicLong实例,用于操作分布式原子长整型
// new RetryNTimes(100, 5)是乐观锁的重试策略实例
DistributedAtomicLong atomicLong = new DistributedAtomicLong(curator, counterPath,
new RetryNTimes(100, 5));
// 初始化
boolean initialize = atomicLong.initialize(100L);
if(initialize)
System.out.println(Thread.currentThread().getName() + "初始化 atomicLong 成功");
else
System.out.println(Thread.currentThread().getName() + "初始化 atomicLong 失败");
// 比较再设置,当Zookeeper中的值与期望值相等时才能设置新值
AtomicValue<Long> longAtomicValue = atomicLong.compareAndSet(100L, 501L);
if(longAtomicValue.succeeded())
System.out.println(Thread.currentThread().getName() + " compareAndSet 成功");
else
System.out.println(Thread.currentThread().getName() + " compareAndSet 失败");
启动类:
package com.kaven.zookeeper;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Application
private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();
public static void main(String[] args) throws Exception
// 分布式节点处理业务
for (int i = 0; i < 15; i++)
EXECUTOR_SERVICE.execute(new DistributedAtomicLongRunnable());
模拟15
个分布式节点操作分布式原子长整型,输出如下所示:
pool-1-thread-12初始化 atomicLong 成功
pool-1-thread-11初始化 atomicLong 失败
pool-1-thread-10初始化 atomicLong 失败
pool-1-thread-14初始化 atomicLong 失败
pool-1-thread-15初始化 atomicLong 失败
pool-1-thread-8初始化 atomicLong 失败
pool-1-thread-13初始化 atomicLong 失败
pool-1-thread-6初始化 atomicLong 失败
pool-1-thread-1初始化 atomicLong 失败
pool-1-thread-7初始化 atomicLong 失败
pool-1-thread-5初始化 atomicLong 失败
pool-1-thread-3初始化 atomicLong 失败
pool-1-thread-9初始化 atomicLong 失败
pool-1-thread-2初始化 atomicLong 失败
pool-1-thread-4初始化 atomicLong 失败
pool-1-thread-8 compareAndSet 失败
pool-1-thread-14 compareAndSet 失败
pool-1-thread-10 compareAndSet 失败
pool-1-thread-6 compareAndSet 失败
pool-1-thread-15 compareAndSet 失败
pool-1-thread-13 compareAndSet 失败
pool-1-thread-7 compareAndSet 失败
pool-1-thread-9 compareAndSet 失败
pool-1-thread-11 compareAndSet 失败
pool-1-thread-5 compareAndSet 失败
pool-1-thread-12 compareAndSet 失败
pool-1-thread-1 compareAndSet 失败
pool-1-thread-3 compareAndSet 成功
pool-1-thread-4 compareAndSet 失败
pool-1-thread-2 compareAndSet 失败
输出是符合预期的,两种操作都只有一个节点执行成功。DistributedAtomicValue
类的initialize
和compareAndSet
方法如下所示,其实就是创建Zookeeper
节点(只有一个服务能创建成功)和基于版本设置节点的值(在博主的测试程序中,也只能有一个服务将该操作执行成功),而这两种操作并没有使用锁(乐观锁和悲观锁)。
public boolean initialize(byte[] value) throws Exception
try
client.create().creatingParentContainersIfNeeded().forPath(path, value);
catch ( KeeperException.NodeExistsException ignore )
// ignore
return false;
return true;
public AtomicValue<byte[]> compareAndSet(byte[] expectedValue, byte[] newValue) throws Exception
Stat stat = new Stat();
MutableAtomicValue<byte[]> result = new MutableAtomicValue<byte[]>(null, null, false);
boolean createIt = getCurrentValue(result, stat);
if ( !createIt && Arrays.equals(expectedValue, result.preValue) )
try
client.setData().withVersion(stat.getVersion()).forPath(path, newValue);
result.succeeded = true;
result.postValue = newValue;
catch ( KeeperException.BadVersionException dummy )
result.succeeded = false;
catch ( KeeperException.NoNodeException dummy )
result.succeeded = false;
else
result.succeeded = false;
return result;
increment
、decrement
、add
以及subtract
这四种操作是类似的,博主只演示increment
操作。
DistributedAtomicLong atomicLong = new DistributedAtomicLong(curator, counterPath,
new RetryNTimes(100, 5));
boolean initialize = atomicLong.initialize(100L);
if(initialize)
System.out.println(Thread.currentThread().getName() + "初始化 atomicLong 成功");
else
System.out.println(Thread.currentThread().getName() + "初始化 atomicLong 失败");
for (int i = 0; i < 1000; i++)
Thread.sleep(5);
atomicLong.increment();
System.out.println(Thread.以上是关于ZooKeeper : Curator框架之共享计数器DistributedAtomicLong的主要内容,如果未能解决你的问题,请参考以下文章