HBase MapReduce

Posted 代号菜鸟

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了HBase MapReduce相关的知识,希望对你有一定的参考价值。

1. HBase to HBase

Mapper 继承 TableMapper,输入为Rowkey和Result.

// Excerpt of HBase's TableMapper: it fixes the Mapper input types to
// (row key as ImmutableBytesWritable, scan row as Result); subclasses
// only choose the map output key/value types.
public abstract class TableMapper<KEYOUT, VALUEOUT> extends Mapper<ImmutableBytesWritable, Result, KEYOUT, VALUEOUT> {
public TableMapper() {
}
}
package com.scb.jason.mapper;


import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Map side of the user-to-basic copy job: for each row of the source
 * table, keep only the info:name and info:age cells and emit them as a
 * Put keyed by the original row key.
 *
 * Created by Administrator on 2017/8/16.
 */
public class User2BasicMapper extends TableMapper<ImmutableBytesWritable, Put> {

    // Reused across map() calls to avoid a per-row allocation.
    private ImmutableBytesWritable outputKey = new ImmutableBytesWritable();

    @Override
    public void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        byte[] rowKey = key.get();
        outputKey.set(rowKey);

        Put put = new Put(rowKey);
        for (Cell cell : value.rawCells()) {
            // Only cells from the "info" family are considered.
            if (!"info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))) {
                continue;
            }
            String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
            if ("name".equals(qualifier) || "age".equals(qualifier)) {
                put.add(cell);
            }
        }
        context.write(outputKey, put);
    }

}

Reducer 继承 TableReducer

// Excerpt of HBase's TableReducer: it fixes the Reducer output value
// type to Mutation (Put/Delete); subclasses choose the input key/value
// and output key types.
public abstract class TableReducer<KEYIN, VALUEIN, KEYOUT> extends Reducer<KEYIN, VALUEIN, KEYOUT, Mutation> {
public TableReducer() {
}
}
package com.scb.jason.reducer;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Pass-through reducer: forwards every Put received from the map phase
 * straight to the output table. TableOutputFormat ignores the written
 * key, so null is passed.
 *
 * Created by Administrator on 2017/8/16.
 */
public class User2BasicReducer extends TableReducer<ImmutableBytesWritable, Put, ImmutableBytesWritable> {

    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context)
            throws IOException, InterruptedException {
        for (Put mutation : values) {
            context.write(null, mutation);
        }
    }

}

Driver

package com.scb.jason.driver;

import com.scb.jason.mapper.User2BasicMapper;
import com.scb.jason.reducer.User2BasicReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * Driver for the HBase-to-HBase copy job: scans the "user" table with
 * User2BasicMapper and writes the filtered info:name / info:age cells
 * to the "basic" table via User2BasicReducer.
 *
 * Created by Administrator on 2017/8/16.
 */
public class User2BasicDriver extends Configured implements Tool {

    public int run(String[] strings) throws Exception {
        Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

        Scan scan = new Scan();
        scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
        scan.setCacheBlocks(false);  // don't set to true for MR jobs
        // set other scan attrs

        TableMapReduceUtil.initTableMapperJob(
                "user",                        // input table
                scan,                          // Scan instance to control CF and attribute selection
                User2BasicMapper.class,        // mapper class
                ImmutableBytesWritable.class,  // mapper output key
                Put.class,                     // mapper output value
                job);
        TableMapReduceUtil.initTableReducerJob(
                "basic",                       // output table
                User2BasicReducer.class,       // reducer class
                job);
        job.setNumReduceTasks(1);

        boolean isSuccess = job.waitForCompletion(true);
        // Bug fix: process exit-code convention (and the ToolRunner contract)
        // is 0 = success. The original returned 1 on success, which shells and
        // schedulers treat as a failure.
        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = HBaseConfiguration.create();
        int status = ToolRunner.run(configuration, new User2BasicDriver(), args);
        System.exit(status);
    }

}

 2. HBase to File

Mapper

package com.scb.jason.mapper;


import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

import java.io.IOException;

/**
 * Map-only export of the "user" table to text: for each row, emits the
 * row key as the output key and "name\tage" (from info:name / info:age)
 * as the output value.
 *
 * Created by Administrator on 2017/8/16.
 */
public class User2FileMapper extends TableMapper<Text, Text> {

    // Reused across map() calls to avoid per-row allocations.
    private Text rowKeyText = new Text();
    private Text valueText = new Text();

    @Override
    public void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        rowKeyText.set(key.get());

        byte[] infoName = null;
        byte[] infoAge = null;
        for (Cell cell : value.rawCells()) {
            if ("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))) {
                String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
                if ("name".equals(qualifier)) {
                    infoName = CellUtil.cloneValue(cell);
                } else if ("age".equals(qualifier)) {
                    infoAge = CellUtil.cloneValue(cell);
                }
            }
        }

        // Bug fix: the original threw a NullPointerException for rows missing
        // either column; emit an empty field instead. Bytes.toString decodes
        // as UTF-8, unlike new String(byte[]) which uses the platform charset.
        // (Also removed an unused Put allocation the original created per row.)
        String name = infoName == null ? "" : Bytes.toString(infoName);
        String age = infoAge == null ? "" : Bytes.toString(infoAge);
        valueText.set(name + "\t" + age);
        context.write(rowKeyText, valueText);
    }

}

No Reducer is needed — this is a map-only job.

Driver

package com.scb.jason.driver;

import com.scb.jason.mapper.User2BasicMapper;
import com.scb.jason.mapper.User2FileMapper;
import com.scb.jason.reducer.User2BasicReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Driver for the HBase-to-file export: scans the "user" table with
 * User2FileMapper and writes tab-separated text to the output path
 * given as args[0].
 *
 * Created by Administrator on 2017/8/16.
 */
public class User2FileDriver extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        // Fail fast with a usage hint instead of an ArrayIndexOutOfBoundsException.
        if (args.length < 1) {
            System.err.println("Usage: User2FileDriver <output path>");
            return 1;
        }

        Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

        Scan scan = new Scan();
        scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
        scan.setCacheBlocks(false);  // don't set to true for MR jobs
        // set other scan attrs

        TableMapReduceUtil.initTableMapperJob(
                "user",                 // input table
                scan,                   // Scan instance to control CF and attribute selection
                User2FileMapper.class,  // mapper class
                Text.class,             // mapper output key
                Text.class,             // mapper output value
                job);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(args[0]));
        job.setNumReduceTasks(1);

        boolean isSuccess = job.waitForCompletion(true);
        // Bug fix: exit-code convention is 0 = success; the original returned
        // 1 on success, which shells treat as failure.
        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = HBaseConfiguration.create();
        int status = ToolRunner.run(configuration, new User2FileDriver(), args);
        System.exit(status);
    }

}

3. File to HBase

Driver

package com.scb.jason.driver;

import com.scb.jason.mapper.File2HbaseMapper;
import com.scb.jason.mapper.User2BasicMapper;
import com.scb.jason.reducer.File2HBaseReducer;
import com.scb.jason.reducer.User2BasicReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


/**
 * Driver for the file-to-HBase load: reads text lines, parses them with
 * File2HbaseMapper and writes the resulting Puts to the "basic" table
 * through File2HBaseReducer.
 *
 * Created by Administrator on 2017/8/16.
 */
public class File2BasicDriver extends Configured implements Tool {

    /** Fallback input directory, kept for backward compatibility when no argument is given. */
    private static final String DEFAULT_INPUT_PATH = "F:\\Workspace\\File";

    public int run(String[] strings) throws Exception {
        Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());
        job.setMapperClass(File2HbaseMapper.class);

        // Generalized: honor a command-line input path when supplied instead
        // of only the hard-coded Windows directory.
        String inputPath = strings.length > 0 ? strings[0] : DEFAULT_INPUT_PATH;
        FileInputFormat.addInputPath(job, new Path(inputPath));

        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        TableMapReduceUtil.initTableReducerJob(
                "basic",                  // output table
                File2HBaseReducer.class,  // reducer class
                job);
        job.setNumReduceTasks(1);

        boolean isSuccess = job.waitForCompletion(true);
        // Bug fix: exit-code convention is 0 = success; the original returned
        // 1 on success, which shells treat as failure.
        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = HBaseConfiguration.create();
        int status = ToolRunner.run(configuration, new File2BasicDriver(), args);
        System.exit(status);
    }

}

Mapper

package com.scb.jason.mapper;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.StringTokenizer;

/**
 * Parses whitespace-separated text lines of the form "rowkey name age"
 * into HBase Puts on the "info" column family, keyed by the parsed row
 * key.
 *
 * Created by Administrator on 2017/8/17.
 */
public class File2HbaseMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

    // Reused across map() calls to avoid a per-line allocation.
    private ImmutableBytesWritable mapOutputkey = new ImmutableBytesWritable();

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        StringTokenizer tokenizer = new StringTokenizer(value.toString());
        // Robustness: skip malformed lines instead of letting nextToken()
        // throw NoSuchElementException and kill the task.
        if (tokenizer.countTokens() < 3) {
            return;
        }
        String rowkey = tokenizer.nextToken();
        String name = tokenizer.nextToken();
        String age = tokenizer.nextToken();

        Put put = new Put(Bytes.toBytes(rowkey));
        put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name));
        put.add(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(age));

        // Bug fix: key the map output by the parsed row key, matching the Put.
        // The original used key.get() — the file byte offset — so the shuffle
        // key disagreed with the row key actually written by the Put.
        mapOutputkey.set(Bytes.toBytes(rowkey));
        context.write(mapOutputkey, put);
    }

}

Reducer

package com.scb.jason.reducer;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;

import java.io.IOException;

/**
 * Identity reducer for the file-to-HBase load: every Put produced by the
 * map phase is forwarded unchanged to the output table. TableOutputFormat
 * ignores the written key, so null is passed.
 *
 * Created by Administrator on 2017/8/25.
 */
public class File2HBaseReducer extends TableReducer<ImmutableBytesWritable, Put, ImmutableBytesWritable> {

    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context)
            throws IOException, InterruptedException {
        for (Put mutation : values) {
            context.write(null, mutation);
        }
    }

}

 4. HBase to RDBMS

// Skeleton (from the HBase reference guide) of a reducer that writes its
// summarized output to an RDBMS over JDBC instead of back to HBase.
public static class MyRdbmsReducer extends Reducer<Text, IntWritable, Text, IntWritable>  {

  // JDBC connection: opened once per reduce task in setup(), closed in
  // cleanup(). NOTE(review): the Connection type's import (presumably
  // java.sql.Connection) is not shown in this excerpt — confirm.
  private Connection c = null;

  public void setup(Context context) {
    // create DB connection...
  }

  public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    // do summarization
    // in this example the keys are Text, but this is just an example
  }

  public void cleanup(Context context) {
    // close db connection
  }

}

5. File -> HFile ->  HBase 批量导入

http://www.cnblogs.com/shitouer/archive/2013/02/20/hbase-hfile-bulk-load.html

 

以上是关于HBase MapReduce的主要内容,如果未能解决你的问题,请参考以下文章

MapReduce-读取HBase

HBase 链 MapReduce 作业,将较小的表广播到所有 Mapper

使用mapreduce复制hbase表

MapReduce和Spark写入Hbase多表总结

用Mapreduce 方式生成HFile,导入HBase

Hadoop之——HBASE结合MapReduce批量导入数据