hbase批量查询卡住

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了hbase批量查询卡住相关的知识,希望对你有一定的参考价值。

hbase批量查询卡住是因为数据加载过多。
在进行测试的时候,使用了多线程进行并发数据的模拟产生,并使用了 HBase 的批量插入
把程序 jar 包放到服务器集群上运行的时候,刚开始还可以正常的入库,运行一两分钟之后就会报错
后来发现是数据入库消耗的时间大于提交数据的时间,就会造成数据阻塞
参考技术A 如果HBase批量查询卡住的话,可能是因为查询的rowkey的跨度比较大,建议你使用多线程或者多进程来实现并行查询来提升查询速度。

ImportTsv-HBase数据导入工具

一、概述

HBase官方提供了基于Mapreduce的批量数据导入工具:Bulk load和ImportTsv。关于Bulk load大家能够看下我还有一篇博文

通常HBase用户会使用HBase API导数,可是假设一次性导入大批量数据,可能占用大量Regionserver资源,影响存储在该Regionserver上其它表的查询,本文将会从源代码上解析ImportTsv数据导入工具。探究怎样高效导入数据到HBase。


二、ImportTsv介绍

ImportTsv是Hbase提供的一个命令行工具。能够将存储在HDFS上的自己定义分隔符(默认\t)的数据文件。通过一条命令方便的导入到HBase表中,对于大数据量导入很有用,当中包括两种方式将数据导入到HBase表中:

第一种是使用TableOutputformat在reduce中插入数据;

另外一种是先生成HFile格式的文件,再运行一个叫做CompleteBulkLoad的命令,将文件move到HBase表空间文件夹下。同一时候提供给client查询。


三、源代码解析

本文基于CDH5 HBase0.98.1,ImportTsv的入口类是org.apache.hadoop.hbase.mapreduce.ImportTsv

String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
String columns[] = conf.getStrings(COLUMNS_CONF_KEY);
if (hfileOutPath != null) {
  if (!admin.tableExists(tableName)) {
    LOG.warn(format("Table ‘%s‘ does not exist.", tableName));
    // TODO: this is backwards. Instead of depending on the existence of a table,
    // create a sane splits file for HFileOutputFormat based on data sampling.
    createTable(admin, tableName, columns);
  }
  HTable table = new HTable(conf, tableName);
  job.setReducerClass(PutSortReducer.class);
  Path outputDir = new Path(hfileOutPath);
  FileOutputFormat.setOutputPath(job, outputDir);
  job.setMapOutputKeyClass(ImmutableBytesWritable.class);
  if (mapperClass.equals(TsvImporterTextMapper.class)) {
    job.setMapOutputValueClass(Text.class);
    job.setReducerClass(TextSortReducer.class);
  } else {
    job.setMapOutputValueClass(Put.class);
    job.setCombinerClass(PutCombiner.class);
  }
  HFileOutputFormat.configureIncrementalLoad(job, table);
} else {
  if (mapperClass.equals(TsvImporterTextMapper.class)) {
    usage(TsvImporterTextMapper.class.toString()
        + " should not be used for non bulkloading case. use "
        + TsvImporterMapper.class.toString()
        + " or custom mapper whose value type is Put.");
    System.exit(-1);
  }
  // No reducers. Just write straight to table. Call initTableReducerJob
  // to set up the TableOutputFormat.
  TableMapReduceUtil.initTableReducerJob(tableName, null, job);
  job.setNumReduceTasks(0);
}

从ImportTsv.createSubmittableJob方法中推断參数BULK_OUTPUT_CONF_KEY開始。这步直接影响ImportTsv的Mapreduce作业终于以哪种方式入HBase库

假设不为空而且用户没有自己定义Mapper实现类(參数importtsv.mapper.class)时,则使用PutSortReducer,当中会对Put排序,假设每行记录有非常多column,则会占用Reducer大量的内存资源进行排序。

Configuration conf = job.getConfiguration();
HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
job.setOutputFormatClass(TableOutputFormat.class);
假设为空,调用TableMapReduceUtil.initTableReducerJob初始化TableOutputformat的Reducer输出。此方式不须要使用Reducer,由于直接在mapper的Outputformat中会批量的调用Put API将数据提交到Regionserver上(相当于并行的运行HBase Put API)


四、实战

1、使用TableOutputformat的Put API上传数据,非bulk-loading

$ bin/hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.columns=a,b,c <tablename> <hdfs-inputdir>
2、使用bulk-loading生成StoreFiles(HFile)

step1、生成Hfile

$ bin/hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.columns=a,b,c -Dimporttsv.bulk.output=hdfs://storefile-outputdir <tablename> <hdfs-data-inputdir>
step2、完毕导入

$ bin/hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles <hdfs://storefileoutput> <tablename>

五、总结

在使用ImportTsv时。一定要注意參数importtsv.bulk.output的配置,通常来说使用Bulk output的方式对Regionserver来说更加友好一些,这样的方式载入数据差点儿不占用Regionserver的计算资源。由于仅仅是在HDFS上移动了HFile文件,然后通知HMaster将该Regionserver的一个或多个region上线。







以上是关于hbase批量查询卡住的主要内容,如果未能解决你的问题,请参考以下文章

Hbase笔记:批量导入

mongodb数据库批量插入海量数据时为啥有少部分数据丢失

SQL:基于另一个的MIN批量更新表?可能很容易,但卡住了

KUDU 秒级查询的数据仓库

ImportTsv-HBase数据导入工具

海量数据查询关系型数据库存储大数据,要点就是:简单存储分区分表高效索引批量写入