HBase行锁原理及实现
Posted lipeng_bigdata
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了HBase行锁原理及实现相关的知识,希望对你有一定的参考价值。
请带着如下问题阅读本文。
1、什么是行锁?
2、HBase行锁的原理是什么?
3、HBase行锁是如何实现的?
4、HBase行锁是如何应用的?
一、什么是行锁?
我们知道,数据库中存在事务的概念。事务是作为单个逻辑工作单元执行的一系列操作,要么完全地执行,要么完全的不执行。而事务的四大特点即原子性、一致性、分离性和持久性。其中,原子性首当其冲,那么在HBase内部实现其原子性的重要保证是什么呢?答案就是行锁。
什么是行锁呢?顾名思义,它就是加在行上的一把锁。在它未释放该行前,最起码其他访问者是无法对该行做修改的,即要修改的话,必须得获得该行的锁才能拥有修改改行数据的权限,这就是行锁的含义。
二、HBase行锁实现原理
HBase行锁是利用Java并发包concurrent里的CountDownLatch(1)来实现的。它的主要思想就是在服务器端每个访问者单独一个数据处理线程,每个处理线程针对特定行数据修改时必须获得该行的行锁,而其他客户端线程想要修改数据的话,必须等待前面的线程释放锁后才被允许,这就利用了Java并发包中的CountDownLatch,CountDownLatch为Java中的一个同步辅助类,在完成一组正在其他线程中进行的操作之前,它允许一个或多个线程一直等待。这里,将线程数设置为1,十分巧妙的实现了独占锁的概念。
三、HBase行锁的实现
HBase的行锁主要是通过HRegion的两个内部类实现的,其中一个是RowLock,另外一个是RowLockContext。
我们首先看RowLock这个类,其定义如下:
/**
* Row lock held by a given thread.
* One thread may acquire multiple locks on the same row simultaneously.
* The locks must be released by calling release() from the same thread.
*
* 给定线程持有的行锁。
* 一个线程可以同时获得同一行上的多个锁。
* 锁必须被相同线程,通过调用release()释放。
*/
public static class RowLock
// 行锁上下文,持有锁定的行row、锁持有者线程thread、该行上锁的数目lockCount等内容
@VisibleForTesting final RowLockContext context;
// 行锁是否被释放
private boolean released = false;
// 构造函数
@VisibleForTesting RowLock(RowLockContext context)
this.context = context;
/**
* Release the given lock. If there are no remaining locks held by the current thread
* then unlock the row and allow other threads to acquire the lock.
* 释放给定的锁。如果当前线程不再持有任何锁,那么对该行解锁并允许其他线程获得锁。
* @throws IllegalArgumentException if called by a different thread than the lock owning thread
*/
public void release()
if (!released)
context.releaseLock();
released = true;
通过上述源码我们可以看到,行锁RowLock有两个成员变量,RowLockContext类型的行锁上下文context和布尔类型的行锁是否释放released。其中,行锁上下文context持有锁定的行row、锁持有者线程thread、该行上锁的数目lockCount等内容,并且,利用java的concurrent并发包里的CountDownLatch(1)实现了线程对对象的独占锁。
RowLockContext的源码如下:
// 行锁上下文,包括指定的行row,同步计数器latch,锁的数目lockCount和线程thread
@VisibleForTesting class RowLockContext
private final HashedBytes row;// 行
private final CountDownLatch latch = new CountDownLatch(1);//
private final Thread thread;
private int lockCount = 0;
// 构造方法
RowLockContext(HashedBytes row)
this.row = row;
this.thread = Thread.currentThread();
// 判断是否为当前线程对应的行锁上下文
boolean ownedByCurrentThread()
return thread == Thread.currentThread();
RowLock newLock()
lockCount++;
return new RowLock(this);
void releaseLock()
if (!ownedByCurrentThread())
throw new IllegalArgumentException("Lock held by thread: " + thread
+ " cannot be released by different thread: " + Thread.currentThread());
lockCount--;
if (lockCount == 0)
// no remaining locks by the thread, unlock and allow other threads to access
RowLockContext existingContext = lockedRows.remove(row);
if (existingContext != this)
throw new RuntimeException(
"Internal row lock state inconsistent, should not happen, row: " + row);
// 同步计数器减1
latch.countDown();
通过源码我们可以看到,行锁的上下文信息,主要包括行锁对应的行row以及占用该行锁的线程thread。构造RowContext时,只需传入行row即可,占用的线程则通过Thread.currentThread()获得当前线程。
新加锁时,通过调用newLock()方法即可实现,首先锁的计数器lockCount加1,然后返回由当前RowContext构造RowLock实例即可。
释放锁时,通过调用releaseLock()方法即可实现,首先通过ownedByCurrentThread()方法确保调用releaseLock()方法的当前线程是否和RowContext持有的线程一致,然后,锁的计数器lockCount减1,并且,如果lockCount为0的话,说明不再有操作占用该行,将row对应的行锁从数据结构lockedRows中删除,允许其他线程获得该行的行锁,最后,最重要的一步,latch.countDown(),就可完成行锁的释放了。
四、HBase行锁的使用
下面,我们看下HBase行锁的使用。在涉及数据变更的操作,比如Put、Delete等中,在对一行数据操作之前,都会调用getRowLockInternal()方法,获得该行数据的行锁。代码如下:
/**
* A version of getRowLock(byte[], boolean) to use when a region operation has already been
* started (the calling thread has already acquired the region-close-guard lock).
*/
protected RowLock getRowLockInternal(byte[] row, boolean waitForLock) throws IOException
// 构造HashedBytes类型表示的行,rowKey
HashedBytes rowKey = new HashedBytes(row);
// 创建行锁上下文实例,并制定为行rowkey和当前线程拥有
RowLockContext rowLockContext = new RowLockContext(rowKey);
// loop until we acquire the row lock (unless !waitForLock)
while (true)
// 将rowkey与行锁上下文的对应关系添加到Region的数据结构lockedRows中
RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext);
if (existingContext == null)
// Row is not already locked by any thread, use newly created context.
// 该行已经没有被任何线程锁住,使用这个新创建的上下文
break;
else if (existingContext.ownedByCurrentThread())
// Row is already locked by current thread, reuse existing context instead.
// 该行已经被当前线程锁住,复用当前线程之前创建的行锁上下文实例
rowLockContext = existingContext;
break;
else
// 该行被其他线程锁住,如果不需要等待锁,直接返回null
if (!waitForLock)
return null;
TraceScope traceScope = null;
try
if (Trace.isTracing())
traceScope = Trace.startSpan("HRegion.getRowLockInternal");
// Row is already locked by some other thread, give up or wait for it
// 行已经被其他线程锁住,放弃或者等待
// 等待rowLockWaitDuration时间后,如果还未获得行锁,直接抛出异常
if (!existingContext.latch.await(this.rowLockWaitDuration, TimeUnit.MILLISECONDS))
if(traceScope != null)
traceScope.getSpan().addTimelineAnnotation("Failed to get row lock");
throw new IOException("Timed out waiting for lock for row: " + rowKey);
if (traceScope != null) traceScope.close();
traceScope = null;
catch (InterruptedException ie)
LOG.warn("Thread interrupted waiting for lock on row: " + rowKey);
InterruptedIOException iie = new InterruptedIOException();
iie.initCause(ie);
throw iie;
finally
if (traceScope != null) traceScope.close();
// allocate new lock for this thread
return rowLockContext.newLock();
具体流程整理如下:
1、利用byte[]类型的入参row构造HashedBytes类型表示的行,即rowKey;
2、利用rowKey创建行锁上下文实例,并指定为行rowKey和当前线程拥有;
3、循环:
3.1、将rowKey与行锁上下文的对应关系添加到Region的数据结构lockedRows中,可能出现以下几种情况:
3.1.1、如果lockedRows中之前不存在对应行rowKey的数据,说明该行当前没有被任何线程锁住,使用这个新创建的上下文rowLockContext,跳出循环并返回,说 明当前行可用;
3.1.2、如果该行已经被当前线程锁住,复用当前线程之前创建的行锁上下文实例,并赋值给rowLockContext,跳出循环并返回,说明当前行可用;
3.1.3、如果该行被其他线程锁住,如果入参确定不需要等待锁的获取,直接返回null,否则重复循环,直到等待rowLockWaitDuration时间后,如果还未获得行锁, 直接抛出异常。
至于都是哪些地方需要获取行锁,在以后各种数据读写流程中再做分析吧~
以上是关于HBase行锁原理及实现的主要内容,如果未能解决你的问题,请参考以下文章