04 HBase与MapReduce整合

Posted 邱文斌

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了04 HBase与MapReduce整合相关的知识,希望对你有一定的参考价值。

HBase与MapReduce整合时,有三种情形:

  • HBase作为MapReduce的数据流向;

  • HBase作为MapReduce的数据来源;

  • HBase同时作为MapReduce的数据来源和数据流向。

步骤

1. 准备

查看进程

6417 Jps
3780 ResourceManager
4086 NodeManager
3272 DataNode
6264 HRegionServer
3481 SecondaryNameNode
6044 HQuorumPeer
3133 NameNode
6111 HMaster

2. 导入数据

从HDFS导入数据到HBase时会将数据先暂存于hdfs://hadoop0:9000/user/root/tmp。如果此文件夹存在,则先删除它。

# 如果此文件夹存在,则先删除它。
hadoop fs -rm -r hdfs://hadoop0:9000/user/root/tmp
# 在HDFS上创建/input2/music2用于存放上传的音乐播放数据文件music1.txt
hadoop fs -mkdir hdfs://hadoop0:9000/input2/music2
# 将音乐播放数据music1.txt复制到linux的/root目录下,再上传到HDFS上的/input2/music2目录
hadoop fs -put /root/music11.txt hdfs://hadoop0:9000/input2/music2
# 调用HBase提供的importtsv工具在HBase上创建表music,并指定列族和列(music是表名)
hadoop jar /usr/local/hbase/lib/hbase-server-1.4.10.jar importtsv -Dimporttsv.bulk.output=tmp -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:singer,info:gender,info:ryghme,info:terminal music /input2/music2
# 此时进入hbase shell可以看到music表,但是并没有任何内容
# 调用HBase提供的completebulkload工具从暂存文件夹hdfs://hadoop0:9000/user/root/tmp加载数据到music表中
hadoop jar /usr/local/hbase/lib/hbase-server-1.4.10.jar completebulkload tmp music
# 此时进入hbase shell可以scan 'music',此时music表中已经有数据了
# 进入HBase Shell,创建namelist表,拥有一个列族details,用于存放统计结果
create 'namelist','details'
scan 'music'
# 准备就绪,可以调用MapReduce进行统计分析了

3. MapReduce处理

创建MyMapper类,继承自TableMapper,取出播放记录中的歌名,记为1次播放。

取出每行中的所有单元,实际上只扫描了一列(info:name,即音乐名称),因为在驱动中使用Scan来设置了过滤条件。

同时,将音乐名称作为key,播放次数(每次为1)作为value,传给Reducer来进一步统计分析。

package com.hbaseapi;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
* 取出播放记录中的歌名,记为一次播放
* @author root
*
*/
public class MyMapper extends TableMapper<Text,IntWritable>{

@Override
protected void map(ImmutableBytesWritable key, Result value,
Mapper<ImmutableBytesWritable, Result, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
//取出每行中所有单元,实际上只扫描了一列(info:name),因为在驱动中使用Scan设置了过滤条件
List<Cell> cells = value.listCells();
//将音乐名称作为key,播放次数(每次为1)作为value
for (Cell cell : cells) {
Text text = new Text(Bytes.toString(CellUtil.cloneValue(cell)));
IntWritable iw = new IntWritable(1);
context.write(text, iw);
}
}
}

创建MyReducer,继承自TableReducer,将相同歌名的播放次数求和,与前面学习过的MapReduce入门示例WordCount的处理类似。统计完成后将结果另外输出到一个HBase表namelist中。

package com.hbaseapi;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Reducer;
/**
* 将相同歌名的播放次数求和,统计完成后,将结果另外输出到一个HBase表的namelist中
* @author root
*
*/
public class MyReducer extends TableReducer<Text,IntWritable,Text>{
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Reducer<Text, IntWritable, Text, Mutation>.Context context) throws IOException, InterruptedException {
int playCount = 0;
for (IntWritable intWritable : values) {
playCount += intWritable.get();
}
//为put操作指定行键
Put put = new Put(Bytes.toBytes(key.toString()));
//为put操作指定列和值
put.addColumn(Bytes.toBytes("details"), Bytes.toBytes("rank"), Bytes.toBytes(playCount));
context.write(key, put);
}
}

编写驱动程序,为music表设置过滤条件,只保留了歌名name一个字段,同时用TableMapReduceUtil工具设置music表为MapReduce的输入表,设置namelist表为MapReduce的输出表。

package com.hbaseapi;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;


/**
* 编写驱动程序,为music表设置过滤条件,只保留歌名name一个子段
* 同时用TableMapReduceUtil工具
* 设置music表为MapReduce的输入表
* 设置namelist表为MapReduce的输出表
* @author root
*
*/
public class Music {

/**
* 取出播放记录中的歌名,记为一次播放
* @author root
*
*/
static class MyMapper extends TableMapper<Text,IntWritable>{

protected void map(ImmutableBytesWritable key, Result value,
Mapper<ImmutableBytesWritable, Result, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
//取出每行中所有单元,实际上只扫描了一列(info:name),因为在驱动中使用Scan设置了过滤条件
List<Cell> cells = value.listCells();
//将音乐名称作为key,播放次数(每次为1)作为value
for (Cell cell : cells) {
Text text = new Text(Bytes.toString(CellUtil.cloneValue(cell)));
IntWritable iw = new IntWritable(1);
context.write(text, iw);
}
}
}

/**
* 将相同歌名的播放次数求和,统计完成后,将结果另外输出到一个HBase表的namelist中
* @author root
*
*/
static class MyReducer extends TableReducer<Text,IntWritable,Text>{
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Reducer<Text, IntWritable, Text, Mutation>.Context context) throws IOException, InterruptedException {
int playCount = 0;
for (IntWritable intWritable : values) {
playCount += intWritable.get();
}
//为put操作指定行键
Put put = new Put(Bytes.toBytes(key.toString()));
//为put操作指定列和值
put.addColumn(Bytes.toBytes("details"), Bytes.toBytes("rank"), Bytes.toBytes(playCount));
context.write(key, put);
}
}

public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.rootdir", "hdfs://hadoop0:9000/hbase");
conf.set("hbase.zookeeper.quorum", "hadoop0");
Job job = Job.getInstance(conf,"top-music");
//MapReduce程序作业基本配置
job.setJarByClass(Music.class);
job.setNumReduceTasks(1);
//为music表设置过滤条件,只保留歌名name
Scan scan = new Scan();
scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"));
//使用HBase提供的工具类来设置job,这里导包注意import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
TableMapReduceUtil.initTableMapperJob("music", scan, MyMapper.class, Text.class, IntWritable.class, job);
TableMapReduceUtil.initTableReducerJob("namelist", MyReducer.class, job);
job.waitForCompletion(true);
System.out.println("执行成功,统计结果存于namelist中");
}
}

4. 运行

导入/usr/local/hbase/lib目录下的全部jar包




以上是关于04 HBase与MapReduce整合的主要内容,如果未能解决你的问题,请参考以下文章

HBase整合MapReduce之建立HBase索引

HBase和Mapreduce

Spark DataFrame便捷整合HBase

hbase的典型场景

HBase与MapReduce集成

MapReduce 与 HBase 的关系?