HBase mapreduce 作业 - 多次扫描 - 如何设置每次扫描的表

Posted

技术标签:

【中文标题】HBase mapreduce 作业 - 多次扫描 - 如何设置每次扫描的表【英文标题】:HBase mapreduce job - Multiple scans - How to set the table of each Scan 【发布时间】:2017-03-15 10:20:34 【问题描述】:

我使用 HBase 1.2。我想使用多次扫描在 HBase 上运行 MapReduce 作业。在 API 中,有: TableMapReduceUtil.initTableMapperJob(List<Scan> scans, Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass, org.apache.hadoop.mapreduce.Job job).

但是如何指定每次扫描的表呢?我使用下面的代码:

List<Scan> scans = new ArrayList<>();
for (String firstPart : firstParts) 
    Scan scan = new Scan();
    scan.setRowPrefixFilter(Bytes.toBytes(firstPart));
    scan.setCaching(500);
    scan.setCacheBlocks(false);
    scans.add(scan);

TableMapReduceUtil.initTableMapperJob(scans, MyMapper.class, Text.class, Text.class, job);

它给出了以下异常

Exception in thread "main" java.lang.NullPointerException
        at org.apache.hadoop.hbase.TableName.valueOf(TableName.java:436)
        at org.apache.hadoop.hbase.mapreduce.TableInputFormat.initialize(TableInputFormat.java:184)
        at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:241)
        at org.apache.hadoop.hbase.mapreduce.TableInputFormat.getSplits(TableInputFormat.java:240)
        at org.apache.hadoop.mapreduce.lib.input.DelegatingInputFormat.getSplits(DelegatingInputFormat.java:115)
        at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(JobSubmitter.java:305)
        at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:322)
        at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:200)
        at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1307)
        at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1304)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1714)
        at org.apache.hadoop.mapreduce.Job.submit(Job.java:1304)
        at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1325)

我认为这很正常,因为在任何地方都没有指定应该应用每次扫描的表。

但是怎么做呢?

我尝试添加

scan.setAttribute("scan.attributes.table.name", Bytes.toBytes("my_table"));

但它给出了同样的错误

【问题讨论】:

【参考方案1】:

来自文档https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormat.html

List<Scan> scans = new ArrayList<Scan>();

 Scan scan1 = new Scan();
 scan1.setStartRow(firstRow1);
 scan1.setStopRow(lastRow1);
 scan1.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, table1);
 scans.add(scan1);

 Scan scan2 = new Scan();
 scan2.setStartRow(firstRow2);
 scan2.setStopRow(lastRow2);
 scan1.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, table2);
 scans.add(scan2);

 TableMapReduceUtil.initTableMapperJob(scans, TableMapper.class, Text.class,
     IntWritable.class, job);

你的情况:

使用Scan.SCAN_ATTRIBUTES_TABLE_NAME 因为你没有在扫描实例级别设置表,所以你得到了这个 NPE...

请按照这个例子,你必须在你的 for 循环中而不是在外部设置表名......然后它应该可以工作

List<Scan> scans = new ArrayList<Scan>();

  for(int i=0; i<3; i++)
    Scan scan = new Scan();

    scan.addFamily(INPUT_FAMILY);
    scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(TABLE_NAME ));

    if (start != null) 
      scan.setStartRow(Bytes.toBytes(start));
    
    if (stop != null) 
      scan.setStopRow(Bytes.toBytes(stop));
    

    scans.add(scan);

    LOG.info("scan before: " + scan);

【讨论】:

以上是关于HBase mapreduce 作业 - 多次扫描 - 如何设置每次扫描的表的主要内容,如果未能解决你的问题,请参考以下文章

运行一百万次扫描的 hbase mapreduce 作业是不是有意义?

mapreduce、hbase 和扫描

原生 mapreduce VS hbase mapreduce

针对不同映射器的 HBase MapReduce 拆分扫描

使用 HBase 表作为 MapReduce 源

如何从 Result 对象中获取 HBase 表名作为 mapreduce 参数?