自定义 HBase-MapReduce

Posted 丶落幕

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了自定义 HBase-MapReduce相关的知识,希望对你有一定的参考价值。

1 hdfs -> table

需求: 从hdfs读取数据,插入到hbase的表中
mapper

public class FruitMapper extends Mapper<LongWritable, Text,LongWritable, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        context.write(key, value);
    }
}

reducer

public class FruitReducer extends TableReducer<LongWritable, Text, NullWritable> {
    @Override
    protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        for (Text value : values) {
            String[] split = value.toString().split("\\t");
            Put put=new Put(Bytes.toBytes(split[0]));
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(split[1]));
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("color"), Bytes.toBytes(split[2]));
            context.write(NullWritable.get(), put);
        }
    }
}

driver

public class FruitDriver implements Tool {
    private Configuration conf;
    @Override
    public int run(String[] strings) throws Exception {
        Job job = Job.getInstance(conf);
        job.setJarByClass(FruitDriver.class);

        job.setMapperClass(FruitMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);

        TableMapReduceUtil.initTableReducerJob(strings[1], FruitReducer.class, job);

        FileInputFormat.setInputPaths(job, new Path(strings[0]));
        boolean b = job.waitForCompletion(true);
        return b?0:1;
    }

    @Override
    public void setConf(Configuration configuration) {
        this.conf=configuration;
    }

    @Override
    public Configuration getConf() {
        return this.conf;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        int run = ToolRunner.run(configuration, new FruitDriver(), args);
        System.exit(run);
    }
}

打包上传并执行

  1. mr1.jar: 打包好的jar包,改了个名字
  2. com.cssl.mr1.FruitDriver: main函数所在的全类名
  3. /input_fruit/fruit.tsv: hdfs文件所在位置
  4. fruit1: hbase上的表名
yarn jar mr1.jar com.cssl.mr1.FruitDriver /input_fruit/fruit.tsv fruit1

2 table -> table

需求: 将hbase表中的数据导入到hbase的另一张表中

mapper

public class Fruit2Mapper extends TableMapper<ImmutableBytesWritable, Put> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
        Put outV = new Put(key.get());
        for (Cell cell : value.rawCells()) {
            if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                outV.add(cell);
            }
        }
        context.write(key, outV);
    }
}

reducer

public class Fruit2Reducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {
    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
        for (Put put : values) {
            context.write(NullWritable.get(), put);
        }

    }
}

driver
这样写是直接可以在本地运行的,需要把hbase的配置文件拷贝到项目的资源目录(有域名映射的话,Windows也需要配)
在这里插入图片描述

public class Fruit2Driver implements Tool {
    private Configuration configuration;
    @Override
    public int run(String[] strings) throws Exception {
        Job job = Job.getInstance(configuration);
        job.setJarByClass(Fruit2Driver.class);

        TableMapReduceUtil.initTableMapperJob("fruit1",
                new Scan(),
                Fruit2Mapper.class,
                ImmutableBytesWritable.class,
                Put.class,
                job);

        TableMapReduceUtil.initTableReducerJob("fruit2",
                Fruit2Reducer.class,
                job);

        boolean b = job.waitForCompletion(true);
        return b?0:1;
    }

    @Override
    public void setConf(Configuration configuration) {
        this.configuration=configuration;
    }

    @Override
    public Configuration getConf() {
        return this.configuration;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = HBaseConfiguration.create();
        int run = ToolRunner.run(configuration, new Fruit2Driver(), args);
        System.exit(run);
    }
}

3 自定义协处理器

目标:当客户端往 "student" 中插入数据, 会同时向 "chen:student" 中插入数据

1)编写协处理器

/*
* 协处理器
* 1) 创建类: 继承BaseRegionObserver
* 2) 重写方法: postPut
* 3) 实现逻辑
*   增加student的数据,同时增加chen:student数据
* 4) 将项目打包后上传到hbase中,让hbase可以识别我们的协处理器
* */
public class InsertHbaseCoprocessor extends BaseRegionObserver {
    @Override
    public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
        //获取表
        Table table = e.getEnvironment().getTable(TableName.valueOf("chen:student"));
        //增加数据
        table.put(put);
        //关闭表
        table.close();
    }
}

2)打包上传到hbase的lib目录并重启hbase

3)删除原来的student表,创建新表并指定协处理器

//新增指定协处理器的表
@Test
public void addCoprocessorTable() throws IOException {
    Admin admin = connection.getAdmin();
    TableName tableName = TableName.valueOf("student");
    if (!admin.tableExists(tableName)) {
        //创建表描述对象
        HTableDescriptor hTableDescriptor=new HTableDescriptor(tableName);
        //指定协处理器
        hTableDescriptor.addCoprocessor("com.cssl.InsertHbaseCoprocessor");
        //增加列族
        hTableDescriptor.addFamily(new HColumnDescriptor("info"));
        admin.createTable(hTableDescriptor);
        System.out.println("创建成功~~~");
    }
}

4)测试插入并查看结果


以上是关于自定义 HBase-MapReduce的主要内容,如果未能解决你的问题,请参考以下文章

自定义 HBase-MapReduce

自定义 HBase-MapReduce1

VSCode自定义代码片段——CSS选择器

VSCode自定义代码片段6——CSS选择器

VSCode自定义代码片段(vue主模板)

VSCode自定义代码片段——声明函数