hbase(0.94) getscan源码分析

Posted dsj2016

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了hbase(0.94) getscan源码分析相关的知识,希望对你有一定的参考价值。

简介

本文是需要用到hbase timestamp性质时研究源码所写.内容有一定侧重.且个人理解不算深入,如有错误请不吝指出.

如何看源码

hbase依赖很重,没有独立的client包.所以目前如果在maven中指定如下:

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase</artifactId>
    <version>0.94-adh3u9.9</version>
    <exclusions>
        <exclusion>
            <groupId>org.jruby</groupId>
            <artifactId>jruby-complete</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>

可以看到其会把整个hbase的源码都下载下来.这一点在查看源码上是比较方便的.

入口

本文以get为例.代码的入口位于org.apache.hadoop.hbase.client.HTable的get方法:

  public Result get(final Get get) throws IOException {
    try {
    startTrace(get);
    ServerCallable<Result> serverCallable = new ServerCallable<Result>(
        connection, tableName, get.getRow(), operationTimeout) {
      public Result call() throws IOException {
        // rpc调用服务端进行查询
        return server.get(location.getRegionInfo().getRegionName(), get);
      }
    };
    return executeServerCallable(serverCallable);
    } finally {
      endTrace(TableOperationMetricType.GET, 1);
    }
  }

其中在实际执行server.get时,会通过反射调用一个rpc接口真正和服务器进行沟通.所以如果把断点打在这个函数的里面,会发现无法断住.

路由

这里其实有一个很重要的问题.就是在执行get的时候.用户只会传入rowkey等信息,这里hbase是如何根据rowkey确认该数据所在region的.由上述代码可见location.getRegionInfo().getRegionName()即获取到了regionname.这一块的细节逻辑未深入研究.

regionserver逻辑

  public Result get(byte[] regionName, Get get) throws IOException {
    checkOpen();
    final long startTime = System.nanoTime();
    ReadMetricsData metricsData = null;
    try {
      // 确认当前请求真正的region
      HRegion region = getRegion(regionName);
      checkReadEnabled(region.getTableDesc().getNameAsString());
      HBaseServer.setRegionInfoForCurCall(region.getRegionInfo()
          .getTableNameAsString(), region.getRegionInfo().getEncodedName());
      if (!region.getRegionInfo().isMetaTable()) {
        metricsData = new ReadMetricsData();
        ReadMetricsData.setCurReadMetricsData(metricsData);
      }
      // 真正执行查询,第二个参数是一个递增的整数.用于实现mvcc,即多版本并发控制
      // 简单描述就是查询时会依靠这个数字来确定读取的数据版本,避免出现读取put
      // 多个列、列族时,get到其插入一半的数据.
      Result r = region.get(get, getLockFromId(get.getLockId()));
      if (get.isSupportEncodingResult()) {
        r.setKVEncoding(KVEncoding.FASTPREFIX);
      }
      int dataLen = (int) r.getWritableSize();
      long took = System.nanoTime() - startTime;
      this.addReadMetricsCount(region, dataLen,
          HBaseServer.getRemoteAddress(), 1, (int) (took / MS_CONVERTTO_NS));
      if (metricsData != null) {
        this.metrics.getLatencies.updateReadMetricsData(metricsData, took);
      }
      return r;
    } catch (Throwable t) {
      this.metrics.failedReadRequests.inc();
      throw convertThrowableToIOE(cleanup(t));
    } finally {
      ReadMetricsData.setCurReadMetricsData(null);
    }
  }

region逻辑

上文代码最终会调用:HRegion.get(Get get, boolean withCoprocessor)方法.

  private List<KeyValue> get(Get get, boolean withCoprocessor)
  throws IOException {
    long now = EnvironmentEdgeManager.currentTimeMillis();

    List<KeyValue> results = new ArrayList<KeyValue>();

    // pre-get CP hook
    // Coprocessor 为hbase中的协处理器概念.其可由clinet发往hbaseserver,在执行put、get前后
    // 执行一些逻辑.如进行简单运算、筛选.
    if (withCoprocessor && (coprocessors != null)) {
       if (coprocessors.preGet(get, results)) {
         return results;
       }
    }

    // 注意这里.所有的get都会转化为一个scan
    Scan scan = new Scan(get);

    RegionScanner scanner = null;
    try {
      // 构造scanner
      scanner = getScanner(scan);
      // 从scanner中取出数据放入results
      scanner.next(results);
    } finally {
      if (scanner != null)
        scanner.close();
    }

    // post-get CP hook
    // 协处理器的后置处理调用
    if (withCoprocessor && (coprocessors != null)) {
      coprocessors.postGet(get, results);
    }

    // do after lock
    final long after = EnvironmentEdgeManager.currentTimeMillis();
    this.opMetrics.updateGetMetrics(get.familySet(), after - now);
    
    return results;
  }

对于Scan scan = new Scan(get);,在scan的构造方法中可见:

  public Scan(Get get) {
    this.startRow = get.getRow();
    this.stopRow = get.getRow();
    this.filter = get.getFilter();
    this.cacheBlocks = get.getCacheBlocks();
    this.maxVersions = get.getMaxVersions();
    this.tr = get.getTimeRange();
    this.familyMap = get.getFamilyMap();
  }

也就是对于一个get.实际上是把其当做一个scan进行查询的.所以这里可以推断出.一个get和一个startrow、stoprow均相同的scan,在执行效率上是不会有差异的.

其次是关于timerange这一项.hbase中一个get、scan.可以设置timerange或者timestamp.其中timerange是指只查询某个时间范围的数据.而timestamp是指只查询某个时间点的数据.

而如果不设置,则会默认查询所有数据.这一块的逻辑实现就在get、scan的构造方法和set方法中.

  • 如果用户没有setTimeStamp、setTimeRange

    // get、scan均会调用默认无参数构造方法构造其tr.
    private TimeRange tr = new TimeRange();
  • 如果用户进行了设置

      public Get setTimeRange(long minStamp, long maxStamp)
      throws IOException {
        tr = new TimeRange(minStamp, maxStamp);
        return this;
      }
    
      public Get setTimeStamp(long timestamp) {
        try {
          tr = new TimeRange(timestamp, timestamp+1);
        } catch(IOException e) {
          // Will never happen
        }
        return this;
      }

可见timestamp是一种特殊的timerange,其构造方法为[timestamp,timestamp+1)的range.

scanner存在的意义

hbase的scanner并非为了使用设计模式而强行加入一个scanner做数据查询.这里的scanner的必要性主要在于其逻辑、物理存储特性.这里简单描述就是,在hbase中一个region是由多个store的.每个store才是真正的存储的逻辑最小单元.而一个store里面又有一个memstore(内存),零个或多个storefile(一般是位于HDFS的HFile文件).

由此,有个很明显的问题:在用户的一次查询中,用户的输入是一个rowkey,而这个rowkey的不同列族的数据,可能在不同的store中.而即使确定了一个store,可能数据在memstore中(尚未flush到硬盘),也有可能已经在storefile中了.

进一步也就需要一个机制从这些不同的逻辑、物理的存储媒介中遍历查询数据并且做一个合并.

scanner 的构造

暂无

scanner的遍历逻辑

上文中的scanner.next(results);最终会执行到HRegion.nextInternal(int limit)方法.

代码:

    private boolean nextInternal(int limit) throws IOException {
      RpcCallContext rpcCall = HBaseServer.getCurrentCall();
      while (true) {
        if (rpcCall != null) {
          // If a user specifies a too-restrictive or too-slow scanner, the
          // client might time out and disconnect while the server side
          // is still processing the request. We should abort aggressively
          // in that case.
          rpcCall.throwExceptionIfCallerDisconnected();
        }

        KeyValue current = this.storeHeap.peek();
        byte[] currentRow = null;
        int offset = 0;
        short length = 0;
        if (current != null) {
          currentRow = current.getBuffer();
          offset = current.getRowOffset();
          length = current.getRowLength();
        }
        if (isStopRow(currentRow, offset, length)) {
          if (filter != null && filter.hasFilterRow()) {
            filter.filterRow(results);
          }
          if (filter != null && filter.filterRow()) {
            results.clear();
          }

          return false;
        } else if (filterRowKey(currentRow, offset, length)) {
          nextRow(currentRow, offset, length);
        } else {
          KeyValue nextKv;
          do {
            this.storeHeap.next(results, limit - results.size());
            if (limit > 0 && results.size() == limit) {
              if (this.filter != null && filter.hasFilterRow()) {
                throw new IncompatibleFilterException(
                  "Filter with filterRow(List<KeyValue>) incompatible with scan with limit!");
              }
              return true; // we are expecting more yes, but also limited to how many we can return.
            }
            nextKv = this.storeHeap.peek();
          } while (nextKv != null && nextKv.matchingRow(currentRow, offset, length));

          final boolean stopRow = nextKv == null || isStopRow(nextKv.getBuffer(), nextKv.getRowOffset(), nextKv.getRowLength());

          // now that we have an entire row, lets process with a filters:

          // first filter with the filterRow(List)
          if (filter != null && filter.hasFilterRow()) {
            filter.filterRow(results);
          }

          if (results.isEmpty() || filterRow()) {
            // this seems like a redundant step - we already consumed the row
            // there‘re no left overs.
            // the reasons for calling this method are:
            // 1. reset the filters.
            // 2. provide a hook to fast forward the row (used by subclasses)
            nextRow(currentRow, offset, length);

            // This row was totally filtered out, if this is NOT the last row,
            // we should continue on.

            if (!stopRow) continue;
          } else if (this.remainingOffset > 0) {
            this.remainingOffset--;
            nextRow(currentRow, offset, length);
            if (!stopRow) continue;
          }
          return !stopRow;
        }
      }
    }

简化版:

    private boolean nextInternal(int limit) throws IOException {
      while (true) {
        // 获取heap顶的KeyValue
        KeyValue current = this.storeHeap.peek();
        byte[] currentRow = null;
        int offset = 0;
        short length = 0;
        if (current != null) {
          currentRow = current.getBuffer();
          offset = current.getRowOffset();
          length = current.getRowLength();
        }
        if (isStopRow(currentRow, offset, length)) {
          // 如果是结束行
          return false;
        } else if (filterRowKey(currentRow, offset, length)) {
          // 如果filter过滤掉了.则直接看下一行数据
          nextRow(currentRow, offset, length);
        } else {
          KeyValue nextKv;
          do {
            // 核心代码
            this.storeHeap.next(results, limit - results.size());
            nextKv = this.storeHeap.peek();
          } while (nextKv != null && nextKv.matchingRow(currentRow, offset, length));
        }
      }
    }

这里省去了边缘控制、过滤逻辑等内容.主要关注其核心逻辑.

全部代码中最核心的有两行:KeyValue current = this.storeHeap.peek();this.storeHeap.next(results, limit - results.size());.

这里的storeHeap是hbase维护的一个二叉堆(优先队列).这个堆里面存储的元素,是scanner.每个scanner都会持有当前scanner目前最新的、尚未返回的keyvalue.这个二叉堆的排序方式就是根据每个scanner当前keyvalue的rowkey进行排序.

每次执行查询的时候.首先会用KeyValue current = this.storeHeap.peek();取出堆顶的scanner的当前keyvalue.进行一些逻辑判断(主要是判断rowkey,如判断是否超过limit、是否到了stoprow、是否被filter过滤等).如果该keyvalue全部通过,也就是认为其应该被本次查询查到.会执行一次this.storeHeap.next(results, limit - results.size());注意这里才是有可能把当前keyvalue放入查询结果的地方.(不一定会放入,next方法中还有针对value的判断逻辑,比如比较timestamp是否正确).

scanner的排序

上文提到了scanner会依靠其当前元素rowkey进行排序.可以在类KeyValueHeap的构造方法中看到端倪.

  KeyValueHeap(List<? extends KeyValueScanner> scanners,
      KVScannerComparator comparator) throws IOException {
    this.comparator = comparator;
    if (!scanners.isEmpty()) {
      this.heap = new PriorityQueue<KeyValueScanner>(scanners.size(),
          this.comparator);
      for (KeyValueScanner scanner : scanners) {
        if (scanner.peek() != null) {
          this.heap.add(scanner);
        } else {
          scanner.close();
        }
      }
      this.current = pollRealKV();
    }
    
  }

毫无疑问.从scanner中取出元素也会影响KeyValueHeap中二叉堆的排序.故其可以保证二叉堆的堆顶的scanner的当前keyvalue一直是离上个遍历到的rowkey最近的keyvalue.

二叉堆的next逻辑

上文中的this.storeHeap.next(results, limit - results.size());最终会执行到:StoreScanner.next(List<KeyValue> outResult, int limit).其源码很长:

  public synchronized boolean next(List<KeyValue> outResult, int limit) throws IOException {

    if (checkReseek()) {
      return true;
    }

    // if the heap was left null, then the scanners had previously run out anyways, close and
    // return.
    if (this.heap == null) {
      close();
      return false;
    }

    KeyValue peeked = this.heap.peek();
    if (peeked == null) {
      close();
      return false;
    }

    // only call setRow if the row changes; avoids confusing the query matcher
    // if scanning intra-row
    byte[] row = peeked.getBuffer();
    int offset = peeked.getRowOffset();
    short length = peeked.getRowLength();
    if ((matcher.row == null) || !Bytes.equals(row, offset, length, matcher.row, matcher.rowOffset, matcher.rowLength)) {
      matcher.setRow(row, offset, length);
    }

    KeyValue kv;
    KeyValue prevKV = null;
    List<KeyValue> results = new ArrayList<KeyValue>();

    // Only do a sanity-check if store and comparator are available.
    KeyValue.KVComparator comparator =
        store != null ? store.getComparator() : null;

    long cumulativeMetric = 0;
    try {
      LOOP: while ((kv = this.heap.peek()) != null) {
        // Check that the heap gives us KVs in an increasing order.
        checkScanOrder(prevKV, kv, comparator);
        prevKV = kv;
        ScanQueryMatcher.MatchCode qcode = matcher.match(kv);
        switch (qcode) {
        case INCLUDE:
        case INCLUDE_AND_SEEK_NEXT_ROW:
        case INCLUDE_AND_SEEK_NEXT_COL:

          Filter f = matcher.getFilter();
          results.add(f == null ? kv : f.transform(kv));

          if (limit > 0 && results.size() == limit) {
            if (this.mustIncludeColumn != null) {
                throw new DoNotRetryIOException("Assistant " + this.assistant
                    + " incompatible with scan where limit=" + limit);
            }
          }

          if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
            if (!matcher.moreRowsMayExistAfter(kv)) {
              filterRowIfMissingMustIncludeColumn(results);
              outResult.addAll(results);
              return false;
            }
            seekToNextRow(kv);
          } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
            reseek(matcher.getKeyForNextColumn(kv));
          } else {
            this.heap.next();
          }

          cumulativeMetric += kv.getLength();
          if (limit > 0 && (results.size() == limit)) {
            break LOOP;
          }
          continue;

        case DONE:
          filterRowIfMissingMustIncludeColumn(results);
          // copy jazz
          outResult.addAll(results);
          return true;

        case DONE_SCAN:
          close();
          filterRowIfMissingMustIncludeColumn(results);
          // copy jazz
          outResult.addAll(results);

          return false;

        case SEEK_NEXT_ROW:
          // This is just a relatively simple end of scan fix, to
          // short-cut end
          // us if there is an endKey in the scan.
          if (!matcher.moreRowsMayExistAfter(kv)) {
            filterRowIfMissingMustIncludeColumn(results);
            outResult.addAll(results);
            return false;
          }

          seekToNextRow(kv);
          break;

        case SEEK_NEXT_COL:
          reseek(matcher.getKeyForNextColumn(kv));
          break;

        case SKIP:
          this.heap.next();
          break;

        case SEEK_NEXT_USING_HINT:
          KeyValue nextKV = matcher.getNextKeyHint(kv);
          if (nextKV != null) {
            reseek(nextKV);
          } else {
            heap.next();
          }
          break;

        default:
          throw new RuntimeException("UNEXPECTED");
        }
      }
    } finally {
      RegionMetricsStorage.incrNumericMetric(metricNameGetSize,
          cumulativeMetric);
    }

    if (!results.isEmpty()) {
      filterRowIfMissingMustIncludeColumn(results);
      // copy jazz
      outResult.addAll(results);
      return true;
    }

    // No more keys
    close();
    return false;
  }

简化版:

  public synchronized boolean next(List<KeyValue> outResult, int limit) throws IOException {

    KeyValue peeked = this.heap.peek();
    if (peeked == null) {
      close();
      return false;
    }

    byte[] row = peeked.getBuffer();
    int offset = peeked.getRowOffset();
    short length = peeked.getRowLength();
    if ((matcher.row == null) || !Bytes.equals(row, offset, length, matcher.row, matcher.rowOffset, matcher.rowLength)) {
      matcher.setRow(row, offset, length);
    }

    KeyValue kv;
    List<KeyValue> results = new ArrayList<KeyValue>();

    KeyValue.KVComparator comparator =
        store != null ? store.getComparator() : null;

    long cumulativeMetric = 0;
    try {
      // 获取本store当前的keyvalue
      LOOP: while ((kv = this.heap.peek()) != null) {
        // 执行匹配.这里的匹配规则部分从用户的filter中来
        ScanQueryMatcher.MatchCode qcode = matcher.match(kv);
        switch (qcode) {
        case INCLUDE:
        case INCLUDE_AND_SEEK_NEXT_ROW:
        case INCLUDE_AND_SEEK_NEXT_COL:
          // 针对当前keyvalue应该包含在用户的查询结果的情况
          Filter f = matcher.getFilter();
          results.add(f == null ? kv : f.transform(kv));

          if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
            if (!matcher.moreRowsMayExistAfter(kv)) {
              filterRowIfMissingMustIncludeColumn(results);
              outResult.addAll(results);
              return false;
            }
            seekToNextRow(kv);
          } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
            reseek(matcher.getKeyForNextColumn(kv));
          } else {
            this.heap.next();
          }

          cumulativeMetric += kv.getLength();
          if (limit > 0 && (results.size() == limit)) {
            // 查到了limit的数量.跳出循环
            break LOOP;
          }
          continue;

        case DONE:
        case DONE_SCAN:
        case SEEK_NEXT_ROW:
        case SEEK_NEXT_COL:
        case SKIP:
        case SEEK_NEXT_USING_HINT:
        // 这些都是查询过程中的各种情况.会有针对的处理
                
        default:
          throw new RuntimeException("UNEXPECTED");
        }
      }
    } finally {
      RegionMetricsStorage.incrNumericMetric(metricNameGetSize,
          cumulativeMetric);
    }

    // 放入结果中  
    if (!results.isEmpty()) {
      filterRowIfMissingMustIncludeColumn(results);
      outResult.addAll(results);
      return true;
    }
    close();
    return false;
  }

上述代码可以看到整个next的核心逻辑.其实就是把当前store的当前keyvalue取出.用一个matcher做比较.看看该数据是否应该是用户查询的结果.

查询到的数据的多种可能

上文中的枚举值.这里举个例子.比如查询到一行数据的第一列是应该被查询到的.按理就应该查询其列族的第二列.而如果发现该行数据被删除掉了.那么不用查询其第二列了.也不用查询下个store了.应该直接查询下一行数据.

以上是关于hbase(0.94) getscan源码分析的主要内容,如果未能解决你的问题,请参考以下文章

hbase-0.94 Java API

将一张表的数据从 HBase 0.94 复制到 HBase 0.98

软件Nutch2.3 + HBase 0.94 + Solr 搭建网络数据采集器

HBase ProcedureV2 分析

HBase源码|源码分析怎么做?

Hbase put写入源码分析