SparkStreaming Bulkload into Hyperbase -- Application and Principles

Posted by 李_少



I. Environment Preparation

See "Spark3.1.2 on TDH622".

II. Additional JAR Packages

III. Key Code Walkthrough

  • Ingesting data from Kafka

    JavaInputDStream<ConsumerRecord<String, String>> stream =
                    KafkaUtils.createDirectStream(
                            ssc,
                            LocationStrategies.PreferConsistent(),
                            ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));
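
    The variables ssc, topics, kafkaParams and shuffleNum used here and below are not shown in the article. A minimal setup sketch is given next; the broker address, topic name, group id and batch interval are placeholders, not values from the original, and the classes are assumed to come from the spark-streaming-kafka-0-10 integration (which is where LocationStrategies and ConsumerStrategies live) plus org.apache.spark.streaming and org.apache.kafka.common.serialization:

    // Placeholder setup -- adjust the broker list, topic and batch interval to the actual environment.
    SparkConf sparkConf = new SparkConf().setAppName("SparkStreamingBulkload");
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(30));
    
    int shuffleNum = 10; // parallelism reused later for repartition and sortByKey
    
    Collection<String> topics = Collections.singletonList("test_topic");   // placeholder topic
    
    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", "kafka-host:9092");               // placeholder broker
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", StringDeserializer.class);
    kafkaParams.put("group.id", "bulkload-demo");                          // placeholder group id
    kafkaParams.put("auto.offset.reset", "latest");
    kafkaParams.put("enable.auto.commit", false);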
    
  • Unpacking the Kafka messages into individual records

    JavaDStream<String> messages = stream.repartition(shuffleNum).map(new GetValueFunc());
    
    public class GetValueFunc implements Function<ConsumerRecord<String, String>, String>, Serializable {
        @Override
        public String call(ConsumerRecord<String, String> consumerRecord) throws Exception {
            return consumerRecord.value();
        }
    }
    
  • Parsing each record and generating HBase KeyValue objects

    JavaPairDStream<ImmutableBytesWritable, KeyValue> putStream = messages.flatMapToPair(new StringToKeyValueFunc());
    
    public class StringToKeyValueFunc implements PairFlatMapFunction<String, ImmutableBytesWritable, KeyValue>, Serializable {
    
        // Created lazily on the executor instead of being serialized from the driver.
        Random random = null;
    
        @Override
        public Iterator<Tuple2<ImmutableBytesWritable, KeyValue>> call(String s) throws Exception {
            if (random == null) {
                random = new Random();
            }
            // Each Kafka record is a CSV line: rowkey,name,age
            String[] line = s.split(",");
            String rowkey = line[0];
            String name = line[1];
            String age = line[2];
            
            List<Tuple2<ImmutableBytesWritable, KeyValue>> puts = new LinkedList<>();
    
            // One KeyValue per column. The pair key is used only for sorting and is built as
            // rowkey \001 family \001 qualifier, plus a random suffix to keep the keys distinct.
            KeyValue kv1 = new KeyValue(Bytes.toBytes(rowkey), Bytes.toBytes("cf"), Bytes.toBytes("name"), Bytes.toBytes(name));
            String key = rowkey + "\001" + "cf" + "\001" + "name" + "\001" + random.nextInt(9997);
            puts.add(new Tuple2<ImmutableBytesWritable, KeyValue>(new ImmutableBytesWritable(Bytes.toBytes(key)), kv1));
            KeyValue kv2 = new KeyValue(Bytes.toBytes(rowkey), Bytes.toBytes("cf"), Bytes.toBytes("age"), Bytes.toBytes(age));
            String key2 = rowkey + "\001" + "cf" + "\001" + "age" + "\001" + random.nextInt(9997);
            puts.add(new Tuple2<ImmutableBytesWritable, KeyValue>(new ImmutableBytesWritable(Bytes.toBytes(key2)), kv2));
    
            return puts.iterator();
        }
    }
    
  • Generating HFiles and bulk-loading them into the table

    putStream.foreachRDD(new BulkloadFunc(shuffleNum));
    
    public class BulkloadFunc implements VoidFunction<JavaPairRDD<ImmutableBytesWritable, KeyValue>>, Serializable {
    
        private int sortNum;
    
        public BulkloadFunc(int sortNum) {
            this.sortNum = sortNum;
        }
    
        @Override
        public void call(JavaPairRDD<ImmutableBytesWritable, KeyValue> rdd) throws Exception {
            DateTime currentTime = new DateTime();
            String tableName = "default:testbulk2";
    
            // Build the configuration from the cluster config files on the classpath.
            Configuration conf = new Configuration(false);
            conf.setClassLoader(BulkloadFunc.class.getClassLoader());
            conf.addResource("hbase-site.xml");
            conf.addResource("hdfs-site.xml");
            conf.addResource("core-site.xml");
    
            // Register the serializers needed to write KeyValue objects through the MapReduce output format.
            conf.setStrings("io.serializations", conf.get("io.serializations"),
                    KeyValueSerialization.class.getName(), WritableSerialization.class.getName());
    
            Job job = Job.getInstance(conf);
            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
            job.setMapOutputValueClass(KeyValue.class);
    
            // configureIncrementalLoad reads the table's region boundaries and configures
            // HFileOutputFormat (output format, total-order partitioning, compression, etc.).
            HTable table = new HTable(conf, tableName);
            HFileOutputFormat.configureIncrementalLoad(job, table);
            table.close();
    
            // Write this batch's HFiles to a timestamped temporary directory on HDFS.
            String path = "hdfs://nameservice1/tmp/lsk2/" + currentTime.toString("yyyyMMddHHmmss");
            job.getConfiguration().set("mapred.output.dir", path);
            rdd.sortByKey(new MyComparator(), true, sortNum).saveAsNewAPIHadoopDataset(job.getConfiguration());
            load(conf, path, tableName);
        }
    
        private void load(Configuration conf, String path, String tableName) throws Exception {
            HTable table = new HTable(conf, tableName);
            LoadIncrementalHFiles bulkLoader = new LoadIncrementalHFiles(conf);
            bulkLoader.doBulkLoad(new Path(path), table);
            table.close();
            // Clean up the temporary HFile directory after the load completes.
            FileSystem fileSystem = FileSystem.get(conf);
            fileSystem.delete(new Path(path), true);
        }
    }
    

IV. How It Works

1. HBase Storage Fundamentals

Looking at HBase's architecture, its storage layer involves the HMaster, HRegionServer, HRegion, HLog, Store, MemStore, StoreFile, HFile, and so on. HBase is an open-source implementation of Google's BigTable, and its storage engine is designed around the LSM-Tree data structure. On a write, the data is first appended to the WAL and then written to the in-memory write cache, the MemStore; only when the MemStore reaches a certain size, or another trigger condition is met, is it flushed to disk. This turns random disk writes into sequential writes and improves write performance. Every flush produces a new HFile.
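
For contrast with the bulkload path used in this article, the sketch below shows an ordinary client write, which goes through the WAL and MemStore as just described. The rowkey and values are placeholders, it assumes the table default:testbulk2 with column family cf already exists, and it uses the same older HTable-style API as the rest of the article:

    // Ordinary write path: the Put is logged to the WAL and buffered in the MemStore;
    // HFiles are only produced later, when the MemStore is flushed.
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "default:testbulk2");
    Put put = new Put(Bytes.toBytes("rowkey-001"));                     // placeholder rowkey
    put.add(Bytes.toBytes("cf"), Bytes.toBytes("name"), Bytes.toBytes("zhangsan"));
    put.add(Bytes.toBytes("cf"), Bytes.toBytes("age"), Bytes.toBytes("20"));
    table.put(put);
    table.close();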

2. The Main Roles of the MemStore

  1. Updates are first stored in the MemStore, which follows the LSM (Log-Structured Merge Tree) approach and sorts and consolidates the data in memory. This keeps the written data ordered (data inside an HFile is sorted by RowKey) and greatly improves HBase's write performance.
  2. It acts as an in-memory cache: reads check the MemStore first, and by locality of reference, recently written data is the most likely to be accessed.
  3. Certain optimizations can be applied before the data is persisted. For example, if the number of retained versions is set to 1, only the latest version of each cell needs to be written at flush time (see the sketch below).
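
As an illustration of point 3, the sketch below creates a table whose column family keeps only a single version, so a flush only needs to persist the latest value of each cell. The table name is a placeholder, and the older HBaseAdmin/HTableDescriptor API is used for consistency with the rest of the article:

    // Hypothetical table: with max versions = 1, only the newest version of each cell is retained.
    Configuration conf = HBaseConfiguration.create();
    HBaseAdmin admin = new HBaseAdmin(conf);
    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("default:testbulk_v1"));
    HColumnDescriptor cf = new HColumnDescriptor("cf");
    cf.setMaxVersions(1); // keep a single version per cell
    desc.addFamily(cf);
    if (!admin.tableExists(desc.getTableName())) {
        admin.createTable(desc);
    }
    admin.close();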

3. HFile Internals

  • An HFile consists of four main parts: the Scanned block section, the Non-scanned block section, the Load-on-open section, and the Trailer.

    • Scanned block section: the blocks that are read during a sequential scan of the HFile (it contains all the data that needs to be read), including Data Blocks, Leaf Index Blocks, and Bloom Blocks;
    • Non-scanned block section: blocks that are not read during a sequential scan, mainly the Meta Blocks and the Intermediate Level Data Index Blocks;
    • Load-on-open section: data that is loaded into memory when the RegionServer opens the HFile, including the FileInfo, Bloom filter block, data block index, and meta block index;
    • Trailer: records the HFile's basic information plus the offsets and addressing information of the other sections.

  • How an HFile is produced

    • The data initially lives in the MemStore. When a flush occurs, an HFile Writer is created and the KeyValues in the MemStore are appended one by one to an in-memory Data Block.
    • The Cells are sorted as they are appended. (Note: KeyValue is one implementation of Cell.)
    • For the sorted Cells, the HFile builds a three-level index: Root Index, Intermediate Level Data Index Blocks, and Leaf Index Blocks.
  • The KeyValue ordering rules inside an HFile

    The compare() method of org.apache.hadoop.hbase.CellComparator is the core of KeyValue ordering:

    public static int compare(final Cell a, final Cell b, boolean ignoreSequenceid) {
        // row
        int c = compareRows(a, b);
        if (c != 0) return c;
    
        c = compareWithoutRow(a, b);
        if(c != 0) return c;
    
        if (!ignoreSequenceid) {
          // Negate following comparisons so later edits show up first
          // mvccVersion: later sorts first
          return Longs.compare(b.getMvccVersion(), a.getMvccVersion());
        } else {
          return c;
        }
      }
    

    compareRows() compares the rowkeys, which guarantees that rowkeys are ordered lexicographically. If the rowkeys are equal, compareWithoutRow() is called; its core logic is as follows:

        boolean sameFamilySize = (leftCell.getFamilyLength() == rightCell.getFamilyLength());
        if (!sameFamilySize) {
          // comparing column family is enough.
    
          return Bytes.compareTo(leftCell.getFamilyArray(), leftCell.getFamilyOffset(),
              leftCell.getFamilyLength(), rightCell.getFamilyArray(), rightCell.getFamilyOffset(),
              rightCell.getFamilyLength());
        }
        int diff = compareColumns(leftCell, rightCell);
        if (diff != 0) return diff;
    
        diff = compareTimestamps(leftCell, rightCell);
        if (diff != 0) return diff;
    
        // Compare types. Let the delete types sort ahead of puts; i.e. types
        // of higher numbers sort before those of lesser numbers. Maximum (255)
        // appears ahead of everything, and minimum (0) appears after
        // everything.
        return (0xff & rightCell.getTypeByte()) - (0xff & leftCell.getTypeByte());
    

    If the two column families differ in (byte) length, comparing the family bytes is enough to decide the order. If the lengths are equal, the families are compared lexicographically; if the families are also equal, the comparison moves on to the column qualifier, then the timestamp (compared by value), and finally the cell type, until a non-zero result is found or all of these are identical. A short comparison example is given below.

    The sequenceId is the mechanism that ties the contents of the WAL, the MemStore, and the HFiles together; it is not covered further here.
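
    To make these rules concrete, the sketch below compares a few KeyValues using the comparator exposed as KeyValue.COMPARATOR in HBase 1.x-era clients (the rowkeys, family, and values are placeholders):

    // Same rowkey: the comparison falls through to family and then qualifier,
    // so the "age" cell sorts before the "name" cell.
    KeyValue kvName = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("cf"),
            Bytes.toBytes("name"), Bytes.toBytes("zhangsan"));
    KeyValue kvAge = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("cf"),
            Bytes.toBytes("age"), Bytes.toBytes("20"));
    System.out.println(KeyValue.COMPARATOR.compare(kvAge, kvName)); // negative: kvAge orders first
    
    // Different rowkeys: the rowkey comparison alone decides the order.
    KeyValue kvOtherRow = new KeyValue(Bytes.toBytes("row2"), Bytes.toBytes("cf"),
            Bytes.toBytes("name"), Bytes.toBytes("lisi"));
    System.out.println(KeyValue.COMPARATOR.compare(kvName, kvOtherRow)); // negative: row1 < row2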

4. Generating HFiles with Spark

The core idea: assemble the Kafka data into KeyValue objects and make sure the KeyValues are sorted, using exactly the ordering rules described in the previous section.

  • Parse the Kafka data and assemble it into KeyValue objects, for example:

    String[] line = s.split(",");
    String rowkey = line[0];
    String name = line[1];
    String age = line[2];
    KeyValue kv1 = new KeyValue(Bytes.toBytes(rowkey), Bytes.toBytes("cf"), Bytes.toBytes("name"), Bytes.toBytes(name));
    KeyValue kv2 = new KeyValue(Bytes.toBytes(rowkey), Bytes.toBytes("cf"), Bytes.toBytes("age"), Bytes.toBytes(age));
    
  • Following the ordering rules, construct the key that will be used for sorting:

    String key = rowkey + "\001" + "cf" + "\001" + "name" + "\001" + random.nextInt(9997);
    String key2 = rowkey + "\001" + "cf" + "\001" + "age" + "\001" + random.nextInt(9997);
    
  • Implement a custom comparator that applies the same ordering rules:

    public class MyComparator implements Comparator<ImmutableBytesWritable>, Serializable {
        @Override
        public int compare(ImmutableBytesWritable left, ImmutableBytesWritable right) {
            // Split both sort keys into rowkey / family / qualifier / suffix segments
            // and compare them segment by segment as raw bytes.
            String[] keyLeft = Bytes.toString(left.get()).split("\\001");
            String[] keyRight = Bytes.toString(right.get()).split("\\001");
            int size = Math.min(keyLeft.length, keyRight.length);
            int compareResult = 0;
            for (int i = 0; i < size; i++) {
                compareResult = Bytes.compareTo(Bytes.toBytes(keyLeft[i]), Bytes.toBytes(keyRight[i]));
                if (compareResult != 0) {
                    return compareResult;
                }
            }
            return compareResult;
        }
    }
    
  • Sort the RDD and write the HFiles to HDFS:

    rdd.sortByKey(new MyComparator(), true, sortNum).saveAsNewAPIHadoopDataset(job.getConfiguration());
    

5. Loading the HFiles into the HBase Table

    private void load(Configuration conf, String path, String tableName) throws Exception {
        HTable table = new HTable(conf, tableName);
        LoadIncrementalHFiles bulkLoader = new LoadIncrementalHFiles(conf);
        bulkLoader.doBulkLoad(new Path(path), table);
        table.close();
        // Clean up the temporary HFile directory after the load completes.
        FileSystem fileSystem = FileSystem.get(conf);
        fileSystem.delete(new Path(path), true);
    }
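
Once doBulkLoad returns, the loaded HFiles are visible to readers. A minimal way to spot-check the result is a plain read of one of the loaded rows; the rowkey below is a placeholder, and conf is the same Configuration used above:

    // Read back one of the bulk-loaded rows to confirm the load succeeded.
    HTable table = new HTable(conf, "default:testbulk2");
    Get get = new Get(Bytes.toBytes("rowkey-001"));                     // placeholder rowkey
    Result result = table.get(get);
    System.out.println("name = " + Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("name"))));
    System.out.println("age  = " + Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("age"))));
    table.close();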
