Spark 使用bulk load导入数据到Hbase中

Posted 毛毛小妖的笔记

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark 使用bulk load导入数据到Hbase中相关的知识,希望对你有一定的参考价值。

最近在项目中有个需求:要求将spark sql统计的结果插入Hbase中。于是在网上搜索了一下相关的做法,找到两种解决办法,下面我来说说这三种方法,以及我采用的方式。由于网上的代码真的无法运行,踩了很多坑。所以我在这里把我的代码思路写下来,如果能帮助到广大的小伙伴,我甚是开心。

方案一.使用Hbase table的Put方法

这种方案比较简单,直接使用Hbase Table的api即可。但是效率比较低,有兴趣的可以自行百度,网上的例子说的很清楚。

方案二.使用Hbase 的bulk load方法

这种方案比较高效,对于大数据量的导入效率提升的不是一点点,也是我最终采用的解决办法。这种方案的核心就是:绕过了Hbase的预写日志WAL和分裂过程Split,通过spark api将dataframe组织成Hfile格式写入Hdfs,再剪切到Hbase中,这个过程特别快。


既然选中了第二种方案,那就开始干活吧。


  1. 通过spark sql统计数据,得到dataframe,这一步根据自己的业务来做
  2. 将dataframe转换成List<Tuple2<Tuple3<rowKey, column family, column>, value>>的结构,最外层的二元组是一个K-v结构,k是一个三元组,分别是rowKey、column family和column。因为hbase要求hfile的rowKey、column family和column必须有序,所以这里转换成这种结构,方便之后的排序。v就是具体的值了。
  3. 将第二步生成的RDD通过flatMapToPair算子打平为JavaPairRDD<Tuple3<rowKey, column family, column>, String>格式。
  4. 将第三步生成的RDD进行分区和排序。
  5. 将排序完成的rdd转换为hfile格式<ImmutableBytesWritable,keyValue>。
  6. 利用saveAsNewAPIHadoopFile算子将Hfile写入到hdfs中。
  7. 最终使用doBulkLoad方法将Hfile剪切到Hbase中。

是不是很简单呢?也是踩了不少坑才完成的。下面贴一下我的详细代码,大家可以参考一下,顺便也作为日后的笔记。

  1. 定义的静态变量
// base列族名称
private static String base_cf = "base";
// model列族名称
private static String model_cf = "model";
// hbase分区个数默认值
private static int numPartitions = 15;
// base列族的所有列
private List<String> base_list;
// model列族的所有列
private List<String> model_list;
  1. 核心方法
/**
 * @Description: 保存dataFrame为Hfile
 * @Author: shengyu
 * @Date: 2020/8/13/013
 **/
public void saveAsHfile(Dataset<Row> df) throws Exception {
        String hfile_path = "hdfs://sky-ns/test/hfile";
        String[] rowKey_fileds = {"user_id""merchant_num"};
        String hBase_table = "test_users";

        // zk相关
        String quorum = Context.ZOOKEEPER;
        String clientPort = "2181";

        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", quorum);
        conf.set("hbase.zookeeper.property.clientPort", clientPort);
        conf.set(TableOutputFormat.OUTPUT_TABLE, hBase_table);

        Job job = Job.getInstance(conf);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(KeyValue.class);

        // 设置输出为snappy压缩
        sparkContext.getConf().set("spark.hadoop.mapred.output.compress","true");
        sparkContext.getConf().set("spark.hadoop.mapred.output.compression.codec","org.apache.hadoop.io.compress.snappy");

        Connection conn = ConnectionFactory.createConnection(conf);
        TableName tableName = TableName.valueOf(hBase_table.getBytes());

        // 表不存在则创建hbase临时表
        creteHTable(hBase_table,conf);

        HFileOutputFormat2.configureIncrementalLoad(job, conn.getTable(tableName), conn.getRegionLocator(tableName));

        // 删除已经存在的临时路径,否则会报错
        delete_hdfspath(hfile_path);

        // 转换结构为List<<<rowkey,cf,column>,value>>
        JavaRDD<List<Tuple2<Tuple3<String, String, String>, String>>> mapRdd = df.toJavaRDD().map((Function<Row, List<Tuple2<Tuple3<String, String, String>, String>>>) row -> {
            List<Tuple2<Tuple3<String, String, String>, String>> arrayList = new ArrayList<>();
            Map<String, String> columnsDataMap = getColumnsDataMap(row);
            String rowKey = SparkRowUtil.getRowkey(columnsDataMap, rowKey_fileds);

            Iterator<Map.Entry<String, String>> iterator = columnsDataMap.entrySet().iterator();
            while (iterator.hasNext()) {
                Map.Entry<String, String> next = iterator.next();
                String col = next.getKey().toString();
                String value = next.getValue().toString();
                String  cf = getColumnFamilyByColumn(col.toLowerCase());
                Tuple2<Tuple3<String, String, String>, String> tuple22 = new Tuple2<>(new Tuple3<>(rowKey,cf,col), value);
                arrayList.add(tuple22);
            }
            return arrayList;
        });

        mapRdd.cache();
        mapRdd.isEmpty();

        // 拆分为<<rowkey,cf,column>,value>格式
        JavaPairRDD<Tuple3<String, String, String>, String> flatMapRdd =
                mapRdd.filter(item->!item.isEmpty())
                      .flatMapToPair((PairFlatMapFunction<List<Tuple2<Tuple3<String, String, String>, String>>, Tuple3<String, String, String>, String>) tuple2s -> tuple2s.iterator());

        // 对<rowkey,cf,column>进行二次排序
        JavaPairRDD<Tuple3<String, String, String>, String> sortedRdd = flatMapRdd.repartitionAndSortWithinPartitions(new MyPartitioner(numPartitions), MyComparator.INSTANCE);

        // 将排序完成的rdd转换为hfile格式<ImmutableBytesWritable,keyValue>
        JavaPairRDD<ImmutableBytesWritable, KeyValue> hfileRdd = sortedRdd.mapToPair((PairFunction<Tuple2<Tuple3<String, String, String>, String>, ImmutableBytesWritable, KeyValue>) tuple -> {
            byte[] row_key_byte = Bytes.toBytes(tuple._1()._1());
            byte[] cf_byte = Bytes.toBytes(tuple._1()._2());
            byte[] qua_byte = Bytes.toBytes(tuple._1()._3());
            byte[] value_byte = Bytes.toBytes(tuple._2());

            KeyValue keyValue = new KeyValue(row_key_byte, cf_byte, qua_byte, value_byte);
            return new Tuple2<>(new ImmutableBytesWritable(row_key_byte), keyValue);
        });

        // 保存为Hfile
        hfileRdd.saveAsNewAPIHadoopFile(hfile_path, ImmutableBytesWritable.class, KeyValue.class, HFileOutputFormat2.class, job.getConfiguration());

        setPermission(hfile_path);

        // 加载hfile到hbase表中
        new LoadIncrementalHFiles(conf).doBulkLoad(new Path(hfile_path),conn.getAdmin(),conn.getTable(tableName),conn.getRegionLocator(tableName));
    }
    
/**
     * @Description: 根据列名获取列族
     * @Author: shengyu
     * @Date: 2020/8/15/015
     **/
    private String getColumnFamilyByColumn(String col){
        String cf = "";
        if (base_list.contains(col)) {
            cf = base_cf;
        } else if (model_list.contains(col)) {
            cf = model_cf;
        }
        return cf;
    }
    /**
     * @Description: 根据row封装成Map类型的数据
     * @Author: shengyu
     * @Date: 2020/8/15/015
     **/
    private Map<String, String> getColumnsDataMap(Row row) {
        List<SparkColumn> columnsList = new ArrayList<>();
        StructField[] fields = row.schema().fields();
        for(StructField f : fields) {
            SparkColumn column = new SparkColumn();
            column.setName(f.name());
            column.setDataType(f.dataType());
            columnsList.add(column);
        }

        // 遍历hbase列,获取列值,得到columnsDataMap
        Map<String, String> columnsDataMap = new HashMap<>();
        for(SparkColumn column : columnsList) {
            String strValue = getColumnValue(column, row);
            if(strValue != null) {
                columnsDataMap.put(column.getLowerCaseName(), strValue);
            }
        }
        return columnsDataMap;
    }

    /**
     * @Description: 获取列值,并转换为字符串
     * @Author: shengyu
     * @Date: 2020/8/13/013
     **/
    private String getColumnValue(SparkColumn column, Row r) {
        if(column.getDataType() instanceof ArrayType) {
            List<Object> listValue = SparkRowUtil.getListCell(r, column.getName());
            return listValue == null ? null : JSONObject.toJSONString(listValue);
        } else if (column.getDataType() instanceof MapType) {
            Map<String, Object> mapValue = SparkRowUtil.getMapCell(r, column.getName());
            return mapValue == null ? null : JSONObject.toJSONString(mapValue);
        } else {
            Object objValue = SparkRowUtil.getObjectCell(r, column.getName());
            return objValue == null ? null : (objValue + "");
        }
    }
    /**
     * @Description: 删除hdfs临时路径
     * @Author: shengyu
     * @Date: 2020/8/13/013
     **/
    private void delete_hdfspath(String save_path) throws IOException {
        FileSystem fs = FileSystem.get(new Configuration());
        Path path = new Path(save_path);
        if (fs.exists(path)) {
            fs.delete(path,true);
        }
    }

    /**
     * @Description: 给hdfs指定目录设置权限
     * @Author: shengyu
     * @Date: 2020/8/18/018
     **/
    private void setPermission(String filePath) throws IOException {
        FileSystem fs = FileSystem.get(new Configuration());
        Path path = new Path(filePath);
        FsPermission permission = new FsPermission(FsAction.ALL,FsAction.ALL, FsAction.ALL);
        if (fs.exists(path)) {
            fs.setPermission(path,permission);
            RemoteIterator<LocatedFileStatus> fileList = fs.listFiles(path, true);
            while (fileList.hasNext()) {
                LocatedFileStatus next = fileList.next();
                Path path1 = next.getPath();
                fs.setPermission(path1,permission);
            }
        }
    }

    /**
     * @Description: 创建hbase表
     * @Author: shengyu
     * @Date: 2020/8/13/013
     **/
    private void creteHTable(String tableName, Configuration hBaseConf) throws IOException {
        Connection connection = ConnectionFactory.createConnection(hBaseConf);
        TableName hBaseTableName = TableName.valueOf(tableName);
        Admin admin = connection.getAdmin();
        if (!admin.tableExists(hBaseTableName)) {
            HTableDescriptor tableDesc = new HTableDescriptor(hBaseTableName);
            HColumnDescriptor base_hcd = new HColumnDescriptor(Bytes.toBytes(base_cf));
            HColumnDescriptor model_hcd = new HColumnDescriptor(Bytes.toBytes(model_cf));

            tableDesc.addFamily(base_hcd.setCompressionType(Compression.Algorithm.SNAPPY));
            tableDesc.addFamily(model_hcd.setCompressionType(Compression.Algorithm.SNAPPY));
            byte[][] splits = new RegionSplitter.HexStringSplit().split(numPartitions);
            admin.createTable(tableDesc,splits);
        }
        numPartitions = admin.getTableRegions(hBaseTableName).size();
        connection.close();
    }
  1. 自定义比较器
package com.enbrands.analyze.spark.Hbase.partitioner;

import scala.Tuple3;

import java.io.Serializable;
import java.util.Comparator;
/**
 * @author shengyu
 * @className MyComparator
 * @Description 自定义比较器
 * @date 2020-08-13 09:36
 **/
public class MyComparator implements Serializable, Comparator<Tuple3<String,String, String>> {

    private static final long serialVersionUID = 12382943439484934L;

    public static final MyComparator INSTANCE = new MyComparator();

    private MyComparator(){}

    @Override
    public int compare(Tuple3<String, String, String> tup1, Tuple3<String, String, String> tup2) {
        // 1.根据rowkey排序
        int compare = tup1._1().compareTo(tup2._1());
        if (compare == 0) {
            // 2.根据cf排序
            compare = tup1._2().compareTo(tup2._2());
            if (compare == 0) {
                // 3.根据col排序
                compare = tup1._3().compareTo(tup2._3());
            }
        }
        return compare;
    }
}
  1. 自定义分区器
package com.enbrands.analyze.spark.Hbase.partitioner;

import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.spark.Partitioner;
import scala.Tuple3;

/**
 * @author shengyu
 * @className
 * @Description
 * @date 2020-08-15 09:29
 **/
public class MyPartitioner extends Partitioner {

    private int numPartitions =  15;

    public MyPartitioner(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    @Override
    public int numPartitions() {
        return numPartitions;
    }

    @Override
    public int getPartition(Object key) {
        //计算region的split键值,总数为partitions-1个
        byte[][] splits = new RegionSplitter.HexStringSplit().split(numPartitions);

        Tuple3<String,String,String> tuple = (Tuple3<String,String,String>) key;

        if (tuple == null) {
            return 0;
        }

        //根据rowkey前缀,计算该条记录属于哪一个region范围内
        int i = 0;
        boolean foundIt = false;

        while (i < splits.length && !foundIt){
            String s = new String(splits[i]);
            if (tuple._1().substring(0,8).compareTo(s) < 0) {
                foundIt = true;
            }
            i++;
        }
        return i;
    }
}
  1. 实体类
package com.enbrands.analyze.spark.Hbase.beans;

import org.apache.spark.sql.types.DataType;

import java.io.Serializable;

/**
 * @author shengyu
 * @className SparkColumn
 * @Description SparkColumn
 * @date 2020-08-13 11:50
 **/
public class SparkColumn implements Serializable {

    private String name;

    private String lowerCaseName;

    private DataType dataType;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
        this.lowerCaseName = this.name.toLowerCase();
    }

    public String getLowerCaseName() {
        return lowerCaseName;
    }

    public DataType getDataType() {
        return dataType;
    }

    public void setDataType(DataType dataType) {
        this.dataType = dataType;
    }
}

好了今天的代码就到这里啦。峡谷走起~

以上是关于Spark 使用bulk load导入数据到Hbase中的主要内容,如果未能解决你的问题,请参考以下文章

HSQLDB Bulk Load 大量数据

Spark向Elasticsearch批量导入数据,出现重复的问题定位

Elasticsearch —— bulk批量导入数据

Bulk Insert:将文本数据(csv和txt)导入到数据库中

Hive 数据导入 HBase

如何在 SQL Standard 上导出 SSIS 数据并在 SQL Express 上使用 Bulk Insert 导入?