Using Spark bulk load to import data into HBase
Posted by 毛毛小妖的笔记
A recent project required writing the results of a Spark SQL aggregation into HBase. After some searching I found two approaches; below I describe both and explain which one I chose. The sample code floating around online simply would not run and I stepped into quite a few pits along the way, so I am writing down my own code and reasoning here in the hope that it helps other people.
Option 1: use the HBase Table Put API
This approach is straightforward: write rows directly through the HBase Table API. It is, however, fairly slow. If you are interested, there are plenty of clear examples online; a minimal sketch follows for reference.
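A hedged sketch of the Put-based write path, assuming the same test_users table, base column family, and ZooKeeper settings used by the bulk-load code later in this post; in a real Spark job you would typically issue these Puts inside foreachPartition rather than from a main method.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class PutWriteSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "zk-host");            // assumption: your ZooKeeper quorum
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("test_users"))) {
            Put put = new Put(Bytes.toBytes("rowkey-001"));
            // every Put goes through the WAL and the MemStore, which is why this path is slow for big loads
            put.addColumn(Bytes.toBytes("base"), Bytes.toBytes("user_id"), Bytes.toBytes("u001"));
            table.put(put);
        }
    }
}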
Option 2: use HBase bulk load
This approach is far more efficient; for large data volumes the speedup is substantial, and it is the one I ultimately adopted. The core idea is to bypass HBase's write-ahead log (WAL) and the region split process: use the Spark API to organize the DataFrame into HFile format, write the HFiles to HDFS, and then move them into HBase. That final step is extremely fast.
Having settled on the second option, let's get to work. The overall steps are:
- Run your Spark SQL aggregation to produce a DataFrame; this step depends entirely on your own business logic.
- Convert the DataFrame into a List<Tuple2<Tuple3<rowKey, column family, column>, value>> structure. The outer Tuple2 is a key-value pair whose key is a Tuple3 of rowKey, column family and column; HBase requires the rowKey, column family and column inside an HFile to be sorted, so this shape makes the later sort straightforward. The value is the cell value itself.
- Flatten the RDD from the previous step with flatMapToPair into JavaPairRDD<Tuple3<rowKey, column family, column>, String>.
- Repartition and sort the RDD from step 3.
- Convert the sorted RDD into HFile format, i.e. <ImmutableBytesWritable, KeyValue>.
- Write the HFiles to HDFS with saveAsNewAPIHadoopFile.
- Finally, move the HFiles into HBase with doBulkLoad (a compact skeleton of the whole pipeline is shown right after this list).
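Before the full code, here is a bird's-eye skeleton of that pipeline, purely for orientation. It reuses the variable names from the detailed method below (df, job, conf, conn, tableName, hfile_path, numPartitions), and explodeRowIntoCells is a hypothetical stand-in for the Row-to-cells conversion shown later; treat it as a sketch, not a drop-in implementation.
// sketch only: assumes df, job, conf, conn, tableName and hfile_path are set up as in the full method below
JavaPairRDD<Tuple3<String, String, String>, String> cells =
        df.toJavaRDD().flatMapToPair(row -> explodeRowIntoCells(row));        // steps 2-3 (hypothetical helper)

JavaPairRDD<Tuple3<String, String, String>, String> sorted =
        cells.repartitionAndSortWithinPartitions(new MyPartitioner(numPartitions), MyComparator.INSTANCE); // step 4

JavaPairRDD<ImmutableBytesWritable, KeyValue> hfiles = sorted.mapToPair(t -> new Tuple2<>(
        new ImmutableBytesWritable(Bytes.toBytes(t._1()._1())),
        new KeyValue(Bytes.toBytes(t._1()._1()), Bytes.toBytes(t._1()._2()),
                     Bytes.toBytes(t._1()._3()), Bytes.toBytes(t._2()))));    // step 5

hfiles.saveAsNewAPIHadoopFile(hfile_path, ImmutableBytesWritable.class, KeyValue.class,
        HFileOutputFormat2.class, job.getConfiguration());                    // step 6
new LoadIncrementalHFiles(conf).doBulkLoad(new Path(hfile_path),
        conn.getAdmin(), conn.getTable(tableName), conn.getRegionLocator(tableName)); // step 7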
Simple enough, right? It still took stepping into quite a few pits to get it working. The full code is below; feel free to use it as a reference (it also doubles as my own notes).
- Field definitions
// name of the base column family
private static String base_cf = "base";
// name of the model column family
private static String model_cf = "model";
// default number of HBase regions (also used as the number of Spark partitions)
private static int numPartitions = 15;
// all columns belonging to the base column family
private List<String> base_list;
// all columns belonging to the model column family
private List<String> model_list;
- Core methods
/**
* @Description: save the DataFrame as HFiles and bulk load them into HBase
* @Author: shengyu
* @Date: 2020/8/13/013
**/
public void saveAsHfile(Dataset<Row> df) throws Exception {
String hfile_path = "hdfs://sky-ns/test/hfile";
String[] rowKey_fields = {"user_id", "merchant_num"};
String hBase_table = "test_users";
// ZooKeeper connection settings
String quorum = Context.ZOOKEEPER;
String clientPort = "2181";
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", quorum);
conf.set("hbase.zookeeper.property.clientPort", clientPort);
conf.set(TableOutputFormat.OUTPUT_TABLE, hBase_table);
Job job = Job.getInstance(conf);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(KeyValue.class);
// enable snappy-compressed output (note: the HFile compression ultimately follows the column family's
// compression setting, which configureIncrementalLoad picks up from the table descriptor)
sparkContext.getConf().set("spark.hadoop.mapred.output.compress","true");
sparkContext.getConf().set("spark.hadoop.mapred.output.compression.codec","org.apache.hadoop.io.compress.SnappyCodec");
Connection conn = ConnectionFactory.createConnection(conf);
TableName tableName = TableName.valueOf(hBase_table.getBytes());
// create the target HBase table if it does not exist yet
createHTable(hBase_table,conf);
HFileOutputFormat2.configureIncrementalLoad(job, conn.getTable(tableName), conn.getRegionLocator(tableName));
// delete the output path if it already exists, otherwise the write below will fail
delete_hdfspath(hfile_path);
// convert each Row into List<<<rowkey,cf,column>,value>>
JavaRDD<List<Tuple2<Tuple3<String, String, String>, String>>> mapRdd = df.toJavaRDD().map((Function<Row, List<Tuple2<Tuple3<String, String, String>, String>>>) row -> {
List<Tuple2<Tuple3<String, String, String>, String>> arrayList = new ArrayList<>();
Map<String, String> columnsDataMap = getColumnsDataMap(row);
String rowKey = SparkRowUtil.getRowkey(columnsDataMap, rowKey_fields);
Iterator<Map.Entry<String, String>> iterator = columnsDataMap.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, String> next = iterator.next();
String col = next.getKey();
String value = next.getValue();
String cf = getColumnFamilyByColumn(col.toLowerCase());
Tuple2<Tuple3<String, String, String>, String> tuple22 = new Tuple2<>(new Tuple3<>(rowKey,cf,col), value);
arrayList.add(tuple22);
}
return arrayList;
});
// cache the converted RDD; isEmpty() kicks off a small job so errors in the map above surface here
mapRdd.cache();
mapRdd.isEmpty();
// flatten into <<rowkey,cf,column>,value> pairs
JavaPairRDD<Tuple3<String, String, String>, String> flatMapRdd =
mapRdd.filter(item->!item.isEmpty())
.flatMapToPair((PairFlatMapFunction<List<Tuple2<Tuple3<String, String, String>, String>>, Tuple3<String, String, String>, String>) tuple2s -> tuple2s.iterator());
// secondary sort on <rowkey,cf,column>
JavaPairRDD<Tuple3<String, String, String>, String> sortedRdd = flatMapRdd.repartitionAndSortWithinPartitions(new MyPartitioner(numPartitions), MyComparator.INSTANCE);
// convert the sorted RDD into HFile format <ImmutableBytesWritable,KeyValue>
JavaPairRDD<ImmutableBytesWritable, KeyValue> hfileRdd = sortedRdd.mapToPair((PairFunction<Tuple2<Tuple3<String, String, String>, String>, ImmutableBytesWritable, KeyValue>) tuple -> {
byte[] row_key_byte = Bytes.toBytes(tuple._1()._1());
byte[] cf_byte = Bytes.toBytes(tuple._1()._2());
byte[] qua_byte = Bytes.toBytes(tuple._1()._3());
byte[] value_byte = Bytes.toBytes(tuple._2());
KeyValue keyValue = new KeyValue(row_key_byte, cf_byte, qua_byte, value_byte);
return new Tuple2<>(new ImmutableBytesWritable(row_key_byte), keyValue);
});
// write out as HFiles
hfileRdd.saveAsNewAPIHadoopFile(hfile_path, ImmutableBytesWritable.class, KeyValue.class, HFileOutputFormat2.class, job.getConfiguration());
setPermission(hfile_path);
// load the HFiles into the HBase table
new LoadIncrementalHFiles(conf).doBulkLoad(new Path(hfile_path),conn.getAdmin(),conn.getTable(tableName),conn.getRegionLocator(tableName));
}
/**
* @Description: look up the column family for a given column name
* @Author: shengyu
* @Date: 2020/8/15/015
**/
private String getColumnFamilyByColumn(String col){
String cf = "";
if (base_list.contains(col)) {
cf = base_cf;
} else if (model_list.contains(col)) {
cf = model_cf;
}
return cf;
}
/**
* @Description: build a column-name -> value map from a Row
* @Author: shengyu
* @Date: 2020/8/15/015
**/
private Map<String, String> getColumnsDataMap(Row row) {
List<SparkColumn> columnsList = new ArrayList<>();
StructField[] fields = row.schema().fields();
for(StructField f : fields) {
SparkColumn column = new SparkColumn();
column.setName(f.name());
column.setDataType(f.dataType());
columnsList.add(column);
}
// walk the columns, fetch each value and build columnsDataMap
Map<String, String> columnsDataMap = new HashMap<>();
for(SparkColumn column : columnsList) {
String strValue = getColumnValue(column, row);
if(strValue != null) {
columnsDataMap.put(column.getLowerCaseName(), strValue);
}
}
return columnsDataMap;
}
/**
* @Description: get a column value and convert it to a String
* @Author: shengyu
* @Date: 2020/8/13/013
**/
private String getColumnValue(SparkColumn column, Row r) {
if(column.getDataType() instanceof ArrayType) {
List<Object> listValue = SparkRowUtil.getListCell(r, column.getName());
return listValue == null ? null : JSONObject.toJSONString(listValue);
} else if (column.getDataType() instanceof MapType) {
Map<String, Object> mapValue = SparkRowUtil.getMapCell(r, column.getName());
return mapValue == null ? null : JSONObject.toJSONString(mapValue);
} else {
Object objValue = SparkRowUtil.getObjectCell(r, column.getName());
return objValue == null ? null : (objValue + "");
}
}
/**
* @Description: delete the temporary HDFS path
* @Author: shengyu
* @Date: 2020/8/13/013
**/
private void delete_hdfspath(String save_path) throws IOException {
FileSystem fs = FileSystem.get(new Configuration());
Path path = new Path(save_path);
if (fs.exists(path)) {
fs.delete(path,true);
}
}
/**
* @Description: set permissions recursively on the given HDFS path
* @Author: shengyu
* @Date: 2020/8/18/018
**/
private void setPermission(String filePath) throws IOException {
FileSystem fs = FileSystem.get(new Configuration());
Path path = new Path(filePath);
FsPermission permission = new FsPermission(FsAction.ALL,FsAction.ALL, FsAction.ALL);
if (fs.exists(path)) {
fs.setPermission(path,permission);
RemoteIterator<LocatedFileStatus> fileList = fs.listFiles(path, true);
while (fileList.hasNext()) {
LocatedFileStatus next = fileList.next();
Path path1 = next.getPath();
fs.setPermission(path1,permission);
}
}
}
/**
* @Description: create the HBase table if it does not exist
* @Author: shengyu
* @Date: 2020/8/13/013
**/
private void createHTable(String tableName, Configuration hBaseConf) throws IOException {
Connection connection = ConnectionFactory.createConnection(hBaseConf);
TableName hBaseTableName = TableName.valueOf(tableName);
Admin admin = connection.getAdmin();
if (!admin.tableExists(hBaseTableName)) {
HTableDescriptor tableDesc = new HTableDescriptor(hBaseTableName);
HColumnDescriptor base_hcd = new HColumnDescriptor(Bytes.toBytes(base_cf));
HColumnDescriptor model_hcd = new HColumnDescriptor(Bytes.toBytes(model_cf));
tableDesc.addFamily(base_hcd.setCompressionType(Compression.Algorithm.SNAPPY));
tableDesc.addFamily(model_hcd.setCompressionType(Compression.Algorithm.SNAPPY));
// pre-split the new table into numPartitions regions using hex split keys
byte[][] splits = new RegionSplitter.HexStringSplit().split(numPartitions);
admin.createTable(tableDesc,splits);
}
// keep numPartitions in sync with the actual region count of the table (it may already exist)
numPartitions = admin.getTableRegions(hBaseTableName).size();
connection.close();
}
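For completeness, a hedged sketch of how the driver might call this method. The SQL query and the user_stats table are made-up placeholders, and saveAsHfile is assumed to live in the same class as the fields and helpers above, with sparkContext, base_list and model_list already initialized.
// hypothetical driver code: run the Spark SQL aggregation and hand the result to saveAsHfile
SparkSession spark = SparkSession.builder().appName("bulk-load-demo").enableHiveSupport().getOrCreate();
Dataset<Row> df = spark.sql("SELECT user_id, merchant_num FROM user_stats");  // placeholder query
saveAsHfile(df);  // writes HFiles to hdfs://sky-ns/test/hfile and bulk loads them into test_users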
- Custom comparator
package com.enbrands.analyze.spark.Hbase.partitioner;
import scala.Tuple3;
import java.io.Serializable;
import java.util.Comparator;
/**
* @author shengyu
* @className MyComparator
* @Description Custom comparator: secondary sort on <rowKey, column family, column>
* @date 2020-08-13 09:36
**/
public class MyComparator implements Serializable, Comparator<Tuple3<String,String, String>> {
private static final long serialVersionUID = 12382943439484934L;
public static final MyComparator INSTANCE = new MyComparator();
private MyComparator(){}
@Override
public int compare(Tuple3<String, String, String> tup1, Tuple3<String, String, String> tup2) {
// 1. sort by rowkey first
int compare = tup1._1().compareTo(tup2._1());
if (compare == 0) {
// 2. then by column family
compare = tup1._2().compareTo(tup2._2());
if (compare == 0) {
// 3. then by column qualifier
compare = tup1._3().compareTo(tup2._3());
}
}
return compare;
}
}
- Custom partitioner. It partitions by the rowkey's hex prefix using the same HexStringSplit boundaries as the table's regions, so each Spark partition maps to one region range and the generated HFiles rarely need to be split at load time.
package com.enbrands.analyze.spark.Hbase.partitioner;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.spark.Partitioner;
import scala.Tuple3;
/**
* @author shengyu
* @className MyPartitioner
* @Description Custom partitioner that maps rowkeys to region ranges
* @date 2020-08-15 09:29
**/
public class MyPartitioner extends Partitioner {
private int numPartitions = 15;
// hex split-key prefixes of the target regions, computed once instead of for every record
private final String[] splitPrefixes;
public MyPartitioner(int numPartitions) {
this.numPartitions = numPartitions;
// the split keys between numPartitions regions; there are numPartitions - 1 of them
byte[][] splits = new RegionSplitter.HexStringSplit().split(numPartitions);
splitPrefixes = new String[splits.length];
for (int i = 0; i < splits.length; i++) {
splitPrefixes[i] = new String(splits[i]);
}
}
@Override
public int numPartitions() {
return numPartitions;
}
@Override
public int getPartition(Object key) {
Tuple3<String,String,String> tuple = (Tuple3<String,String,String>) key;
if (tuple == null) {
return 0;
}
// use the rowkey's 8-character hex prefix to find which region range this record falls into:
// partition i holds the keys >= split key i-1 and < split key i
int partition = 0;
while (partition < splitPrefixes.length && tuple._1().substring(0, 8).compareTo(splitPrefixes[partition]) >= 0) {
partition++;
}
return partition;
}
}
- Entity class
package com.enbrands.analyze.spark.Hbase.beans;
import org.apache.spark.sql.types.DataType;
import java.io.Serializable;
/**
* @author shengyu
* @className SparkColumn
* @Description SparkColumn
* @date 2020-08-13 11:50
**/
public class SparkColumn implements Serializable {
private String name;
private String lowerCaseName;
private DataType dataType;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
this.lowerCaseName = this.name.toLowerCase();
}
public String getLowerCaseName() {
return lowerCaseName;
}
public DataType getDataType() {
return dataType;
}
public void setDataType(DataType dataType) {
this.dataType = dataType;
}
}
That's it for today's code. Off to the Rift for a few games~