#IT明星不是梦#Hadoop整合Hbase案例详解
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了#IT明星不是梦#Hadoop整合Hbase案例详解相关的知识,希望对你有一定的参考价值。
需求:编写MapReduce程序,将HBase中一张表的数据复制到另一张表中。要求:读取HBase中user表的f1:name、f1:age数据,并将这些数据写入另一张表user2的f1列族中。
第一步:创建表
注意:两张表的列族一定要相同
/**
create 'user','f1'
put 'user','rk001','f1:name','tony'
put 'user','rk001','f1:age','12'
put 'user','rk001','f1:address','beijing'
put 'user','rk002','f1:name','wangwu'
create 'user2','f1'
*/
第二步:创建maven工程并导入jar包
pom.xml文件内容如下:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Build descriptor for the HBase table-copy MapReduce job. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>Hadoop</groupId>
    <artifactId>HbaseTang</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <properties>
        <!-- Single source of truth for the Hadoop and HBase artifact versions. -->
        <hadoop.version>2.7.3</hadoop.version>
        <hbase.version>1.2.1</hbase.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>commons-cli</groupId>
            <artifactId>commons-cli</artifactId>
            <version>1.2</version>
        </dependency>
        <dependency>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging</artifactId>
            <version>1.1.3</version>
        </dependency>

        <!-- Hadoop -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!-- hadoop-hdfs was previously declared twice; Maven requires each
             groupId:artifactId pair to be unique within <dependencies>. -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-app</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-hs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <!-- HBase -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase.version}</version>
        </dependency>

        <!-- Test-only: kept out of the packaged job jar's compile classpath. -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>
第三步:开发MR程序实现功能
(1)自定义map类
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import java.io.IOException;
/**
-
myuser f1: name&age => myuser2 f1
*/
public class HBaseReadMapper extends TableMapper<Text, Put> {
/**
*
-
@param key rowkey
-
@param value rowkey此行的数据 Result类型
-
@param context
-
@throws IOException
-
@throws InterruptedException
br/>`*/`
`@Override`protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
-
/**
- ImmutableBytesWritable key:Mapper接收数据值是Put对象,key是hbase中一条数据Put对应的rowkey(可序列化)
-
Result value:hbase中读取的result对象
- 获取rowkey的字节数组
*/
- 获取rowkey的字节数组
-
//获得roweky的字节数组
byte[] rowkey_bytes = key.get();
String rowkeyStr = Bytes.toString(rowkey_bytes);
Text text = new Text(rowkeyStr);
//输出数据 -> 写数据 -> Put 构建Put对象
Put put = new Put(rowkey_bytes);
//获取一行中所有的Cell对象
Cell[] cells = value.rawCells();
//将f1 : name& age输出
for(Cell cell: cells) {
//当前cell是否是f1
//列族
byte[] family_bytes = CellUtil.cloneFamily(cell);
String familyStr = Bytes.toString(family_bytes);
if("f1".equals(familyStr)) {
//在判断是否是name | age
byte[] qualifier_bytes = CellUtil.cloneQualifier(cell);
String qualifierStr = Bytes.toString(qualifier_bytes);
if("name".equals(qualifierStr)) {
put.add(cell);
}
if("age".equals(qualifierStr)) {
put.add(cell);
}
}
}
//判断是否为空;不为空,才输出
if(!put.isEmpty()){
context.write(text, put);
}
}
}
-
(2)自定义reduce类
package com.kaikeba.hbase.demo01;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.Text;
import java.io.IOException;
/**
 * Reduce side of the table-copy job: passes every {@link Put} received for a
 * row key on to the job's output.
 *
 * <p>Type parameters of {@link TableReducer}: {@link Text} is the map output
 * key, {@link Put} is the map output value, and
 * {@link ImmutableBytesWritable} is the reduce output key (the row key).
 */
public class HBaseWriteReducer extends TableReducer<Text, Put, ImmutableBytesWritable> {

    /**
     * Emits each incoming {@link Put} under the row key it was grouped by.
     *
     * @param key     row key emitted by the mapper
     * @param values  all Puts emitted by the mapper for that row key
     * @param context MapReduce context used to emit {@code (row key, Put)}
     * @throws IOException          if a write fails
     * @throws InterruptedException if a write is interrupted
     */
    @Override
    protected void reduce(Text key, Iterable<Put> values, Context context)
            throws IOException, InterruptedException {
        // Text holds UTF-8 bytes; copyBytes() returns exactly those bytes.
        // The previous key.toString().getBytes() re-encoded the key with the
        // platform default charset, which can alter non-ASCII row keys.
        ImmutableBytesWritable rowKey = new ImmutableBytesWritable(key.copyBytes());

        for (Put put : values) {
            context.write(rowKey, put);
        }
    }
}
(3)main入口类
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * Driver for the table-copy job: wires {@code HBaseReadMapper} to the source
 * table and {@code HBaseWriteReducer} to the target table, then runs the job.
 */
public class HBaseMR extends Configured implements Tool {

    /** Table the job reads from; matches the table created in the setup step. */
    private static final String SOURCE_TABLE = "user";

    /** Table the job writes to; matches the table created in the setup step. */
    private static final String TARGET_TABLE = "user2";

    /**
     * Command-line entry point. Exits the JVM with the status returned by
     * {@link #run(String[])}.
     *
     * @param args command-line arguments, passed through {@link ToolRunner}
     * @throws Exception if the job cannot be set up or run
     */
    public static void main(String[] args) throws Exception {
        Configuration configuration = HBaseConfiguration.create();
        // ZooKeeper quorum of the HBase cluster the job connects to.
        configuration.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181");
        int exitCode = ToolRunner.run(configuration, new HBaseMR(), args);
        System.exit(exitCode);
    }

    /**
     * Configures and runs the copy job, blocking until it finishes.
     *
     * @param args unused
     * @return 0 if the job succeeded, 1 otherwise
     * @throws Exception if job setup or submission fails
     */
    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(super.getConf());
        job.setJarByClass(HBaseMR.class);

        // A MapReduce job reads every row once, so filling the region
        // servers' block cache with its reads is not useful.
        Scan scan = new Scan();
        scan.setCacheBlocks(false);

        // Map side: scan SOURCE_TABLE, emitting (Text row key, Put).
        TableMapReduceUtil.initTableMapperJob(
                TableName.valueOf(SOURCE_TABLE), scan,
                HBaseReadMapper.class, Text.class, Put.class, job);

        // Reduce side: write the Puts into TARGET_TABLE.
        TableMapReduceUtil.initTableReducerJob(TARGET_TABLE, HBaseWriteReducer.class, job);

        boolean succeeded = job.waitForCompletion(true);
        return succeeded ? 0 : 1;
    }
}
第四步:打成jar包提交到集群运行
打包:
执行命令:
hadoop jar HbaseTang-1.0-SNAPSHOT.jar com.kaikeba.hbase.demo01.HBaseMR
执行结果:
以上是关于#IT明星不是梦#Hadoop整合Hbase案例详解的主要内容,如果未能解决你的问题,请参考以下文章