HBase与MapReduce集成
Posted chenshaowei
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了HBase与MapReduce集成相关的知识,希望对你有一定的参考价值。
感觉效率不是很高,是否能用sqoop来解决HBase与其他文件系统的数据导入导出。
通过HBase的相关JavaApi,我们可以实现伴随HBase操作的MapReduce过程,比如使用MapReduce将数据从本地文件导入HBase的表中,或我们从HBase的表中读取一些原始数据用于MapReduce做数据分析。
案例
将fruit文件的一部分数据,通过MapReduce导入fruit_mr表中
//fruit.tsv文件
1001 Apple Red
1002 Pear Yellow
1003 Pineapple Yellow
构建ReadFruitMapper类,用于读取fruit表中的数据
package com.atguigu;
import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
public class ReadFruitMapper extends
TableMapper<ImmutableBytesWritable, Put> {
@Override
protected void map(ImmutableBytesWritable key, Result value,
Context context) throws IOException, InterruptedException {
//将 fruit 的 name 和 color 提取出来,相当于将每一行数据读取出来放入到 Put对象中。
Put put = new Put(key.get());
//遍历添加 column 行
for(Cell cell: value.rawCells()){
//添加/克隆列族:info
if("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))){
//添加/克隆列: name
if("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){
//将该列 cell 加入到 put 对象中
put.add(cell);
//添加/克隆列:color
}else if("color".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
//向该列 cell 加入到 put 对象中
put.add(cell);
}
}
}
//将从 fruit 读取到的每行数据写入到 context 中作为 map 的输出
context.write(key, put);
}
}
构建WriteFruitMRReducer类,用于将读取到的数据写入fruit_mr表中
package com.atguigu.Hbase_mr;
import java.io.IOException;
import org.apache.hadoop.Hbase.client.Put;
import org.apache.hadoop.Hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.Hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;
public class WriteFruitMRReducer extends
TableReducer<ImmutableBytesWritable, Put, NullWritable> {
@Override
protected void reduce(ImmutableBytesWritable key, Iterable<Put>
values, Context context)
throws IOException, InterruptedException {
//读出来的每一行数据写入到 fruit_mr 表中
for(Put put: values){
context.write(NullWritable.get(), put);
}
}
}
构建Fruit2FruitMRRunner extends Congfigured implements Tool用于组装运行job
//组装 Job
public int run(String[] args) throws Exception {
//得到 Configuration
Configuration conf = this.getConf();
//创建 Job 任务
Job job = Job.getInstance(conf,
this.getClass().getSimpleName());
job.setJarByClass(Fruit2FruitMRRunner.class);
//配置 Job
Scan scan = new Scan();
scan.setCacheBlocks(false);
scan.setCaching(500);
//设置 Mapper,注意导入的是 mapreduce 包下的,不是 mapred 包下的,后者
是老版本
TableMapReduceUtil.initTableMapperJob(
"fruit", //数据源的表名
scan, //scan 扫描控制器
ReadFruitMapper.class,//设置 Mapper 类
ImmutableBytesWritable.class,//设置 Mapper 输出 key 类型
Put.class,//设置 Mapper 输出 value 值类型
job//设置给哪个 JOB
);
//设置 Reducer
TableMapReduceUtil.initTableReducerJob("fruit_mr",
WriteFruitMRReducer.class, job);
//设置 Reduce 数量,最少 1 个
job.setNumReduceTasks(1);
boolean isSuccess = job.waitForCompletion(true);
if(!isSuccess){
throw new IOException("Job running with error");
}
return isSuccess ? 0 : 1;
}
主函数中调用运行该job任务
public static void main( String[] args ) throws Exception{
Configuration conf = HbaseConfiguration.create();
int status = ToolRunner.run(conf, new Fruit2FruitMRRunner(), args);
System.exit(status);
}
打包运行任务
$ /opt/module/hadoop-2.7.2/bin/yarn jar ~/softwares/jars/Hbase-
0.0.1-SNAPSHOT.jar
com.z.Hbase.mr1.Fruit2FruitMRRunner
以上是关于HBase与MapReduce集成的主要内容,如果未能解决你的问题,请参考以下文章