Apache Curator 实现分布式锁

Posted tangdong3415

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Apache Curator 实现分布式锁相关的知识,希望对你有一定的参考价值。

Curator是Netflix公司开源的一套ZK客户端框架。基于Zookeeper官方API自定义实现分布式锁

有以下特征:

  • Curator  API简单明了,接入成本很低;
  • 解决了异常处理、Watch反复注册 ;
  • 客户端宕机等异常情况下,当前客户端持有的锁可实时释放;
  • 提供了各种分布式场景下的工具包,例如分布式锁的实现,分布式CyclicBarrier实现、Leader选举等;

方法:

Maven依赖 

<dependency>

    <groupId>org.apache.curator</groupId>

    <artifactId>curator-recipes</artifactId>

    <version>4.1.0</version>

</dependency>

<dependency>

    <groupId>org.apache.curator</groupId>

    <artifactId>curator-framework</artifactId>

    <version>4.1.0</version>

</dependency>

对外接口 

 

import java.util.concurrent.TimeUnit;

/**

 * 分布式锁

 * @author

 */

public interface DistributedLock

    /**

     * 获取锁(阻塞)

     * @param lockName 锁名

     */

    void lock(String lockName);

    /**

     * 获取锁(直接返回)

     * @param lockName 锁名

     * @return

     */

    boolean tryLock(String lockName);

    /**

     * 获取锁(直接返回)

     * @param lockName 锁名

     * @param time     超时时间

     * @param unit     超时单位

     * @return

     */

    boolean tryLock(String lockName, long time, TimeUnit unit);

    /**

     * 释放锁

     * @param lockName 锁名

     */

    void unLock(String lockName);

 

 

 

 

 

 

 

 

Curator分布式锁实现  折叠源码

import com.jd.trip.hotel.platform.common.utils.PropertiesUtil;

import com.jd.trip.hotel.resource.common.lock.DistributedLock;

import org.apache.commons.lang3.StringUtils;

import org.apache.curator.RetryPolicy;

import org.apache.curator.framework.CuratorFramework;

import org.apache.curator.framework.CuratorFrameworkFactory;

import org.apache.curator.framework.recipes.locks.InterProcessMutex;

import org.apache.curator.retry.ExponentialBackoffRetry;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.InitializingBean;

 

import java.util.Properties;

import java.util.concurrent.TimeUnit;

 

/**

 * Curator框架自带的分布式锁

 * @author

 */

public class CuratorLock implements DistributedLock, InitializingBean

    Logger logger = LoggerFactory.getLogger(CuratorLock.class);

 

    static String root = "/locksNode";

    static volatile CuratorFramework client;

    static ThreadLocal<InterProcessMutex> lockThreadLocal = new ThreadLocal<>();

 

    @Override

    public void afterPropertiesSet() throws Exception

        init();

    

 

    public void init() throws Exception

        if (client == null)

            synchronized (CuratorLock.class)

                if (client == null)

                    Properties properties = PropertiesUtil.loadProperties("confcenter.properties");

                    String zkAddress = properties.getProperty("conf.center.zkserver");

                    if (StringUtils.isBlank(zkAddress))

                        logger.error("获取zk连接地址失败,请正确配置IP[集群地址以逗号隔开]");

                    

                    //重试策略:间隔时间为1s,重试10次

                    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);

                    client = CuratorFrameworkFactory.newClient(zkAddress, retryPolicy);

                    //开启连接

                    client.start();

                    logger.info("curator客户端连接ZK成功");

                

            

        

    

 

    @Override

    public void lock(String lockName)

        InterProcessMutex lock = new InterProcessMutex(client, root + "/" + lockName);

        try

            lock.acquire();

            lockThreadLocal.set(lock);

            logger.info("线程:获取curator-zk锁:成功", Thread.currentThread().getName(), lockName);

         catch (Exception e)

            logger.error("线程:获取curator-zk锁:失败", Thread.currentThread().getName(), lockName);

            unLock(lockName);

            throw new RuntimeException("curator-zk锁:" + lockName + "添加失败", e);

        

    

 

    @Override

    public boolean tryLock(String lockName)

        return tryLock(lockName, 0, TimeUnit.SECONDS);

    

 

    @Override

    public boolean tryLock(String lockName, long time, TimeUnit unit)

        boolean isLock;

        InterProcessMutex lock = new InterProcessMutex(client, root + "/" + lockName);

        try

            isLock = lock.acquire(time, unit);

            if (isLock)

                lockThreadLocal.set(lock);

                logger.info("线程:获取curator-zk锁:成功", Thread.currentThread().getName(), lockName);

            

         catch (Exception e)

            logger.error("线程:获取curator-zk锁:失败", Thread.currentThread().getName(), lockName);

            unLock(lockName);

            throw new RuntimeException("curator-zk锁:" + lockName + "添加失败", e);

        

        return isLock;

    

 

    @Override

    public void unLock(String lockName)

        if (lockThreadLocal.get() != null)

            try

                lockThreadLocal.get().release();

                logger.info("线程:释放curator-zk锁:成功", Thread.currentThread().getName(), lockName);

             catch (Exception e)

                logger.error("线程:释放curator-zk锁:失败", Thread.currentThread().getName(), lockName);

                throw new RuntimeException("curator-zk锁:" + lockName + "释放失败", e);

             finally

                lockThreadLocal.remove();

            

        

    

 

接入及使用方式 

<!--如果要替换为别的加锁方式,只需修改class即可-->

<bean id="distributedLock" class="com.jd.trip.hotel.resource.common.lock.impl.CuratorLock"></bean>

 

 

//调用方式(业务代码中使用)

@Autowired

DistributedLock distributedLock;

 

//加锁

distributedLock.tryLock(JimConstant.DELETE_HISTORY_LOG_SYNC)

//解锁

distributedLock.unLock(JimConstant.DELETE_HISTORY_LOG_SYNC);

 

以上是关于Apache Curator 实现分布式锁的主要内容,如果未能解决你的问题,请参考以下文章

zookeeper实现分布式锁-curator

zookeeperApache curator的使用及zk分布式锁实现

curator

Curator实现分布式锁的基本原理

Curator实现分布式锁的基本原理

Curator实现分布式锁的基本原理