Mapreduce 作业到 HBase 抛出 IOException: Pass a Delete or a Put
Posted
技术标签:
【中文标题】Mapreduce 作业到 HBase 抛出 IOException: Pass a Delete or a Put【英文标题】:Mapreduce job to HBase throws IOException: Pass a Delete or a Put 【发布时间】:2015-02-16 00:07:38 【问题描述】:我正在尝试直接从我的 Mapper 输出到 HBase 表,同时在 EMR 上使用带有 HBase0.94.18 的 Hadoop2.4.0。
在执行下面的代码时,我得到了一个讨厌的IOException: Pass a Delete or a Put
。
public class TestHBase
static class ImportMapper
extends Mapper<MyKey, MyValue, ImmutableBytesWritable, Writable>
private byte[] family = Bytes.toBytes("f");
@Override
public void map(MyKey key, MyValue value, Context context)
MyItem item = //do some stuff with key/value and create item
byte[] rowKey = Bytes.toBytes(item.getKey());
Put put = new Put(rowKey);
for (String attr : Arrays.asList("a1", "a2", "a3"))
byte[] qualifier = Bytes.toBytes(attr);
put.add(family, qualifier, Bytes.toBytes(item.get(attr)));
context.write(new ImmutableBytesWritable(rowKey), put);
public static void main(String[] args) throws Exception
Configuration conf = HBaseConfiguration.create();
String input = args[0];
String table = "table";
Job job = Job.getInstance(conf, "stuff");
job.setJarByClass(ImportMapper.class);
job.setInputFormatClass(SequenceFileInputFormat.class);
FileInputFormat.setInputDirRecursive(job, true);
FileInputFormat.addInputPath(job, new Path(input));
TableMapReduceUtil.initTableReducerJob(
table, // output table
null, // reducer class
job);
job.setNumReduceTasks(0);
System.exit(job.waitForCompletion(true) ? 0 : 1);
有谁知道我做错了什么?
堆栈跟踪
错误:java.io.IOException:在 org.apache.hadoop.hbase.mapreduce 的 org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:125) 处传递 Delete 或 Put .TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:84) at org.apache.hadoop.mapred.MapTask$NewDirectOutputCollector.write(MapTask.java:646) at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl .java:89) 在 org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112) 在 org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:124) 在org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145) 在 org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:775) 在 org.apache.hadoop.mapred.MapTask.run( MapTask.java:341) at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:167) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject .java:415) 在 org.apache.hadoop.security .UserGroupInformation.doAs(UserGroupInformation.java:1548) at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162) 容器被 ApplicationMaster 杀死。容器应要求被杀死。退出代码为 143 容器以非零退出代码 143 退出
【问题讨论】:
让我们看看堆栈跟踪 在您提供的代码中:context.write(new ImmutableBytesWritable(rowKey), put);
它在 map 方法之外。请先修复它,因为它与回溯显示的内容不匹配...
感谢您指出 Ruben,这是复制/粘贴错误
【参考方案1】:
如果你能显示完整的堆栈跟踪会更好,这样我就可以帮助你轻松解决它。我没有执行你的代码。据我看到你的代码,这可能是问题job.setNumReduceTasks(0);
Mapper 将期待您的 put
对象直接写入 Apache HBase。
你可以增加setNumReduceTasks
或者如果你看到API你可以找到它的默认值并评论它。
【讨论】:
添加了堆栈跟踪【参考方案2】:感谢您添加堆栈跟踪。不幸的是,您没有包含引发异常的代码,因此我无法为您完全跟踪它。相反,我做了一些搜索,为你发现了一些东西。
您的堆栈跟踪类似于此处另一个 SO 问题中的堆栈跟踪: Pass a Delete or a Put error in hbase mapreduce
那个人通过注释掉job.setNumReduceTasks(0);
解决了这个问题
有一个类似的 SO question 有相同的异常,但无法以这种方式解决问题。相反,它遇到了注释问题:
"java.io.IOException: Pass a Delete or a Put" when reading HDFS and storing HBase
Here 是一些很好的例子,说明了如何使用 setNumReduceTasks 为 0 和 1 或更多来编写工作代码。
《51.2. HBase MapReduce 读/写示例 下面是一个使用 HBase 作为源和作为 MapReduce 的接收器的示例。此示例将简单地将数据从一个表复制到另一个表。
Configuration config = HBaseConfiguration.create();
Job job = new Job(config,"ExampleReadWrite");
job.setJarByClass(MyReadWriteJob.class); // class that contains mapper
Scan scan = new Scan();
scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false); // don't set to true for MR jobs
// set other scan attrs
TableMapReduceUtil.initTableMapperJob(
sourceTable, // input table
scan, // Scan instance to control CF and attribute selection
MyMapper.class, // mapper class
null, // mapper output key
null, // mapper output value
job);
TableMapReduceUtil.initTableReducerJob(
targetTable, // output table
null, // reducer class
job);
job.setNumReduceTasks(0);
boolean b = job.waitForCompletion(true);
if (!b)
throw new IOException("error with job!");
这是 1 个或多个示例:
"51.4. HBase MapReduce 汇总到 HBase 示例 以下示例使用 HBase 作为 MapReduce 源和接收器以及汇总步骤。此示例将计算一个表中某个值的不同实例的数量,并将这些汇总计数写入另一个表中。
Configuration config = HBaseConfiguration.create();
Job job = new Job(config,"ExampleSummary");
job.setJarByClass(MySummaryJob.class); // class that contains mapper and reducer
Scan scan = new Scan();
scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false); // don't set to true for MR jobs
// set other scan attrs
TableMapReduceUtil.initTableMapperJob(
sourceTable, // input table
scan, // Scan instance to control CF and attribute selection
MyMapper.class, // mapper class
Text.class, // mapper output key
IntWritable.class, // mapper output value
job);
TableMapReduceUtil.initTableReducerJob(
targetTable, // output table
MyTableReducer.class, // reducer class
job);
job.setNumReduceTasks(1); // at least one, adjust as required
boolean b = job.waitForCompletion(true);
if (!b)
throw new IOException("error with job!");
http://hbase.apache.org/book.html#mapreduce.example
您似乎更密切地关注第一个示例。我想表明,有时将 reduce 任务的数量设置为零是有原因的。
【讨论】:
以上是关于Mapreduce 作业到 HBase 抛出 IOException: Pass a Delete or a Put的主要内容,如果未能解决你的问题,请参考以下文章
如何将 HBase 的扫描限制在 MapReduce 作业的相关(未过滤)区域
在 hbase 1.0.1 上运行 mapreduce 的问题
原生 mapreduce VS hbase mapreduce
在使用 java 运行 Hadoop map reduce 作业时抛出空指针异常