Custom HBase-MapReduce
Posted by 丶落幕
1 HDFS -> table
Requirement: read data from HDFS and insert it into an HBase table.
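mapper
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Passes each input line through unchanged; the key is the line's byte offset in the file.
public class FruitMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        context.write(key, value);
    }
}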
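reducer
import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

// Turns each line into a Put. Expects three tab-separated fields: row key, name, color.
public class FruitReducer extends TableReducer<LongWritable, Text, NullWritable> {
    @Override
    protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        for (Text value : values) {
            String[] split = value.toString().split("\\t");
            // split[0] is the row key; name and color go into the "info" column family
            Put put = new Put(Bytes.toBytes(split[0]));
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(split[1]));
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("color"), Bytes.toBytes(split[2]));
            context.write(NullWritable.get(), put);
        }
    }
}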
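The reducer relies on every input line having three tab-separated fields (row key, name, color). A line with fewer fields would fail with an ArrayIndexOutOfBoundsException, and an empty row key cannot be used to build a Put. The following is a minimal sketch of checking a line up front, using only the standard library. The class name FruitLine and its parse method are hypothetical helpers introduced for illustration; they are not part of the original post.

```java
import java.util.Optional;

// Hypothetical helper (not in the original post): validates one line of
// fruit.tsv before it is turned into a Put.
final class FruitLine {
    final String rowKey;
    final String name;
    final String color;

    private FruitLine(String rowKey, String name, String color) {
        this.rowKey = rowKey;
        this.name = name;
        this.color = color;
    }

    // Returns the parsed fields, or Optional.empty() when the line does not
    // have exactly three tab-separated fields or its row key is empty.
    static Optional<FruitLine> parse(String line) {
        if (line == null) {
            return Optional.empty();
        }
        // Limit -1 keeps trailing empty fields, so "1001\tApple\t" still has 3 parts
        String[] parts = line.split("\t", -1);
        if (parts.length != 3 || parts[0].isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(new FruitLine(parts[0], parts[1], parts[2]));
    }

    public static void main(String[] args) {
        System.out.println(parse("1001\tApple\tRed").isPresent()); // prints true
        System.out.println(parse("broken line").isPresent());      // prints false
    }
}
```

Inside FruitReducer, the loop body could call FruitLine.parse(value.toString()) and skip, or count, the lines for which it returns empty. Whether to skip or to fail the job on bad input is a design choice the original post does not address.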
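driver
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class FruitDriver implements Tool {
    private Configuration conf;

    @Override
    public int run(String[] strings) throws Exception {
        // strings[0]: HDFS input path, strings[1]: target HBase table
        Job job = Job.getInstance(conf);
        job.setJarByClass(FruitDriver.class);
        job.setMapperClass(FruitMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        // Sets the reducer class and the table output format for the target table
        TableMapReduceUtil.initTableReducerJob(strings[1], FruitReducer.class, job);
        FileInputFormat.setInputPaths(job, new Path(strings[0]));
        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }

    @Override
    public void setConf(Configuration configuration) {
        this.conf = configuration;
    }

    @Override
    public Configuration getConf() {
        return this.conf;
    }

    public static void main(String[] args) throws Exception {
        // HBaseConfiguration.create() also loads hbase-site.xml from the classpath
        Configuration configuration = HBaseConfiguration.create();
        int run = ToolRunner.run(configuration, new FruitDriver(), args);
        System.exit(run);
    }
}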
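Package, upload, and run
- mr1.jar: the packaged jar, renamed
- com.cssl.mr1.FruitDriver: fully qualified name of the class that contains main
- /input_fruit/fruit.tsv: location of the input file on HDFS
- fruit1: name of the target HBase table (it must already exist, with an info column family)
yarn jar mr1.jar com.cssl.mr1.FruitDriver /input_fruit/fruit.tsv fruit1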
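FruitDriver reads its two positional arguments as strings[0] (the HDFS input path) and strings[1] (the table name) without checking them, so a forgotten argument surfaces only as an ArrayIndexOutOfBoundsException. The following is a minimal sketch of validating the arguments before the job is configured. FruitArgs and its usage message are hypothetical and not part of the original post.

```java
// Hypothetical helper (not in the original post): checks the two positional
// arguments that FruitDriver.run reads as strings[0] and strings[1].
final class FruitArgs {
    static final String USAGE = "Usage: FruitDriver <hdfs-input-path> <hbase-table>";

    final String inputPath;
    final String tableName;

    private FruitArgs(String inputPath, String tableName) {
        this.inputPath = inputPath;
        this.tableName = tableName;
    }

    // Throws IllegalArgumentException carrying the usage message unless
    // exactly two non-empty arguments are given.
    static FruitArgs parse(String[] args) {
        if (args == null || args.length != 2 || args[0].isEmpty() || args[1].isEmpty()) {
            throw new IllegalArgumentException(USAGE);
        }
        return new FruitArgs(args[0], args[1]);
    }

    public static void main(String[] args) {
        try {
            FruitArgs parsed = parse(args);
            System.out.println("input=" + parsed.inputPath + " table=" + parsed.tableName);
        } catch (IllegalArgumentException ex) {
            // Report the problem instead of letting the job start with bad arguments
            System.err.println(ex.getMessage());
        }
    }
}
```

In the driver, run could call FruitArgs.parse(strings) first and return a non-zero exit code when it throws, so that the yarn jar command fails fast with a readable message.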
2 table -> table
Requirement: copy data from one HBase table into another HBase table.
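mapper
import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
// Copies only the "name" column of each row into a Put for the target table.
public class Fruit2Mapper extends TableMapper<ImmutableBytesWritable, Put> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
        Put outV = new Put(key.get());
        for (Cell cell : value.rawCells()) {
            if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                outV.add(cell);
            }
        }
        // HBase rejects a Put with no cells, so skip rows that have no "name" column
        if (!outV.isEmpty()) {
            context.write(key, outV);
        }
    }
}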
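reducer
import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;

// Writes every Put produced by the mapper to the target table unchanged.
public class Fruit2Reducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {
    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
        for (Put put : values) {
            context.write(NullWritable.get(), put);
        }
    }
}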
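driver
Written this way, the job can be run directly on a local machine; the HBase configuration files must be copied into the project's resources directory.
(If the cluster is addressed through hostname mappings, the same mappings must also be configured on Windows.)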
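import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Fruit2Driver implements Tool {
    private Configuration configuration;

    @Override
    public int run(String[] strings) throws Exception {
        Job job = Job.getInstance(configuration);
        job.setJarByClass(Fruit2Driver.class);
        // Source: full scan of fruit1, mapped to (row key, Put) pairs
        TableMapReduceUtil.initTableMapperJob("fruit1",
                new Scan(),
                Fruit2Mapper.class,
                ImmutableBytesWritable.class,
                Put.class,
                job);
        // Sink: write the Puts to fruit2
        TableMapReduceUtil.initTableReducerJob("fruit2",
                Fruit2Reducer.class,
                job);
        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }

    @Override
    public void setConf(Configuration configuration) {
        this.configuration = configuration;
    }

    @Override
    public Configuration getConf() {
        return this.configuration;
    }

    public static void main(String[] args) throws Exception {
        // Loads hbase-site.xml from the classpath (the resources directory)
        Configuration configuration = HBaseConfiguration.create();
        int run = ToolRunner.run(configuration, new Fruit2Driver(), args);
        System.exit(run);
    }
}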
3 Custom coprocessor
Goal: when a client inserts data into "student", the same data is also inserted into "chen:student".
1) Write the coprocessor
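import java.io.IOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;

/*
 * Coprocessor
 * 1) Create a class that extends BaseRegionObserver (HBase 1.x API)
 * 2) Override postPut
 * 3) Implement the logic:
 *    when data is added to student, add the same data to chen:student
 * 4) Package the project and upload it to HBase so that HBase can load the coprocessor
 */
public class InsertHbaseCoprocessor extends BaseRegionObserver {
    @Override
    public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
        // Get the target table; try-with-resources closes it even if put() throws
        try (Table table = e.getEnvironment().getTable(TableName.valueOf("chen:student"))) {
            // Write the same Put to the target table
            table.put(put);
        }
    }
}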
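2) Package the project, upload the jar to HBase's lib directory, and restart HBase
3) Delete the existing student table, then create a new table with the coprocessor attached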
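// Create a new table with the coprocessor attached.
// Method of a JUnit test class; connection is a Connection field initialized elsewhere (not shown).
@Test
public void addCoprocessorTable() throws IOException {
    // try-with-resources closes the Admin when done
    try (Admin admin = connection.getAdmin()) {
        TableName tableName = TableName.valueOf("student");
        if (!admin.tableExists(tableName)) {
            // Create the table descriptor
            HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName);
            // Attach the coprocessor by its fully qualified class name
            hTableDescriptor.addCoprocessor("com.cssl.InsertHbaseCoprocessor");
            // Add the column family
            hTableDescriptor.addFamily(new HColumnDescriptor("info"));
            admin.createTable(hTableDescriptor);
            System.out.println("Table created");
        }
    }
}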
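4) Test an insert and check the result: put a row into student, then scan chen:student to confirm the same row appears there.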