HBase MapReduce
Posted 代号菜鸟
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了HBase MapReduce相关的知识,希望对你有一定的参考价值。
1. HBase to HBase
Mapper 继承 TableMapper,输入键为 RowKey(ImmutableBytesWritable),输入值为该行的 Result。
/**
 * Mapper base class whose input types are fixed to {@link ImmutableBytesWritable} (key)
 * and {@link Result} (value); subclasses only choose the output types.
 *
 * @param <KEYOUT> type of the keys the mapper emits
 * @param <VALUEOUT> type of the values the mapper emits
 */
public abstract class TableMapper<KEYOUT, VALUEOUT>
        extends Mapper<ImmutableBytesWritable, Result, KEYOUT, VALUEOUT> {
    // Intentionally empty: the implicit public no-arg constructor is all this class needs.
}
package com.scb.jason.mapper; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * Created by Administrator on 2017/8/16. */ public class User2BasicMapper extends TableMapper<ImmutableBytesWritable, Put> { private ImmutableBytesWritable mapOutputkey = new ImmutableBytesWritable(); @Override public void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { //Get rowKey mapOutputkey.set(key.get()); Put put = new Put(key.get()); for(Cell cell:value.rawCells()){ if("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))){ if("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){ put.add(cell); } if("age".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){ put.add(cell); } } } context.write(mapOutputkey,put); } }
Reducer 继承 TableReducer,其输出值类型固定为 Mutation(例如 Put)。
/**
 * Reducer base class whose output value type is fixed to {@link Mutation}; subclasses
 * choose the input key/value types and the output key type.
 *
 * @param <KEYIN> type of the keys the reducer receives
 * @param <VALUEIN> type of the values the reducer receives
 * @param <KEYOUT> type of the keys the reducer emits
 */
public abstract class TableReducer<KEYIN, VALUEIN, KEYOUT>
        extends Reducer<KEYIN, VALUEIN, KEYOUT, Mutation> {
    // Intentionally empty: the implicit public no-arg constructor is all this class needs.
}
package com.scb.jason.reducer; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableReducer; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; /** * Created by Administrator on 2017/8/16. */ public class User2BasicReducer extends TableReducer<ImmutableBytesWritable, Put, ImmutableBytesWritable>{ @Override protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException { for(Put put:values){ context.write(null,put); } } }
Driver
package com.scb.jason.driver; import com.scb.jason.mapper.User2BasicMapper; import com.scb.jason.reducer.User2BasicReducer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import java.io.IOException; /** * Created by Administrator on 2017/8/16. */ public class User2BasicDriver extends Configured implements Tool{ public int run(String[] strings) throws Exception { Job job = Job.getInstance(this.getConf(),this.getClass().getSimpleName()); job.setJarByClass(this.getClass()); Scan scan = new Scan(); scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs scan.setCacheBlocks(false); // don\'t set to true for MR jobs // set other scan attrs TableMapReduceUtil.initTableMapperJob( "user", // input table scan, // Scan instance to control CF and attribute selection User2BasicMapper.class, // mapper class ImmutableBytesWritable.class, // mapper output key Put.class, // mapper output value job); TableMapReduceUtil.initTableReducerJob( "basic", // output table User2BasicReducer.class, // reducer class job); job.setNumReduceTasks(1); boolean isSuccess = job.waitForCompletion(true); return isSuccess?1:0; } public static void main(String[] args) throws Exception { Configuration configuration = HBaseConfiguration.create(); int status = ToolRunner.run(configuration,new User2BasicDriver(),args); System.exit(status); } }
2. HBase to File
Mapper
package com.scb.jason.mapper; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.Text; import java.io.IOException; /** * Created by Administrator on 2017/8/16. */ public class User2FileMapper extends TableMapper<Text, Text> { private Text rowKeyText = new Text(); private Text valueText = new Text(); @Override public void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { //Get rowKey rowKeyText.set(key.get()); Put put = new Put(key.get()); byte[] inforName = null; byte[] inforAge = null; for(Cell cell:value.rawCells()){ if("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))){ if("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){ inforName = CellUtil.cloneValue(cell); } if("age".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){ inforAge = CellUtil.cloneValue(cell); } } } valueText.set(new String(inforName)+"\\t"+new String(inforAge)); context.write(rowKeyText,valueText); } }
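Reducer: none. No custom Reducer is written for this job; the driver below keeps a single reduce task, so Hadoop's default pass-through Reducer writes the mapper output to the file.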
Driver
package com.scb.jason.driver; import com.scb.jason.mapper.User2BasicMapper; import com.scb.jason.mapper.User2FileMapper; import com.scb.jason.reducer.User2BasicReducer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; /** * Created by Administrator on 2017/8/16. */ public class User2FileDriver extends Configured implements Tool{ public int run(String[] args) throws Exception { Job job = Job.getInstance(this.getConf(),this.getClass().getSimpleName()); job.setJarByClass(this.getClass()); Scan scan = new Scan(); scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs scan.setCacheBlocks(false); // don\'t set to true for MR jobs // set other scan attrs TableMapReduceUtil.initTableMapperJob( "user", // input table scan, // Scan instance to control CF and attribute selection User2FileMapper.class, // mapper class Text.class, // mapper output key Text.class, // mapper output value job); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputFormatClass(TextOutputFormat.class); FileOutputFormat.setOutputPath(job,new Path(args[0])); job.setNumReduceTasks(1); boolean isSuccess = job.waitForCompletion(true); return isSuccess?1:0; } public static void main(String[] args) throws Exception { Configuration configuration = HBaseConfiguration.create(); int status = ToolRunner.run(configuration,new 
User2FileDriver(),args); System.exit(status); } }
3. File to HBase
Driver
package com.scb.jason.driver; import com.scb.jason.mapper.File2HbaseMapper; import com.scb.jason.mapper.User2BasicMapper; import com.scb.jason.reducer.File2HBaseReducer; import com.scb.jason.reducer.User2BasicReducer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; /** * Created by Administrator on 2017/8/16. */ public class File2BasicDriver extends Configured implements Tool{ public int run(String[] strings) throws Exception { Job job = Job.getInstance(this.getConf(),this.getClass().getSimpleName()); job.setJarByClass(this.getClass()); job.setMapperClass(File2HbaseMapper.class); FileInputFormat.addInputPath(job,new Path("F:\\\\Workspace\\\\File")); job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputValueClass(Put.class); TableMapReduceUtil.initTableReducerJob( "basic", // output table File2HBaseReducer.class, // reducer class job); job.setNumReduceTasks(1); boolean isSuccess = job.waitForCompletion(true); return isSuccess?1:0; } public static void main(String[] args) throws Exception { Configuration configuration = HBaseConfiguration.create(); int status = ToolRunner.run(configuration,new File2BasicDriver(),args); System.exit(status); } }
Mapper
package com.scb.jason.mapper; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; import java.util.StringTokenizer; /** * Created by Administrator on 2017/8/17. */ public class File2HbaseMapper extends Mapper<LongWritable,Text,ImmutableBytesWritable,Put> { private ImmutableBytesWritable mapOutputkey = new ImmutableBytesWritable(); @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String lineValue = value.toString(); StringTokenizer stringTokenizer = new StringTokenizer(lineValue); String rowkey = stringTokenizer.nextToken(); String name = stringTokenizer.nextToken(); String age = stringTokenizer.nextToken(); Put put = new Put(Bytes.toBytes(rowkey)); put.add(Bytes.toBytes("info"),Bytes.toBytes("name"),Bytes.toBytes(name)); put.add(Bytes.toBytes("info"),Bytes.toBytes("age"),Bytes.toBytes(age)); mapOutputkey.set(Bytes.toBytes(key.get())); context.write(mapOutputkey,put); } }
Reducer
package com.scb.jason.reducer; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableReducer; import java.io.IOException; /** * Created by Administrator on 2017/8/25. */ public class File2HBaseReducer extends TableReducer<ImmutableBytesWritable, Put, ImmutableBytesWritable> { @Override protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException { for(Put put:values){ context.write(null,put); } } }
4. HBase to RDBMS
public static class MyRdbmsReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private Connection c = null; public void setup(Context context) { // create DB connection... } public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { // do summarization // in this example the keys are Text, but this is just an example } public void cleanup(Context context) { // close db connection } }
5. File -> HFile -> HBase 批量导入
http://www.cnblogs.com/shitouer/archive/2013/02/20/hbase-hfile-bulk-load.html
以上是关于HBase MapReduce的主要内容,如果未能解决你的问题,请参考以下文章