ZooKeeper : Curator框架之共享计数器DistributedAtomicLong

Posted ITKaven

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ZooKeeper : Curator框架之共享计数器DistributedAtomicLong相关的知识,希望对你有一定的参考价值。

DistributedAtomicLong

原子增量操作的计数器,首先尝试使用乐观锁进行增量操作,如果失败,则采用可选的InterProcessMutex(悲观锁)进行增量操作。 对于乐观锁和悲观锁,重试策略都用于重试增量操作。

各种增量方法都会返回一个AtomicValue实例,通过调用AtomicValue实例的succeeded()可以查询增量操作是否执行成功,除了get() 外,其他任何方法都不保证一定成功。

AtomicValue接口源码(原子操作返回值的抽象):

public interface AtomicValue<T>

    /**
     * 如果操作成功,则返回true
     * 如果返回false,则操作失败
     */
    public boolean      succeeded();

    /**
     * 返回操作前计数器的值
     */
    public T            preValue();

    /**
     * 返回操作后计数器的值
     */
    public T            postValue();

    /**
     * 返回操作的调试统计信息,比如乐观锁、悲观锁尝试的次数与时间
     */
    public AtomicStats  getStats();

DistributedAtomicLong类中的内部类AtomicLong实现了AtomicValue接口,但实际上只是起到封装的作用,所有的调用都委托给了bytes属性(其他实现类的实例)。

    private class AtomicLong implements AtomicValue<Long>
    
        private AtomicValue<byte[]> bytes;

        private AtomicLong(AtomicValue<byte[]> bytes)
        
            this.bytes = bytes;
        

        @Override
        public boolean succeeded()
        
            return bytes.succeeded();
        

        @Override
        public Long preValue()
        
            return bytesToValue(bytes.preValue());
        

        @Override
        public Long postValue()
        
            return bytesToValue(bytes.postValue());
        

        @Override
        public AtomicStats getStats()
        
            return bytes.getStats();
        
    

DistributedAtomicLong类实现了DistributedAtomicNumber接口,并且DistributedAtomicLong将各种原子操作的执行委托给了DistributedAtomicValue

public class DistributedAtomicLong implements DistributedAtomicNumber<Long>

    private final DistributedAtomicValue        value;
    ...

DistributedAtomicNumber接口是分布式原子数值类型的抽象,定义了分布式原子数值类型需要提供的方法。

public interface DistributedAtomicNumber<T>

    public AtomicValue<T> get() throws Exception;
    public AtomicValue<T> compareAndSet(T expectedValue, T newValue) throws Exception;
    public AtomicValue<T> trySet(T newValue) throws Exception;
    public boolean initialize(T value) throws Exception;
    public void forceSet(T newValue) throws Exception;
    public AtomicValue<T> increment() throws Exception;
    public AtomicValue<T> decrement() throws Exception;
    public AtomicValue<T> add(T delta) throws Exception;
    public AtomicValue<T> subtract(T delta) throws Exception;

目前DistributedAtomicNumber接口有两种实现,除了DistributedAtomicLong类,还有DistributedAtomicInteger类。

并且DistributedAtomicInteger也是将各种原子操作的执行委托给了DistributedAtomicValue,所以这两种实现是类似的,只不过表示的数值类型不同而已。

public class DistributedAtomicInteger implements DistributedAtomicNumber<Integer>

    private final DistributedAtomicValue        value;
    ...

DistributedAtomicValue是原子操作真正的执行者,因此可以知道内部类AtomicLongbytes属性是MutableAtomicValue实例。

    public AtomicValue<byte[]>     get() throws Exception
    
        MutableAtomicValue<byte[]>  result = new MutableAtomicValue<byte[]>(null, null, false);
        ...
        return result;
    

测试

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.kaven</groupId>
    <artifactId>zookeeper</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>5.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.22</version>
        </dependency>
    </dependencies>
</project>

CuratorFrameworkProperties类(提供CuratorFramework需要的一些配置信息,以及创建CuratorFramework实例的方法):

package com.kaven.zookeeper;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorFrameworkProperties 
    // 连接地址
    public static final String CONNECT_ADDRESS = "192.168.1.3:9000";
    // 连接超时时间
    public static final int CONNECTION_TIMEOUT_MS = 40000;
    // Session超时时间
    public static final int SESSION_TIMEOUT_MS = 10000;
    // 命名空间
    public static final String NAMESPACE = "MyNamespace";
    // 重试策略
    public static final RetryPolicy RETRY_POLICY = new ExponentialBackoffRetry(1000, 3);

    public static CuratorFramework getCuratorFramework() 
        // 创建CuratorFramework实例
        CuratorFramework curator = CuratorFrameworkFactory.builder()
                .connectString(CuratorFrameworkProperties.CONNECT_ADDRESS)
                .retryPolicy(CuratorFrameworkProperties.RETRY_POLICY)
                .connectionTimeoutMs(CuratorFrameworkProperties.CONNECTION_TIMEOUT_MS)
                .sessionTimeoutMs(CuratorFrameworkProperties.SESSION_TIMEOUT_MS)
                .namespace(CuratorFrameworkProperties.NAMESPACE)
                .build();
        curator.start();
        assert curator.getState().equals(CuratorFrameworkState.STARTED);
        return curator;
    

DistributedAtomicLongRunnable类(实现了Runnable接口,模拟分布式节点操作分布式原子长整型):

package com.kaven.zookeeper;

import lombok.SneakyThrows;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.atomic.AtomicValue;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
import org.apache.curator.retry.RetryNTimes;

public class DistributedAtomicLongRunnable implements Runnable
    @SneakyThrows
    @Override
    public void run() 
        // 使用不同的CuratorFramework实例,表示不同的分布式节点
        CuratorFramework curator = CuratorFrameworkProperties.getCuratorFramework();

        // 共享计数器的路径
        String counterPath = "/kaven";

        // 创建DistributedAtomicLong实例,用于操作分布式原子长整型
        // new RetryNTimes(100, 5)是乐观锁的重试策略实例
        DistributedAtomicLong atomicLong = new DistributedAtomicLong(curator, counterPath,
                new RetryNTimes(100, 5));

        // 初始化
        boolean initialize = atomicLong.initialize(100L);
        if(initialize) 
            System.out.println(Thread.currentThread().getName() + "初始化 atomicLong 成功");
        
        else 
            System.out.println(Thread.currentThread().getName() + "初始化 atomicLong 失败");
        

        // 比较再设置,当Zookeeper中的值与期望值相等时才能设置新值
        AtomicValue<Long> longAtomicValue = atomicLong.compareAndSet(100L, 501L);
        if(longAtomicValue.succeeded()) 
            System.out.println(Thread.currentThread().getName() + " compareAndSet 成功");
        
        else 
            System.out.println(Thread.currentThread().getName() + " compareAndSet 失败");
        
    

启动类:

package com.kaven.zookeeper;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Application 
    private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();

    public static void main(String[] args) throws Exception 
        // 分布式节点处理业务
        for (int i = 0; i < 15; i++) 
            EXECUTOR_SERVICE.execute(new DistributedAtomicLongRunnable());
        
    

模拟15个分布式节点操作分布式原子长整型,输出如下所示:

pool-1-thread-12初始化 atomicLong 成功
pool-1-thread-11初始化 atomicLong 失败
pool-1-thread-10初始化 atomicLong 失败
pool-1-thread-14初始化 atomicLong 失败
pool-1-thread-15初始化 atomicLong 失败
pool-1-thread-8初始化 atomicLong 失败
pool-1-thread-13初始化 atomicLong 失败
pool-1-thread-6初始化 atomicLong 失败
pool-1-thread-1初始化 atomicLong 失败
pool-1-thread-7初始化 atomicLong 失败
pool-1-thread-5初始化 atomicLong 失败
pool-1-thread-3初始化 atomicLong 失败
pool-1-thread-9初始化 atomicLong 失败
pool-1-thread-2初始化 atomicLong 失败
pool-1-thread-4初始化 atomicLong 失败
pool-1-thread-8 compareAndSet 失败
pool-1-thread-14 compareAndSet 失败
pool-1-thread-10 compareAndSet 失败
pool-1-thread-6 compareAndSet 失败
pool-1-thread-15 compareAndSet 失败
pool-1-thread-13 compareAndSet 失败
pool-1-thread-7 compareAndSet 失败
pool-1-thread-9 compareAndSet 失败
pool-1-thread-11 compareAndSet 失败
pool-1-thread-5 compareAndSet 失败
pool-1-thread-12 compareAndSet 失败
pool-1-thread-1 compareAndSet 失败
pool-1-thread-3 compareAndSet 成功
pool-1-thread-4 compareAndSet 失败
pool-1-thread-2 compareAndSet 失败

输出是符合预期的,两种操作都只有一个节点执行成功。DistributedAtomicValue类的initializecompareAndSet方法如下所示,其实就是创建Zookeeper节点(只有一个服务能创建成功)和基于版本设置节点的值(在博主的测试程序中,也只能有一个服务将该操作执行成功),而这两种操作并没有使用锁(乐观锁和悲观锁)。

    public boolean initialize(byte[] value) throws Exception
    
        try
        
            client.create().creatingParentContainersIfNeeded().forPath(path, value);
        
        catch ( KeeperException.NodeExistsException ignore )
        
            // ignore
            return false;
        
        return true;
    
    
    public AtomicValue<byte[]> compareAndSet(byte[] expectedValue, byte[] newValue) throws Exception
    
        Stat                        stat = new Stat();
        MutableAtomicValue<byte[]>  result = new MutableAtomicValue<byte[]>(null, null, false);
        boolean                     createIt = getCurrentValue(result, stat);
        if ( !createIt && Arrays.equals(expectedValue, result.preValue) )
        
            try
            
                client.setData().withVersion(stat.getVersion()).forPath(path, newValue);
                result.succeeded = true;
                result.postValue = newValue;
            
            catch ( KeeperException.BadVersionException dummy )
            
                result.succeeded = false;
            
            catch ( KeeperException.NoNodeException dummy )
            
                result.succeeded = false;
            
        
        else
        
            result.succeeded = false;
        
        return result;
    

incrementdecrementadd以及subtract这四种操作是类似的,博主只演示increment操作。

        DistributedAtomicLong atomicLong = new DistributedAtomicLong(curator, counterPath,
                new RetryNTimes(100, 5));

        boolean initialize = atomicLong.initialize(100L);
        if(initialize) 
            System.out.println(Thread.currentThread().getName() + "初始化 atomicLong 成功");
        
        else 
            System.out.println(Thread.currentThread().getName() + "初始化 atomicLong 失败");
        

        for (int i = 0; i < 1000; i++) 
            Thread.sleep(5);
            atomicLong.increment();
        
        System.out.println(Thread.以上是关于ZooKeeper : Curator框架之共享计数器DistributedAtomicLong的主要内容,如果未能解决你的问题,请参考以下文章

ZooKeeper : Curator框架之分布式锁InterProcessMutex

curator

Zookeeper开源客户端Curator之基本功能讲解

浅谈Zookeeper开源客户端框架Curator

Zookeeper--Curator框架

zookeeper java客户端之curator详解