自定义 HBase-MapReduce

Posted 丶落幕

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了自定义 HBase-MapReduce相关的知识,希望对你有一定的参考价值。

自定义 HBase-MapReduce

1 hdfs -> table

需求: 从hdfs读取数据,插入到hbase的表中
mapper

public class FruitMapper extends Mapper<LongWritable, Text,LongWritable, Text> 
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException 
        context.write(key, value);
    

reducer

public class FruitReducer extends TableReducer<LongWritable, Text, NullWritable> 
    @Override
    protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException 
        for (Text value : values) 
            String[] split = value.toString().split("\\t");
            Put put=new Put(Bytes.toBytes(split[0]));
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(split[1]));
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("color"), Bytes.toBytes(split[2]));
            context.write(NullWritable.get(), put);
        
    

driver

public class FruitDriver implements Tool 
    private Configuration conf;
    @Override
    public int run(String[] strings) throws Exception 
        Job job = Job.getInstance(conf);
        job.setJarByClass(FruitDriver.class);

        job.setMapperClass(FruitMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);

        TableMapReduceUtil.initTableReducerJob(strings[1], FruitReducer.class, job);

        FileInputFormat.setInputPaths(job, new Path(strings[0]));
        boolean b = job.waitForCompletion(true);
        return b?0:1;
    

    @Override
    public void setConf(Configuration configuration) 
        this.conf=configuration;
    

    @Override
    public Configuration getConf() 
        return this.conf;
    

    public static void main(String[] args) throws Exception 
        Configuration configuration = new Configuration();
        int run = ToolRunner.run(configuration, new FruitDriver(), args);
        System.exit(run);
    

打包上传并执行

  1. mr1.jar: 打包好的jar包,改了个名字
  2. com.cssl.mr1.FruitDriver: main函数所在的全类名
  3. /input_fruit/fruit.tsv: hdfs文件所在位置
  4. fruit1: hbase上的表名
yarn jar mr1.jar com.cssl.mr1.FruitDriver /input_fruit/fruit.tsv fruit1

2 table -> table

需求: 将hbase表中的数据导入到hbase的另一张表中

mapper

public class Fruit2Mapper extends TableMapper<ImmutableBytesWritable, Put> 
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException 
        Put outV = new Put(key.get());
        for (Cell cell : value.rawCells()) 
            if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) 
                outV.add(cell);
            
        
        context.write(key, outV);
    

reducer

public class Fruit2Reducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> 
    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException 
        for (Put put : values) 
            context.write(NullWritable.get(), put);
        

    

driver
这样写是直接可以在本地运行的,需要把hbase的配置文件拷贝到项目的资源目录(有域名映射的话,Windows也需要配)

public class Fruit2Driver implements Tool 
    private Configuration configuration;
    @Override
    public int run(String[] strings) throws Exception 
        Job job = Job.getInstance(configuration);
        job.setJarByClass(Fruit2Driver.class);

        TableMapReduceUtil.initTableMapperJob("fruit1",
                new Scan(),
                Fruit2Mapper.class,
                ImmutableBytesWritable.class,
                Put.class,
                job);

        TableMapReduceUtil.initTableReducerJob("fruit2",
                Fruit2Reducer.class,
                job);

        boolean b = job.waitForCompletion(true);
        return b?0:1;
    

    @Override
    public void setConf(Configuration configuration) 
        this.configuration=configuration;
    

    @Override
    public Configuration getConf() 
        return this.configuration;
    

    public static void main(String[] args) throws Exception 
        Configuration configuration = HBaseConfiguration.create();
        int run = ToolRunner.run(configuration, new Fruit2Driver(), args);
        System.exit(run);
    

3 自定义协处理器

目标:当客户端往 "student" 中插入数据, 会同时向 "chen:student" 中插入数据

1)编写协处理器

/*
* 协处理器
* 1) 创建类: 继承BaseRegionObserver
* 2) 重写方法: postPut
* 3) 实现逻辑
*   增加student的数据,同时增加chen:student数据
* 4) 将项目打包后上传到hbase中,让hbase可以识别我们的协处理器
* */
public class InsertHbaseCoprocessor extends BaseRegionObserver 
    @Override
    public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException 
        //获取表
        Table table = e.getEnvironment().getTable(TableName.valueOf("chen:student"));
        //增加数据
        table.put(put);
        //关闭表
        table.close();
    

2)打包上传到hbase的lib目录并重启hbase

3)删除原来的student表,创建新表并指定协处理器

//新增指定协处理器的表
@Test
public void addCoprocessorTable() throws IOException 
    Admin admin = connection.getAdmin();
    TableName tableName = TableName.valueOf("student");
    if (!admin.tableExists(tableName)) 
        //创建表描述对象
        HTableDescriptor hTableDescriptor=new HTableDescriptor(tableName);
        //指定协处理器
        hTableDescriptor.addCoprocessor("com.cssl.InsertHbaseCoprocessor");
        //增加列族
        hTableDescriptor.addFamily(new HColumnDescriptor("info"));
        admin.createTable(hTableDescriptor);
        System.out.println("创建成功~~~");
    

4)测试插入并查看结果


以上是关于自定义 HBase-MapReduce的主要内容,如果未能解决你的问题,请参考以下文章

自定义 HBase-MapReduce

自定义 HBase-MapReduce1

自定义类加载器

自定义wsl安装位置以及多wsl共存

更改自定义 UITableVIewCell 的 UILabel 的位置

Grid布局(四)单元格自定义布局