Mapreduce 作业到 HBase 抛出 IOException: Pass a Delete or a Put

Posted

技术标签:

【中文标题】Mapreduce 作业到 HBase 抛出 IOException: Pass a Delete or a Put【英文标题】:Mapreduce job to HBase throws IOException: Pass a Delete or a Put 【发布时间】:2015-02-16 00:07:38 【问题描述】:

我正在尝试直接从我的 Mapper 输出到 HBase 表,同时在 EMR 上使用带有 HBase0.94.18 的 Hadoop2.4.0。

在执行下面的代码时,我得到了一个讨厌的IOException: Pass a Delete or a Put

public class TestHBase 
  static class ImportMapper 
            extends Mapper<MyKey, MyValue, ImmutableBytesWritable, Writable> 
    private byte[] family = Bytes.toBytes("f");

    @Override
    public void map(MyKey key, MyValue value, Context context) 
      MyItem item = //do some stuff with key/value and create item
      byte[] rowKey = Bytes.toBytes(item.getKey());
      Put put = new Put(rowKey);
      for (String attr : Arrays.asList("a1", "a2", "a3")) 
        byte[] qualifier = Bytes.toBytes(attr);
        put.add(family, qualifier, Bytes.toBytes(item.get(attr)));
      
      context.write(new ImmutableBytesWritable(rowKey), put);
    
  

  public static void main(String[] args) throws Exception 
    Configuration conf = HBaseConfiguration.create();
    String input = args[0];
    String table = "table";
    Job job = Job.getInstance(conf, "stuff");

    job.setJarByClass(ImportMapper.class);
    job.setInputFormatClass(SequenceFileInputFormat.class);
    FileInputFormat.setInputDirRecursive(job, true);
    FileInputFormat.addInputPath(job, new Path(input));

    TableMapReduceUtil.initTableReducerJob(
            table,                  // output table
            null,                   // reducer class
            job);
    job.setNumReduceTasks(0);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  

有谁知道我做错了什么?

堆栈跟踪

错误:java.io.IOException:在 org.apache.hadoop.hbase.mapreduce 的 org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:125) 处传递 Delete 或 Put .TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:84) at org.apache.hadoop.mapred.MapTask$NewDirectOutputCollector.write(MapTask.java:646) at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl .java:89) 在 org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112) 在 org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:124) 在org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145) 在 org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:775) 在 org.apache.hadoop.mapred.MapTask.run( MapTask.java:341) at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:167) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject .java:415) 在 org.apache.hadoop.security .UserGroupInformation.doAs(UserGroupInformation.java:1548) at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162) 容器被 ApplicationMaster 杀死。容器应要求被杀死。退出代码为 143 容器以非零退出代码 143 退出

【问题讨论】:

让我们看看堆栈跟踪 在您提供的代码中:context.write(new ImmutableBytesWritable(rowKey), put); 它在 map 方法之外。请先修复它,因为它与回溯显示的内容不匹配... 感谢您指出 Ruben,这是复制/粘贴错误 【参考方案1】:

如果你能显示完整的堆栈跟踪会更好,这样我就可以帮助你轻松解决它。我没有执行你的代码。据我看到你的代码,这可能是问题job.setNumReduceTasks(0);

Mapper 将期待您的 put 对象直接写入 Apache HBase。 你可以增加setNumReduceTasks或者如果你看到API你可以找到它的默认值并评论它。

【讨论】:

添加了堆栈跟踪【参考方案2】:

感谢您添加堆栈跟踪。不幸的是,您没有包含引发异常的代码,因此我无法为您完全跟踪它。相反,我做了一些搜索,为你发现了一些东西。

您的堆栈跟踪类似于此处另一个 SO 问题中的堆栈跟踪: Pass a Delete or a Put error in hbase mapreduce

那个人通过注释掉job.setNumReduceTasks(0);解决了这个问题

有一个类似的 SO question 有相同的异常,但无法以这种方式解决问题。相反,它遇到了注释问题:

"java.io.IOException: Pass a Delete or a Put" when reading HDFS and storing HBase


Here 是一些很好的例子,说明了如何使用 setNumReduceTasks 为 0 和 1 或更多来编写工作代码。

《51.2. HBase MapReduce 读/写示例 下面是一个使用 HBase 作为源和作为 MapReduce 的接收器的示例。此示例将简单地将数据从一个表复制到另一个表。

Configuration config = HBaseConfiguration.create();
Job job = new Job(config,"ExampleReadWrite");
job.setJarByClass(MyReadWriteJob.class);    // class that contains mapper

Scan scan = new Scan();
scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false);  // don't set to true for MR jobs
// set other scan attrs

TableMapReduceUtil.initTableMapperJob(
  sourceTable,      // input table
  scan,             // Scan instance to control CF and attribute selection
  MyMapper.class,   // mapper class
  null,             // mapper output key
  null,             // mapper output value
  job);
TableMapReduceUtil.initTableReducerJob(
  targetTable,      // output table
  null,             // reducer class
  job);
job.setNumReduceTasks(0);

boolean b = job.waitForCompletion(true);
if (!b) 
    throw new IOException("error with job!");

这是 1 个或多个示例:

"51.4. HBase MapReduce 汇总到 HBase 示例 以下示例使用 HBase 作为 MapReduce 源和接收器以及汇总步骤。此示例将计算一个表中某个值的不同实例的数量,并将这些汇总计数写入另一个表中。

Configuration config = HBaseConfiguration.create();
Job job = new Job(config,"ExampleSummary");
job.setJarByClass(MySummaryJob.class);     // class that contains mapper and reducer

Scan scan = new Scan();
scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false);  // don't set to true for MR jobs
// set other scan attrs

TableMapReduceUtil.initTableMapperJob(
  sourceTable,        // input table
  scan,               // Scan instance to control CF and attribute selection
  MyMapper.class,     // mapper class
  Text.class,         // mapper output key
  IntWritable.class,  // mapper output value
  job);
TableMapReduceUtil.initTableReducerJob(
  targetTable,        // output table
  MyTableReducer.class,    // reducer class
  job);
job.setNumReduceTasks(1);   // at least one, adjust as required

boolean b = job.waitForCompletion(true);
if (!b) 
  throw new IOException("error with job!");

http://hbase.apache.org/book.html#mapreduce.example

您似乎更密切地关注第一个示例。我想表明,有时将 reduce 任务的数量设置为零是有原因的。

【讨论】:

以上是关于Mapreduce 作业到 HBase 抛出 IOException: Pass a Delete or a Put的主要内容,如果未能解决你的问题,请参考以下文章

如何将 HBase 的扫描限制在 MapReduce 作业的相关(未过滤)区域

在 hbase 1.0.1 上运行 mapreduce 的问题

原生 mapreduce VS hbase mapreduce

在使用 java 运行 Hadoop map reduce 作业时抛出空指针异常

将多个顺序 HBase 查询的结果传递给 Mapreduce 作业

在读取 hbase 表时挂起 Mapreduce 作业