import com.jd.trip.hotel.platform.common.utils.PropertiesUtil; import com.jd.trip.hotel.resource.common.lock.DistributedLock; import org.apache.commons.lang3.StringUtils; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.ExponentialBackoffRetry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import java.util.Properties; import java.util.concurrent.TimeUnit; /** * Curator框架自带的分布式锁 * @author */ public class CuratorLock implements DistributedLock, InitializingBean Logger logger = LoggerFactory.getLogger(CuratorLock. class ); static String root = "/locksNode" ; static volatile CuratorFramework client; static ThreadLocal<InterProcessMutex> lockThreadLocal = new ThreadLocal<>(); @Override public void afterPropertiesSet() throws Exception init();
public void init() throws Exception if (client == null ) synchronized (CuratorLock. class ) if (client == null ) Properties properties = PropertiesUtil.loadProperties( "confcenter.properties" ); String zkAddress = properties.getProperty( "conf.center.zkserver" ); if (StringUtils.isBlank(zkAddress)) logger.error( "获取zk连接地址失败,请正确配置IP[集群地址以逗号隔开]" );
//重试策略:间隔时间为1s,重试10次 RetryPolicy retryPolicy = new ExponentialBackoffRetry( 1000 , 10 ); client = CuratorFrameworkFactory.newClient(zkAddress, retryPolicy); //开启连接 client.start(); logger.info( "curator客户端连接ZK成功" );
@Override public void lock(String lockName) InterProcessMutex lock = new InterProcessMutex(client, root + "/" + lockName); try
lock.acquire(); lockThreadLocal.set(lock); logger.info( "线程:获取curator-zk锁:成功" , Thread.currentThread().getName(), lockName); catch (Exception e) logger.error( "线程:获取curator-zk锁:失败" , Thread.currentThread().getName(), lockName); unLock(lockName); throw new RuntimeException( "curator-zk锁:" + lockName + "添加失败" , e);
@Override public boolean tryLock(String lockName) return tryLock(lockName, 0 , TimeUnit.SECONDS);
@Override public boolean tryLock(String lockName, long time, TimeUnit unit) boolean isLock; InterProcessMutex lock = new InterProcessMutex(client, root + "/" + lockName); try
isLock = lock.acquire(time, unit); if (isLock) lockThreadLocal.set(lock); logger.info( "线程:获取curator-zk锁:成功" , Thread.currentThread().getName(), lockName);
catch (Exception e) logger.error( "线程:获取curator-zk锁:失败" , Thread.currentThread().getName(), lockName); unLock(lockName); throw new RuntimeException( "curator-zk锁:" + lockName + "添加失败" , e);
return isLock;
@Override public void unLock(String lockName) if (lockThreadLocal.get() != null ) try
lockThreadLocal.get().release(); logger.info( "线程:释放curator-zk锁:成功" , Thread.currentThread().getName(), lockName); catch (Exception e) logger.error( "线程:释放curator-zk锁:失败" , Thread.currentThread().getName(), lockName); throw new RuntimeException( "curator-zk锁:" + lockName + "释放失败" , e); finally
lockThreadLocal.remove();
|