SparkStreaming Bulkload into Hyperbase -- Application and Internals
Posted by 李_少
一、Environment Preparation
See "Spark 3.1.2 on TDH622".
二、Additional JAR Packages
三、Key Code Walkthrough
- Ingest data from Kafka (the StreamingContext and Kafka parameters this assumes are sketched after the snippet):

```java
JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
        ssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));
```
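The snippet above relies on an existing JavaStreamingContext (ssc), a topic list and a Kafka parameter map that are not shown in the original post. A minimal sketch of that setup follows; the broker address, group id, topic name and batch interval are placeholder assumptions.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

// Placeholder setup: broker list, group id, topic and batch interval are assumptions.
SparkConf sparkConf = new SparkConf().setAppName("SparkStreamingBulkload");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(30));

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "kafka-broker:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "bulkload-demo");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);

Collection<String> topics = Collections.singletonList("test-topic");
```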
- Unpack the Kafka messages into individual records:

```java
JavaDStream<String> messages = stream.repartition(shuffleNum).map(new GetValueFunc());

public class GetValueFunc implements Function<ConsumerRecord<String, String>, String>, Serializable {
    @Override
    public String call(ConsumerRecord<String, String> consumerRecord) throws Exception {
        // Keep only the message value; the Kafka key and metadata are not needed downstream.
        return consumerRecord.value();
    }
}
```
- Parse each record and produce HBase KeyValue objects:

```java
JavaPairDStream<ImmutableBytesWritable, KeyValue> putStream = messages.flatMapToPair(new StringToKeyValueFunc());

public class StringToKeyValueFunc implements PairFlatMapFunction<String, ImmutableBytesWritable, KeyValue>, Serializable {
    Random random = null;

    @Override
    public Iterator<Tuple2<ImmutableBytesWritable, KeyValue>> call(String s) throws Exception {
        if (random == null) {
            random = new Random();
        }
        // Expected message format: rowkey,name,age
        String[] line = s.split(",");
        String rowkey = line[0];
        String name = line[1];
        String age = line[2];
        List<Tuple2<ImmutableBytesWritable, KeyValue>> puts = new LinkedList<>();

        // One KeyValue per column. The pair key encodes rowkey, column family and qualifier
        // (separated by \001, plus a random suffix) so the RDD can later be sorted the same
        // way HBase orders cells inside an HFile.
        KeyValue kv1 = new KeyValue(Bytes.toBytes(rowkey), Bytes.toBytes("cf"), Bytes.toBytes("name"), Bytes.toBytes(name));
        String key = Bytes.toString(Bytes.toBytes(rowkey)) + "\001" + "cf" + "\001" + "name" + "\001" + random.nextInt(9997);
        puts.add(new Tuple2<>(new ImmutableBytesWritable(Bytes.toBytes(key)), kv1));

        KeyValue kv2 = new KeyValue(Bytes.toBytes(rowkey), Bytes.toBytes("cf"), Bytes.toBytes("age"), Bytes.toBytes(age));
        String key2 = Bytes.toString(Bytes.toBytes(rowkey)) + "\001" + "cf" + "\001" + "age" + "\001" + random.nextInt(9997);
        puts.add(new Tuple2<>(new ImmutableBytesWritable(Bytes.toBytes(key2)), kv2));

        return puts.iterator();
    }
}
```
- Generate the HFiles and bulk-load them into the table:

```java
putStream.foreachRDD(new BulkloadFunc(shuffleNum));

public class BulkloadFunc implements VoidFunction<JavaPairRDD<ImmutableBytesWritable, KeyValue>>, Serializable {
    private int sortNum;

    public BulkloadFunc(int sortNum) {
        this.sortNum = sortNum;
    }

    @Override
    public void call(JavaPairRDD<ImmutableBytesWritable, KeyValue> rdd) throws Exception {
        DateTime currentTime = new DateTime();
        String tableName = "default:testbulk2";

        // Build the HBase/HDFS client configuration from the cluster files on the classpath.
        Configuration conf = new Configuration(false);
        conf.setClassLoader(BulkloadFunc.class.getClassLoader());
        conf.addResource("hbase-site.xml");
        conf.addResource("hdfs-site.xml");
        conf.addResource("core-site.xml");
        conf.setStrings("io.serializations", conf.get("io.serializations"),
                KeyValueSerialization.class.getName(), WritableSerialization.class.getName());

        // Let HFileOutputFormat pick up compression, encoding and region boundaries from the target table.
        Job job = Job.getInstance(conf);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(KeyValue.class);
        HTable table = new HTable(conf, tableName);
        HFileOutputFormat.configureIncrementalLoad(job, table);

        // Write the HFiles for this micro-batch into a temporary HDFS directory.
        String path = "hdfs://nameservice1/tmp/lsk2/" + currentTime.toString("yyyyMMddHHmmss");
        job.getConfiguration().set("mapred.output.dir", path);
        rdd.sortByKey(new MyComparator(), true, sortNum).saveAsNewAPIHadoopDataset(job.getConfiguration());

        load(conf, path, tableName);
    }

    private void load(Configuration conf, String path, String tableName) throws Exception {
        HTable table = new HTable(conf, tableName);
        LoadIncrementalHFiles bulkLoader = new LoadIncrementalHFiles(conf);
        bulkLoader.doBulkLoad(new Path(path), table);
        // Remove the temporary HFile directory once the load has completed.
        FileSystem fileSystem = FileSystem.get(conf);
        fileSystem.delete(new Path(path), true);
    }
}
```
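For completeness, the pieces above could be wired together in the driver roughly as follows. This is a sketch that reuses the ssc, topics and kafkaParams objects from the earlier setup; shuffleNum is an assumed parallelism setting, not a value from the original post.

```java
// Hypothetical driver wiring; shuffleNum is an assumed parallelism setting.
int shuffleNum = 20;

JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
        ssc, LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));

JavaDStream<String> messages = stream.repartition(shuffleNum).map(new GetValueFunc());
JavaPairDStream<ImmutableBytesWritable, KeyValue> putStream = messages.flatMapToPair(new StringToKeyValueFunc());
putStream.foreachRDD(new BulkloadFunc(shuffleNum));

ssc.start();
ssc.awaitTermination();
```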
四、How It Works
1. HBase Storage Basics
Looking at HBase's architecture, storage involves the HMaster, HRegionServer, HRegion, HLog, Store, MemStore, StoreFile, HFile and so on. HBase is an open-source implementation of Google's BigTable, and its storage engine is built on the LSM-Tree data structure. On a write, the data is first appended to the WAL, then written to the in-memory write cache, the MemStore; only when the MemStore reaches a certain size, or another trigger condition is met, is it flushed to disk. This turns random disk writes into sequential writes and so improves write performance. Each flush produces a new HFile.
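For contrast with the bulk-load path used in this article, a regular client write goes through exactly this WAL plus MemStore path. A minimal sketch with the classic HTable API; the table, family and columns follow the running example, the values are illustrative.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

// A normal write: the Put is logged to the WAL and buffered in the MemStore,
// and only reaches an HFile when the MemStore is flushed.
Configuration conf = HBaseConfiguration.create();
HTable table = new HTable(conf, "default:testbulk2");
Put put = new Put(Bytes.toBytes("row1"));
put.add(Bytes.toBytes("cf"), Bytes.toBytes("name"), Bytes.toBytes("Alice"));
put.add(Bytes.toBytes("cf"), Bytes.toBytes("age"), Bytes.toBytes("30"));
table.put(put);
table.close();
```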
2. What the MemStore Is For
- Updated data is first stored in the MemStore, which uses an LSM (Log-Structured Merge Tree) style structure and sorts and merges the data in memory. This guarantees the written data is ordered (data in an HFile is sorted by RowKey) and greatly improves HBase's write performance.
- It acts as an in-memory cache: reads check the MemStore first, and by the principle of locality, newly written data is the most likely to be accessed.
- Some optimizations can be applied before data is persisted. For example, if a column family is configured to keep only one version, only the latest version needs to be written out on flush (see the sketch below).
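To illustrate that last point, a column family that keeps a single version can be declared as follows. This is a sketch with the classic admin API, reusing the table and family names from the example above.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;

// Create a table whose family keeps only the newest version of each cell,
// so a flush only ever has to persist the latest value.
Configuration conf = HBaseConfiguration.create();
HBaseAdmin admin = new HBaseAdmin(conf);

HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("default:testbulk2"));
HColumnDescriptor cf = new HColumnDescriptor("cf");
cf.setMaxVersions(1);
desc.addFamily(cf);

admin.createTable(desc);
admin.close();
```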
3. HFile Internals
- An HFile consists of four parts: the Scanned block section, the Non-scanned block section, the Load-on-open section (also called the opening-time data section) and the Trailer.
  - Scanned block section: every block in this section is read when the HFile is scanned sequentially (it contains all the data that has to be read), including the Leaf Index Blocks and Bloom Blocks;
  - Non-scanned block section: this part is not read during a sequential scan of the HFile; it mainly contains the Meta Blocks and the Intermediate Level Data Index Blocks;
  - Load-on-open section: this data has to be loaded into memory when the RegionServer opens the HFile; it includes the FileInfo, the Bloom filter block, the data block index and the meta block index;
  - Trailer: records the HFile's basic information plus the offsets and addressing information of the other sections.
- How an HFile is produced
  - The data initially sits in the MemStore. When a flush happens, an HFile Writer is created and the KeyValues in the MemStore are appended, one by one, to a Data Block held in memory.
  - Cells are kept sorted as they are appended (note: KeyValue is one implementation of Cell).
  - Over the sorted Cells, the HFile builds a three-level index: Root Index -> Intermediate Level Data Index Block -> Leaf Index Block.
- How KeyValues are ordered inside an HFile

The compare() method in org.apache.hadoop.hbase.CellComparator is the core of KeyValue ordering:

```java
public static int compare(final Cell a, final Cell b, boolean ignoreSequenceid) {
    // row
    int c = compareRows(a, b);
    if (c != 0) return c;

    c = compareWithoutRow(a, b);
    if (c != 0) return c;

    if (!ignoreSequenceid) {
        // Negate following comparisons so later edits show up first
        // mvccVersion: later sorts first
        return Longs.compare(b.getMvccVersion(), a.getMvccVersion());
    } else {
        return c;
    }
}
```
compareRows() compares the rowkeys, which keeps cells in lexicographic rowkey order. When the rowkeys are equal, compareWithoutRow() is called; its core logic is:

```java
boolean sameFamilySize = (leftCell.getFamilyLength() == rightCell.getFamilyLength());
if (!sameFamilySize) {
    // comparing column family is enough.
    return Bytes.compareTo(leftCell.getFamilyArray(), leftCell.getFamilyOffset(), leftCell.getFamilyLength(),
            rightCell.getFamilyArray(), rightCell.getFamilyOffset(), rightCell.getFamilyLength());
}
int diff = compareColumns(leftCell, rightCell);
if (diff != 0) return diff;

diff = compareTimestamps(leftCell, rightCell);
if (diff != 0) return diff;

// Compare types. Let the delete types sort ahead of puts; i.e. types
// of higher numbers sort before those of lesser numbers. Maximum (255)
// appears ahead of everything, and minimum (0) appears after
// everything.
return (0xff & rightCell.getTypeByte()) - (0xff & leftCell.getTypeByte());
```
If the column families differ in (byte) length, comparing the families is enough to return a result. If the lengths are equal, the families are compared in byte order; if they are identical, the qualifier, then the timestamp (numerically), then the cell type are compared in turn, until one comparison is non-zero or all of these fields match.
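To make the rules concrete, here is a small sketch that compares two versions of the same cell. It assumes an HBase 1.x style client (matching the HTable and LoadIncrementalHFiles APIs used above), where KeyValue.COMPARATOR is available.

```java
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;

// Hedged illustration of the ordering rules, assuming an HBase 1.x era client.
public class SortRuleDemo {
    public static void main(String[] args) {
        byte[] row = Bytes.toBytes("r1");
        byte[] cf = Bytes.toBytes("cf");
        byte[] q = Bytes.toBytes("name");

        // Same row/family/qualifier, different timestamps: the newer cell sorts first.
        KeyValue older = new KeyValue(row, cf, q, 100L, Bytes.toBytes("a"));
        KeyValue newer = new KeyValue(row, cf, q, 200L, Bytes.toBytes("b"));
        System.out.println(KeyValue.COMPARATOR.compare(newer, older)); // negative

        // Same timestamp, different types: the Delete sorts ahead of the Put.
        KeyValue put = new KeyValue(row, cf, q, 100L, KeyValue.Type.Put);
        KeyValue delete = new KeyValue(row, cf, q, 100L, KeyValue.Type.Delete);
        System.out.println(KeyValue.COMPARATOR.compare(delete, put)); // negative
    }
}
```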
sequenceId is the mechanism that ties together the contents of the WAL, the HFiles and the MemStore; it is not covered here.
4. Generating HFiles with Spark
The core idea: assemble the Kafka data into KeyValue objects and make sure the KeyValues are ordered, following exactly the sort rules described in the previous section.
- Parse the Kafka data and assemble KeyValue objects, for example:

```java
String[] line = s.split(",");
String rowkey = line[0];
String name = line[1];
String age = line[2];
KeyValue kv1 = new KeyValue(Bytes.toBytes(rowkey), Bytes.toBytes("cf"), Bytes.toBytes("name"), Bytes.toBytes(name));
KeyValue kv2 = new KeyValue(Bytes.toBytes(rowkey), Bytes.toBytes("cf"), Bytes.toBytes("age"), Bytes.toBytes(age));
```
- Following the sort rules, build the key that will be used for sorting (rowkey, column family and qualifier joined by a \001 separator, plus a random suffix):

```java
String key = Bytes.toString(Bytes.toBytes(rowkey)) + "\001" + "cf" + "\001" + "name" + "\001" + random.nextInt(9997);
String key2 = Bytes.toString(Bytes.toBytes(rowkey)) + "\001" + "cf" + "\001" + "age" + "\001" + random.nextInt(9997);
```
- Following the sort rules, implement a custom comparison routine (the full comparator class is sketched after this snippet):

```java
String[] keyLeft = Bytes.toString(left.get()).split("\001");
String[] keyRight = Bytes.toString(right.get()).split("\001");
int compareResult = 0;
for (int i = 0; i < size; i++) {
    compareResult = Bytes.compareTo(Bytes.toBytes(keyLeft[i]), Bytes.toBytes(keyRight[i]));
    if (compareResult != 0) {
        return compareResult;
    }
}
return compareResult;
```
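The snippet above is only the body of the compare method. A minimal sketch of the full MyComparator referenced in rdd.sortByKey() might look like this; the field count (size = 3, i.e. rowkey, family, qualifier) and the class wiring are assumptions, only the loop body comes from the original.

```java
import java.io.Serializable;
import java.util.Comparator;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;

// Sketch of the comparator passed to rdd.sortByKey(new MyComparator(), true, sortNum).
// Comparing the first three fields (rowkey, family, qualifier) is an assumption;
// the random suffix is deliberately left out of the comparison.
public class MyComparator implements Comparator<ImmutableBytesWritable>, Serializable {
    private static final int size = 3;

    @Override
    public int compare(ImmutableBytesWritable left, ImmutableBytesWritable right) {
        String[] keyLeft = Bytes.toString(left.get()).split("\001");
        String[] keyRight = Bytes.toString(right.get()).split("\001");
        int compareResult = 0;
        for (int i = 0; i < size; i++) {
            compareResult = Bytes.compareTo(Bytes.toBytes(keyLeft[i]), Bytes.toBytes(keyRight[i]));
            if (compareResult != 0) {
                return compareResult;
            }
        }
        return compareResult;
    }
}
```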
- Sort the RDD and write the HFiles to HDFS:

```java
rdd.sortByKey(new MyComparator(), true, sortNum).saveAsNewAPIHadoopDataset(job.getConfiguration());
```
5. Loading the HFiles into the HBase Table
```java
private void load(Configuration conf, String path, String tableName) throws Exception {
    HTable table = new HTable(conf, tableName);
    LoadIncrementalHFiles bulkLoader = new LoadIncrementalHFiles(conf);
    // Move the generated HFiles into the table's regions.
    bulkLoader.doBulkLoad(new Path(path), table);
    // Remove the temporary HFile directory afterwards.
    FileSystem fileSystem = FileSystem.get(conf);
    fileSystem.delete(new Path(path), true);
}
```
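After the load finishes, the result can be spot-checked with a plain scan. A minimal verification sketch using the same old-style HTable API; the table and column names come from the example, while the row limit and configuration setup are illustrative.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

// Spot-check the bulk-loaded rows (prints at most 10 of them).
Configuration conf = HBaseConfiguration.create();
HTable table = new HTable(conf, "default:testbulk2");
ResultScanner scanner = table.getScanner(new Scan());
int n = 0;
for (Result r : scanner) {
    System.out.println(Bytes.toString(r.getRow()) + " -> "
            + Bytes.toString(r.getValue(Bytes.toBytes("cf"), Bytes.toBytes("name"))) + ", "
            + Bytes.toString(r.getValue(Bytes.toBytes("cf"), Bytes.toBytes("age"))));
    if (++n >= 10) {
        break;
    }
}
scanner.close();
table.close();
```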