Spark DataFrame便捷整合HBase

Posted 绿盟科技研究通讯

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark DataFrame便捷整合HBase相关的知识,希望对你有一定的参考价值。


一、HBase简介


HBase的介绍在网上随处可见,借用《HBase企业应用实战》中的描述,如下:


HBase(Hadoop Database)是一个高可靠、高性能、面向列、可伸缩的分布式数据库,利用HBase技术可在廉价PC上搭建起大规模结构化存储集群。HBase参考Google的BigTable建模,使用类似GFS的HDFS作为底层文件存储系统,在其上可以运行MapReduce批量处理数据,使用ZooKeeper作为协同服务组件。

HBase的整个项目使用Java语言实现,它是Apache基金会的Hadoop项目的一部分,既是模仿Google BigTable的开源产品,同时又是Hadoop的衍生产品。而Hadoop作为批量离线计算系统已经得到了业界的普遍认可,并经过了工业上的验证,所以HBase具备“站在巨人肩膀之上”的优势,其发展势头非常迅猛。

HBase还是一种非关系型数据库,即NoSQL数据库。在Eric Brewer的CAP理论中,HBase属于CP类型的系统,其NoSQL的特性非常明显,这些特性也决定了其独特的应用场景。

CAP(http://s3.thinkaurelius.com/docs/titan/0.5.4/benefits.html


二、HBase的选型依据


1使用场景

《HBase企业应用实战》中提到HBase较为适合如下几种使用需求,笔者针对不同的需求加入相应的个人理解,描述如下:

  • 存储大量的数据(PB级数据)且能保证良好的随机访问性能:存储量级由HDFS的良好扩展性和容错性支撑,随机访问性能良好是由于其Rowkey根据字符串大小排列,同时有MemStore进行内存缓存加持。

  • 需要很高的写吞吐量,瞬间写入量很大,传统数据库不能支撑或需要很高成本支撑的场景:瞬间写入并发性能高得益于WAL机制。(详见HBase-数据写入流程解析http://hbasefly.com/2016/03/23/hbase_writer/

  •  可以进行优雅的数据扩展,动态扩展整个存储系统容量:同样由HDFS支撑。

  • 数据格式无限制,支持半结构化和非结构化的数据:HBase中存储值均为Bytes类型,读写两端协商好如何转换即可支持任意类型的数据。

  • 业务场景简单,不需要全部的关系型数据库特性,例如交叉列、交叉表,事务、连接等。 

具体使用场景如用户画像、实时指标计算、更新计算和增量计算,HBase都比较适合。

那么,哪些场景是不适合HBase的,主要有如下两个场景:

  • 大量读大量写:HBase的写性能由WAL机制保证,其Rowkey、MemStore和StoreFile两级缓存+存储保证了随机读性能。然而,批量读操作在HBase中对应Scan动作,每次Scan操作都是一个RPC操作,会返回n行,此处的n由HBase.client.scanner.caching设置,默认是1。这就意味着默认100万行的数据需要100万次RPC操作。随机读和顺序读基本上两难全。(参见Hive over HBase和Hive over HDFS性能比较分析 ,http://superlxw1234.iteye.com/blog/2008274同时,如果默认通过Region Server去进行HBase的批量读取,HBase会需要使用大量的资源,例如WAL、合并(Compaction)和刷出(Flush)队列、服务器内存,还会触发大量的JVM垃圾回收,此时性能影响的将不仅仅是HBase,还有其余Java程序。(详见Efficient bulk load of HBase using Spark ,http://www.opencore.com/blog/2016/10/efficient-bulk-load-of-hbase-using-spark/ 和How-to: Use HBase Bulk Loading, and Why ,http://blog.cloudera.com/blog/2013/09/how-to-use-hbase-bulk-loading-and-why/HBase支持批量读写,在指定了RowKey范围下的批量读性能也尚可,但最优的场景还是大量写少量读。

  • 多表关联与复杂统计:HBase并未内置关联查询,虽可以通过客户端实现多表关联,例如当年基于HBase的图数据库Titanhttp://titan.thinkaurelius.com/实现的复杂性和机理限制导致基于HBase的关联查询并不常见。

以上两个场景,其实正是Spark+HDFS+Hive的强项。Spark+HDFS+Hive应对离线批处理和复杂统计计算场景,HBase应对准实时的并发查询和简单计算场景,也是实践中比较好的方式。

2性能

网上已有许多不同的NoSQL数据库之间的性能比较,详见参考链接:

  • 2016-11:Performance Comparison between Five NoSQL Databaseshttp://ieeexplore.ieee.org/abstract/document/7979888/

  • 2014-09:性能测试:SequoiaDB vs. MongoDB vs. Cassandra vs. HBasehttp://www.csdn.net/article/2014-09-16/2821707-benchmark-test-of-MongoDB-SequoiaDB-HBase-Cassandra/2

  • 2014-08:datastax的评测https://www.datastax.com/wp-content/themes/datastax-2014-08/files/NoSQL_Benchmarks_EndPoint.pdf

  • 2014-02:Sergey Sverchkov的评测https://jaxenter.com/evaluating-nosql-performance-which-database-is-right-for-your-data-107481.html

评测的结果都差不多,HBase在NoSQL中属于较为中庸的,各种场景都适合,可以作为Spark+Hadoop生态系统的Key-Value数据库选择。当然,选择其他类型的KV数据库,可能可以获得某方面的极致性能,同时也要考虑学习成本和易用性。

3生态系统兼容

HBase基于Hadoop系统构建,安装极为简单。Spark有多种方式操作HBase,接下来会介绍Hortonworks开源出来的SPARK-ON-HBASE: DATAFRAME BASED HBASE CONNECTOR。(https://zh.hortonworks.com/blog/spark-hbase-dataframe-based-hbase-connector/)

在Hive中,我们可以使用HQL语句在HBase表上进行查询、插入操作,同时可以将HBase中的数据导出至Hive进行离线批处理分析,详见HBaseIntegration  (https://cwiki.apache.org/confluence/display/Hive/HBaseIntegration#HBaseIntegration-HiveHBaseIntegration)和LanguageManual ImportExport。(https://cwiki.apache.org/confluence/display/Hive/LanguageManual+ImportExport)

图 HBase架构



三、 SHC:Spark on HBase简介

详细的介绍可以看SPARK-ON-HBASE: DATAFRAME BASED HBASE CONNECTOR,想要一目十行看中文的同学可以移步相对应的译文


四、SHC+DataFrame便捷应用


使用SHC+DataFrame非常简单,基本只需要关注HBaseTableCatalog,即定义出DataFrame的每一列应该输出到HBase中的表名、RowKey,列族、列名、类型以及命名空间与编码格式。该格式可以从JSON数据读入,具体格式如下,详细样例请移步官方基础样例:

1. def catalog = s"""{  

2.                 |"table":{"namespace":"default""name":"shcExampleTable""tableCoder":"PrimitiveType"}, 

3.                   |"rowkey":"key1:key2"

4.                   |"columns":{

5.                   |"col00":{"cf":"rowkey""col":"key1""type":"string""length":"6"}, 

6.                   |"col01":{"cf":"rowkey""col":"key2""type":"int"},  

7.                   |"col1":{"cf":"cf1""col":"col1""type":"boolean"}, 

8.                   |"col2":{"cf":"cf2""col":"col2""type":"double"}, 

9.                   |"col3":{"cf":"cf3""col":"col3""type":"float"},  

10.                   |"col4":{"cf":"cf4""col":"col4""type":"int"}, 

11.                   |"col5":{"cf":"cf5""col":"col5""type":"bigint"},  

12.                   |"col6":{"cf":"cf6""col":"col6""type":"smallint"}, 

13.                   |"col7":{"cf":"cf7""col":"col7""type":"string"}, 

14.                   |"col8":{"cf":"cf8""col":"col8""type":"tinyint"

15.                   |} 

16.                   |}""".stripMargin 

HBase中跨列族访问是非常低效的,所以在实际应用的过程中,往往需要进行存储的DataFrame中列都是一个列族的。同时有些时候DataFrame会产生不固定的列,频繁手动更改HBaseTableCatalog的Schema也是一个无趣的活。DataFrame的优势在这个时候就体现出来了,我们可以根据DataFrame的Schema自动化生成HBase的映射关系以及列值类型,其余的自己指定就好。代码如下:

1. import org.apache.spark.sql.execution.datasources.HBase._ 

2. import org.apache.spark.sql._ 

3. import org.apache.spark.sql.types._ 

4. import org.apache.spark.sql.functions._ 

5. import org.apache.spark.{SparkConf, SparkContext}  

6. import com.alibaba.fastjson.{JSON, JSONArray, JSONObject}  

7.   

8. import scala.collection.JavaConversions._  

9. import scala.collection.JavaConverters._  

10.   

11.   /**    

12.      * 保存DataFrameHBase

13.     * @param df_2_be_save 需要保存的DataFrame 

14.     * @param tablename_hb 需要存入的表名,需要提前建好 

15.     * @param namespace_hb default,根据需要更改,一般默认就好 

16.     * @param tableCoder_hb PrimitiveType,根据需要更改,一般默认就好 

17.     * @param rowkey_hb 主键名字 

18.     * @param columFamily_hb 列族名,在该函数中,DataFrame的所有列都只能插入到同一个列族中,列族需要预先定义好 

19.     * @param newtablecount 

20.     */  

21.    def DataFrame_Save_To_HBase(df_2_be_save:DataFrame)(tablename_hb:String, namespace_hb:String = "default",tableCoder_hb:String = "PrimitiveType",rowkey_hb:String,columFamily_hb:String,newtablecount:Int = 5): Unit ={ 

22.       val column_json = df_2_be_save.schema.map{ 

23.           stf =>{ 

24.             if(stf.name==rowkey_hb) { 

25.               (stf.name -> JSON.toJSON(Map("cf" -> "rowkey",  

26.                 "col" -> rowkey_hb,  

27.                  "type" -> stf.dataType.simpleString).asJava))  

28.         }else{  

29.          (stf.name -> JSON.toJSON(Map("cf"-> columFamily_hb, 

30.              "col"-> stf.name,  

31.              "type"-> stf.dataType.simpleString).asJava))   

32.         }. 

33.       }.

34.     }.toMap.asJava   

35.     val df_HBase_schema = JSON.toJSON(Map("table"->JSON.toJSON(Map("name"->tablename_hb, 

36.      "namespace"->namespace_hb,"tableCoder"->tableCoder_hb).asJava), 

37.       "rowkey"->rowkey_hb, 

38.       "columns"->column_json).asJava)  

39. //    println(df_HBase_schema) 

40.    df_2_be_save. 

41.      write.options( 

42.       Map(HBaseTableCatalog.tableCatalog -> df_HBase_schema.toString, HBaseTableCatalog.newTable -> newtablecount.toString)). 

43.       format("org.apache.spark.sql.execution.datasources.HBase"). 

44.       save()  

45.   } 


至此,诸位读者应该可以拿着SHC和Spark愉快的玩耍了。


参考链接

1. CAP(http://s3.thinkaurelius.com/docs/titan/0.5.4/benefits.html)

2. HBase-数据写入流程解析 (http://hbasefly.com/2016/03/23/hbase_writer/)

3. Hive over HBase和Hive over HDFS性能比较分析( http://superlxw1234.iteye.com/blog/2008274)

4. Efficient bulk load of HBase using Spark  (http://www.opencore.com/blog/2016/10/efficient-bulk-load-of-hbase-using-spark/ )

5. How-to: Use HBase Bulk Loading, and Why (http://blog.cloudera.com/blog/2013/09/how-to-use-hbase-bulk-loading-and-why/)

6. Titan(http://titan.thinkaurelius.com/)

7. 2016-11:Performance Comparison between Five NoSQL Databases(http://ieeexplore.ieee.org/abstract/document/7979888/)

8. 2014-09:性能测试:SequoiaDB vs. MongoDB vs. Cassandra vs. HBase(http://www.csdn.net/article/2014-09-16/2821707-benchmark-test-of-MongoDB-SequoiaDB-HBase-Cassandra/2)

9. 2014-08:datastax的评测(https://www.datastax.com/wp-content/themes/datastax-2014-08/files/NoSQL_Benchmarks_EndPoint.pdf)

10. 2014-02:Sergey Sverchkov的评测 (https://jaxenter.com/evaluating-nosql-performance-which-database-is-right-for-your-data-107481.html)

11. SPARK-ON-HBASE: DATAFRAME BASED HBASE CONNECTOR (https://zh.hortonworks.com/blog/spark-hbase-dataframe-based-hbase-connector/)

12. HBaseIntegration (https://cwiki.apache.org/confluence/display/Hive/HBaseIntegration.#HBaseIntegration-HiveHBaseIntegration)

13. LanguageManual ImportExport (https://cwiki.apache.org/confluence/display/Hive/LanguageManual+ImportExport)


内容编辑:安全大数据分析实验室 刘威歆   责任编辑:肖晴

关于我们


绿盟科技创新中心是绿盟科技的前沿技术研究部门。包括云安全实验室、安全大数据分析实验室和物联网安全实验室。团队成员由来自清华、北大、哈工大、中科院、北邮等多所重点院校的博士和硕士组成。

绿盟科技创新中心作为“中关村科技园区海淀园博士后工作站分站”的重要培养单位之一,与清华大学进行博士后联合培养,科研成果已涵盖各类国家课题项目、国家专利、国家标准、高水平学术论文、出版专业书籍等。

我们持续探索信息安全领域的前沿学术方向,从实践出发,结合公司资源和先进技术,实现概念级的原型系统,进而交付产品线孵化产品并创造巨大的经济价值。

以上是关于Spark DataFrame便捷整合HBase的主要内容,如果未能解决你的问题,请参考以下文章

学习笔记Spark—— Spark SQL应用—— Spark DataSet基础操作

Spark SQL 介绍

Spark学习 Spark SQL

大数据-spark理论sparkSql,sparkStreaming,spark调优

java的怎么操作spark的dataframe

[Spark][Python][DataFrame][SQL]Spark对DataFrame直接执行SQL处理的例子