基础管理Tair分布式锁
Posted 阿里云云栖号
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基础管理Tair分布式锁相关的知识,希望对你有一定的参考价值。
导语
在JVM中,怎么样防止因多线程并发造成的数据不一致或逻辑混乱?大家一定都能想到锁,能想到java.util.concurrent包下的各种各样的用法。
但在分布式环境下,不同的机器、不同的jvm,那些好用的并发锁工具,并不能满足需要。
所以我们需要有一个分布式锁,来解决分布式环境下的并发问题。而本文正是在这条道路上,走出的一些经验的总结。
我们按照待解决问题的场景,一步一步看下去。
问题一:如何实现一个分布式锁
锁,大多是基于一个只能被1个线程占用、得到的资源来实现的。
JVM的锁,是基于CPU对于寄存器的修改指令cmpxchg,来保证旧值改为新值的操作,只有一个能成功。这里的旧值,就是这个被争夺的资源,大多数情况,并发时第一个线程对旧值修改成功后,其他线程就没有机会了。(当然,ABA是另外一个话题,这里就不说了。。)
所以,在分布式环境下,要实现一个锁,我们就要找个一个只能被1个线程占用的资源。
有经验的开发很快能想到,共享磁盘文件、缓存、mysql数据库,这些分布式环境下,数据表现为单份的,都应当能满足需求。
然而,基于文件、DB会遭遇各式各样的问题,性能,经常也会是瓶颈。
因此,我们这里使用的,是淘系用的最多的缓存中间件产品--Tair。
查看com.taobao.tair.TairManager接口,发现有以下几个接口或许适合使用:
TairManager.incr
加法计数器。首次使用时,返回值能等于默认值,而不被+n
的机会,只有一个。貌似可以用。TairManager.lock
看起来名字像是这个意思,姑且拿来一试。TairManager.put
传入version版本进行校验,cas原则会保证只有一个能成功。貌似可以用。
(注:mdb有可能丢、且invalid时不保证跨机房一致性,所以这个锁肯定需要用ldb来实现的。)
在线上多机房情况下,做了一下测试,测试程序核心代码大约是:
void test(){ // 2个机房的jvm实例,每个实例n个线程同时执行本方法; while(true){ if(tryLock()){ // tryLock\unLock 的实现对应有3套; try{ logger.info("Got lock! Hostname:{} ThreadName:{}",getHostname(),Thread.currentThread().getName()); Thread.sleep(1000); }finally { unLock(); logger.info("Release lock! Hostname:{} ThreadName:{}",getHostname(),Thread.currentThread().getName()); } } } }
测试的结论如下:
TairManager.incr
初始几次锁的获取和释放没有问题,但是后来返回值很大,就谁也拿不到锁;怀疑和接口超时、或invalid机制有关;补充: 后面发现incr做锁的一种可靠方案是,使用值限定参数:
int lowBound
,int upperBound
比如:lowBound=0, upperBound=1,则value一直在0与1之间切换,用过多次,还是很靠谱的。TairManager.lock
完全不行,lock接口看来根本不是做这个用的;真正的使用场景是锁住一个key不容许更新
,不是锁机制。TairManager.put
非常稳定、靠谱;有个现象值得关注:A机房invalid之后,B机房会先拿到锁;因为invalid先从远程机房开始;
最后,给出ldb put
实现的分布式锁的核心代码(__后面都基于ldb put来实现__):
public boolean trylock(String key) { ResultCode code = ldbTairManager.put(NAMESPACE, key, "This is a Lock.", 2, 0); if (ResultCode.SUCCESS.equals(code)) return true; else return false; }public boolean unlock(String key) { ldbTairManager.invalid(NAMESPACE, key); }
问题二:lock之后,程序发布\或者进程crash,trylock就永远false了
每次发布,总发现有些异常数据,拿不到锁,不能继续向前走;
仔细分析,原来tair的lock,一直没能释放;
要解决这个问题,可以先不管原因,无脑的给tair put加上超时时间就行,这样业务至少可以自行恢复。
但是,这个超时时间需要仔细考虑把握,需要在业务承受范围之内。
注: 像程序发布、进程crash,这种情况,是无可避免的让锁没机会释放。还有其他可能性,大多是bug了。。
public boolean trylock(String key, int timeout) { ResultCode code = ldbTairManager.put(NAMESPACE, key, "This is a Lock.", 2, timeout); if (ResultCode.SUCCESS.equals(code)) return true; else return false; }
问题二:tair存在让人苦恼的超时问题,即使千分之1,本业务有时也不能容忍
tair的大神给的回复很简单:超时请重试;
仔细想一下,put超时分两种情况:
上一次put其实已经成功了;那么重试肯定会失败;
上一次put其实失败了;那么若这一次顺利,就会成功;
那如何解决上面的第1点呢?
其实,锁应该是能够经得起复查的(类似偏向锁):A拿到的锁,没有unlock之前,无论A重试检查多少次,都是A的!
怎么实现?
既然用的是ldb缓存,它是key-value结构的,前面version控制等,都只用到了key。
这里,我们可以从tair value里做文章:让value包含机器ip+线程name,trylock内先get value做检查
于是,实现变为:
public boolean trylock(String key, int timeout) { Result<DataEntry> result = locker.ldbTairManager.get(NAMESPACE, lockKey); if (result == null) return null; if (ResultCode.DATANOTEXSITS.equals(result.getRc())) { // means lock is free ResultCode code = ldbTairManager.put(NAMESPACE, key, getLockValue(), 2, timeout); if (ResultCode.SUCCESS.equals(code)) return true; }else if(result.getValue() != null && getLockValue().equals(result.getValue().getValue())){ return true; } return false; } private String getLockValue(){ return Utils.getHostname() + ":" + Thread.currentThread().getName(); }
注意: 其实这里线程复用时,ThreadName有相同风险,可以改为uuid逻辑,需复用锁时传入uuid。
public boolean trylock(String key, int timeout, String uuid){ ... }
问题三:新方案其实多了一次get操作,若是get也超时怎么办?
超时无法避免,还是要靠重试!(前提是逻辑可以重试)
public boolean trylock(String key, int timeout) { Result<DataEntry> result = locker.ldbTairManager.get(NAMESPACE, key); if (result == null || ResultCode.CONNERROR.equals(result.getRc()) || ResultCode.TIMEOUT.equals(result.getRc()) || ResultCode.UNKNOW.equals(result.getRc())) // get timeout retry case result = locker.ldbTairManager.get(NAMESPACE, key); if (result == null || ResultCode.CONNERROR.equals(result.getRc()) || ResultCode.TIMEOUT.equals(result.getRc()) || ResultCode.UNKNOW.equals(result.getRc())){ // 还是超时,则留下日志痕迹 logger.error("ldb tair get timeout. key:{}.",key); return false; } if (ResultCode.DATANOTEXSITS.equals(result.getRc())) { // means lock is free ResultCode code = ldbTairManager.put(NAMESPACE, key, getLockValue(), 2, timeout); if (ResultCode.SUCCESS.equals(code)) return true; else if(code==null || ResultCode.CONNERROR.equals(result.getRc()) || ResultCode.TIMEOUT.equals(result.getRc()) || ResultCode.UNKNOW.equals(result.getRc())){ // put timeout retry case return trylock(key, timeout); // 递归尝试 } }else if(result.getValue() != null && getLockValue().equals(result.getValue().getValue())){ return true; } return false; } // 【注意】其实这里线程复用时,ThreadName有相同风险,可以改为uuid逻辑,复用锁传入uuid。 private String getLockValue(){ return Utils.getHostname() + ":" + Thread.currentThread().getName(); }
进一步的,我们还可以对get/put的retry做次数控制;
真实线上的情况,一般一次retry就能解决问题,次数多了,反而可能导致雪崩,需要慎重;
多一次Get的性能影响
有代码洁癖、性能洁癖的人可能会想:普通tair锁,一次put就能搞定,这里却要先get再put,浪费啊。。。
这里梳理一下:
若是
锁已经被持有
,那么get之后,麻烦发现被持有,直接返回失败;这时,并不会再次put,开销是一样的;(甚至get的开销,比put要小,至少不会占用put的限流阈值)若是
没人持有锁
,确实这时get有些浪费的,但是为了锁可以复查
这个特性(可重试)、为了能解决超时这个问题,我认为还是值得的。在实际场景中,开发者自己可以评估是否需要。比如:拿前面的uuid的样例API讲,若不需要这个特性时,就不传入uuid,那么实现代码里,可以自动降级为只有一个put的锁实现;
问题四:批量锁
批量锁,主要注意拿锁的顺序和释放锁相反,伪代码如下:
if(trylock("A") && trylock("B") && trylock("C")){ try{ // do something }finally{ // 注意这里的顺序要反过来 unlock("C"); unlock("B"); unlock("A"); } }
最后总结下,给一个完整代码 :smile
import com.taobao.tair.DataEntry;import com.taobao.tair.Result;import com.taobao.tair.ResultCode;import com.taobao.tair.TairManager;import org.apache.commons.lang.NotImplementedException;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.slf4j.helpers.FormattingTuple;import org.slf4j.helpers.MessageFormatter;import javax.annotation.Resource;import java.net.InetAddress;import java.net.UnknownHostException;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock; public class CommonLocker { private static final Logger logger = LoggerFactory.getLogger(CommonLocker.class); @Resource private TairManager ldbTairManager; private static final short NAMESPACE = 1310; private static CommonLocker locker; public void init() { if (locker != null) return; synchronized (CommonLocker.class) { if (locker == null) locker = this; } } public static Lock newLock(String format, Object... argArray) { FormattingTuple ft = MessageFormatter.arrayFormat(format, argArray); return newLock(ft.getMessage()); } public static Lock newLock(String strKey) { String key = "_tl_" + strKey; return new TairLock(key, CommonConfig.lock_default_timeout); } public static Lock newLock(String strKey, int timeout) { String key = "_tl_" + strKey; return new TairLock(key, timeout); } private static class TairLock implements Lock { private String lockKey; private boolean gotLock = false; private int retryGet = 0; private int retryPut = 0; private int timeout; public TairLock(String key, int timeout) { this.lockKey = tokey(key); this.timeout = timeout; } public boolean tryLock() { return tryLock(timeout); } /** * need finally do unlock * * @return */ public boolean tryLock(int timeout) { Result<DataEntry> result = locker.ldbTairManager.get(NAMESPACE, lockKey); while (retryGet++ < CommonConfig.lock_get_max_retry && (result == null || ResultCode.CONNERROR.equals(result.getRc()) || ResultCode.TIMEOUT.equals(result.getRc()) || ResultCode.UNKNOW.equals(result.getRc()))) // 重试一次 result = locker.ldbTairManager.get(NAMESPACE, lockKey); if (ResultCode.DATANOTEXSITS.equals(result.getRc())) { // lock is free // 已验证version 2表示为空,若不是为空,则返回version error ResultCode code = locker.ldbTairManager.put(NAMESPACE, lockKey, locker.getValue(), 2, timeout); if (ResultCode.SUCCESS.equals(code)) { gotLock = true; return true; } else if (retryPut++ < CommonConfig.lock_put_max_retry && (code == null || ResultCode.CONNERROR.equals(result.getRc()) || ResultCode.TIMEOUT.equals(result.getRc()) || ResultCode.UNKNOW.equals(result.getRc()))) { return tryLock(timeout); } } else if (result.getValue() != null && locker.getValue().equals(result.getValue().getValue())) { // 【注意】其实这里线程复用时,ThreadName有相同风险,可以改为uuid逻辑,复用锁传入uuid。 // 若是自己的锁,自己继续用 gotLock = true; return true; } // 到这里表示没有拿到锁 return false; } public void unlock() { if (gotLock) { ResultCode invalidCode = locker.ldbTairManager.invalid(NAMESPACE, lockKey); gotLock = false; } } public void lock() { throw new NotImplementedException(); } public void lockInterruptibly() throws InterruptedException { throw new NotImplementedException(); } public boolean tryLock(long l, TimeUnit timeUnit) throws InterruptedException { throw new NotImplementedException(); } public Condition newCondition() { throw new NotImplementedException(); } } // 【注意】其实这里线程复用时,ThreadName有相同风险,可以改为uuid逻辑,复用锁传入uuid。 private String getValue() { return getHostname() + ":" + Thread.currentThread().getName(); } /** * 获得机器名 * * @return */ public static String getHostname() { try { return InetAddress.getLocalHost().getHostName(); } catch (UnknownHostException e) { return "[unknown]"; } } public void setLdbTairManager(TairManager ldbTairManager) { this.ldbTairManager = ldbTairManager; } }
使用样例
Lock lockA = CommonLocker.newLock("hs_room_{}_uid_{}", roomDo.getUuid(), roomDo.getMaster().getUid()); Lock lockB = CommonLocker.newLock("hs_room_{}_uid_{}", roomDo.getUuid(), roomDo.getPartnerList().get(0).getUid());try { if (lockA.tryLock() && lockB.tryLock()) {// 分布式锁定本任务 // do something.... } } finally { lockB.unlock(); lockA.unlock(); }
祝大家用的愉快,Happy Coding!!
大流量&高并发
互联网应用实践在线峰会
应对超大流量、超高并发的绝密实践,最适合技术开发者的夜间技术交流、每场1.5小时互动分享、素材第一时间公开、全开放注册,我们希望通过在线峰会这样的新模式,广聚开发者,共建技术分享的生态。
[ 09月21日 ] 阿里聚石塔电商云容器服务应用和实践
[ 09月21日 ] 如何打造应对超大流量的负载均衡
[ 09月21日 ] 聚星台:客户运营核心大数据与算法技术
扫码直入报名页:
以上是关于基础管理Tair分布式锁的主要内容,如果未能解决你的问题,请参考以下文章
好好的 Tair 排行榜不用,非得自己写?20 行代码实现高性能排行榜
好好的 Tair 排行榜不用,非得自己写?20 行代码实现高性能排行榜